Database Handler Development
Learn how to create custom database handlers to support new data sources and repositories in Celline.
Database Handler Architecture
Handler Interface
All database handlers must implement the DatabaseHandler
interface:
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any
from celline.DB.dev.model import SampleSchema, RunSchema
class DatabaseHandler(ABC):
    """Abstract base class that every Celline database handler implements.

    Subclasses must provide the four abstract operations; the two hook
    methods have permissive defaults and may be overridden as needed.
    """

    @abstractmethod
    def can_handle(self, sample_id: str) -> bool:
        """Return True when this handler recognises the given sample ID."""

    @abstractmethod
    def fetch_sample_metadata(self, sample_id: str) -> Optional[SampleSchema]:
        """Return sample-level metadata, or None when unavailable."""

    @abstractmethod
    def fetch_run_metadata(self, run_id: str) -> Optional[RunSchema]:
        """Return run-level metadata, or None when unavailable."""

    @abstractmethod
    def add(self, sample_id: str, force_search: bool = False) -> bool:
        """Persist the sample into the local database; True on success."""

    def validate_sample_id(self, sample_id: str) -> bool:
        """Optional hook: verify the identifier format (default: accept all)."""
        return True

    def get_download_urls(self, sample_id: str) -> List[str]:
        """Optional hook: list download URLs for the sample (default: none)."""
        return []
Data Models
Understand the core data models used in Celline:
from dataclasses import dataclass
from typing import Optional
@dataclass
class SampleSchema:
    """Sample metadata schema.

    Flat record describing one sample. All fields except ``key`` are
    Optional: handlers fill in None when the source database does not
    provide a value.
    """

    key: str  # Primary identifier
    title: Optional[str]  # Sample title/description
    organism: Optional[str]  # Organism name
    tissue: Optional[str]  # Tissue type
    cell_type: Optional[str]  # Cell type
    library_strategy: Optional[str]  # Sequencing strategy
    library_source: Optional[str]  # Library source
    library_selection: Optional[str]  # Library selection
    platform: Optional[str]  # Sequencing platform
    instrument: Optional[str]  # Sequencing instrument
    parent: Optional[str]  # Parent study/project ID
    children: Optional[str]  # Child run IDs (comma-separated)
    species: Optional[str]  # Species name
    date: Optional[str]  # Sample date
@dataclass
class RunSchema:
    """Run metadata schema.

    Flat record describing one sequencing run belonging to a sample
    (linked back via ``parent``).
    """

    key: str  # Primary identifier
    title: Optional[str]  # Run title
    strategy: Optional[str]  # Sequencing strategy
    raw_link: Optional[str]  # Download links (comma-separated)
    parent: Optional[str]  # Parent sample ID
    file_type: Optional[str]  # File type (fastq, bam, etc.)
    file_size: Optional[int]  # File size in bytes
Creating a Custom Handler
Step 1: Basic Handler Template
# mypackage/handlers/custom_handler.py
import requests
import re
from typing import Optional, List, Dict, Any
from celline.DB.dev.handler import DatabaseHandler
from celline.DB.dev.model import SampleSchema, RunSchema
from celline.log.logger import get_logger
class CustomDatabaseHandler(DatabaseHandler):
    """
    Custom database handler for MyCustomDB

    This handler supports sample IDs in the format: CUSTOM[0-9]{6}
    Example: CUSTOM123456
    """

    # Compiled once at class level: 'CUSTOM' followed by exactly six digits.
    _ID_PATTERN = re.compile(r'^CUSTOM\d{6}$')

    def __init__(self, api_base_url: str = "https://api.mycustomdb.org/v1"):
        """
        Initialize the handler

        Args:
            api_base_url: Base URL for the custom database API
        """
        self.api_base_url = api_base_url.rstrip('/')
        self.session = requests.Session()
        self.session.headers.update({
            'Accept': 'application/json',
            'User-Agent': 'Celline-CustomHandler/1.0'
        })
        self.logger = get_logger(__name__)
        # In-memory cache of raw API payloads keyed by sample ID,
        # to avoid repeated API calls.
        self._metadata_cache: Dict[str, Any] = {}
        # Best-effort probe: failure only logs a warning, the handler stays usable.
        self._validate_api_connection()

    def _validate_api_connection(self) -> None:
        """Validate connection to the custom database API (non-fatal)."""
        try:
            response = self.session.get(f"{self.api_base_url}/status", timeout=10)
            response.raise_for_status()
            self.logger.info("Custom database API connection validated")
        except requests.RequestException as e:
            self.logger.warning(f"Cannot connect to custom database API: {e}")

    def can_handle(self, sample_id: str) -> bool:
        """
        Check if this handler can process the sample ID

        Args:
            sample_id: Sample identifier to check

        Returns:
            True if this handler can process the sample ID
        """
        return bool(self._ID_PATTERN.match(sample_id))

    def validate_sample_id(self, sample_id: str) -> bool:
        """
        Validate sample ID format

        Args:
            sample_id: Sample identifier to validate

        Returns:
            True if sample ID is valid
        """
        if not self.can_handle(sample_id):
            return False
        # can_handle() already enforces the shape; this numeric re-check is
        # kept for defence in depth (and as an extension point).
        numeric_part = sample_id[6:]  # Strip the 'CUSTOM' prefix
        return numeric_part.isdigit() and len(numeric_part) == 6

    def fetch_sample_metadata(self, sample_id: str) -> Optional[SampleSchema]:
        """
        Fetch sample metadata from the custom database

        Args:
            sample_id: Sample identifier

        Returns:
            SampleSchema object or None if not found
        """
        if not self.can_handle(sample_id):
            self.logger.error(f"Cannot handle sample ID: {sample_id}")
            return None
        # Serve from the in-memory cache when possible.
        if sample_id in self._metadata_cache:
            self.logger.debug(f"Using cached metadata for {sample_id}")
            return self._create_sample_schema(sample_id, self._metadata_cache[sample_id])
        try:
            url = f"{self.api_base_url}/samples/{sample_id}"
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            data = response.json()
            self._metadata_cache[sample_id] = data
            return self._create_sample_schema(sample_id, data)
        except requests.HTTPError as e:
            # e.response can be None for synthesized HTTPErrors; guard it.
            if e.response is not None and e.response.status_code == 404:
                self.logger.warning(f"Sample not found: {sample_id}")
            else:
                self.logger.error(f"HTTP error fetching sample {sample_id}: {e}")
            return None
        except requests.RequestException as e:
            self.logger.error(f"Network error fetching sample {sample_id}: {e}")
            return None
        except (KeyError, ValueError) as e:
            self.logger.error(f"Invalid response format for sample {sample_id}: {e}")
            return None

    def _create_sample_schema(self, sample_id: str, data: Dict[str, Any]) -> SampleSchema:
        """
        Create SampleSchema from API response data

        Args:
            sample_id: Sample identifier
            data: Raw API response data

        Returns:
            SampleSchema object
        """
        return SampleSchema(
            key=sample_id,
            title=data.get('title', ''),
            organism=data.get('organism', ''),
            tissue=data.get('tissue_type', ''),
            cell_type=data.get('cell_type', ''),
            library_strategy=data.get('library_strategy', ''),
            library_source=data.get('library_source', ''),
            library_selection=data.get('library_selection', ''),
            platform=data.get('platform', ''),
            instrument=data.get('instrument_model', ''),
            parent=data.get('study_id', ''),
            children=','.join(data.get('run_ids', [])),
            # Fall back to 'organism' when the API omits 'species'.
            species=data.get('species', data.get('organism', '')),
            date=data.get('collection_date', '')
        )

    def fetch_run_metadata(self, run_id: str) -> Optional[RunSchema]:
        """
        Fetch run metadata from the custom database

        Args:
            run_id: Run identifier

        Returns:
            RunSchema object or None if not found
        """
        try:
            url = f"{self.api_base_url}/runs/{run_id}"
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            return self._create_run_schema(run_id, response.json())
        except requests.HTTPError as e:
            # Consistent with fetch_sample_metadata: 404 is a warning, not an error.
            if e.response is not None and e.response.status_code == 404:
                self.logger.warning(f"Run not found: {run_id}")
            else:
                self.logger.error(f"Error fetching run {run_id}: {e}")
            return None
        except requests.RequestException as e:
            self.logger.error(f"Error fetching run {run_id}: {e}")
            return None

    def _create_run_schema(self, run_id: str, data: Dict[str, Any]) -> RunSchema:
        """
        Create RunSchema from API response data

        Args:
            run_id: Run identifier
            data: Raw API response data

        Returns:
            RunSchema object
        """
        # 'file_urls' may be a list or a single value; normalise to a
        # comma-separated string either way.
        file_urls = data.get('file_urls', [])
        if isinstance(file_urls, list):
            raw_links = ','.join(file_urls)
        else:
            raw_links = str(file_urls)
        return RunSchema(
            key=run_id,
            title=data.get('title', ''),
            strategy=data.get('library_strategy', ''),
            raw_link=raw_links,
            parent=data.get('sample_id', ''),
            file_type=data.get('file_type', 'fastq'),
            file_size=data.get('file_size', 0)
        )

    def add(self, sample_id: str, force_search: bool = False) -> bool:
        """
        Add sample to local database

        Args:
            sample_id: Sample identifier to add
            force_search: Force refetch even if already cached

        Returns:
            True if successfully added
        """
        self.logger.info(f"Adding sample {sample_id} to local database")
        # Drop any stale cache entry so the metadata is refetched.
        if force_search and sample_id in self._metadata_cache:
            del self._metadata_cache[sample_id]
        sample_schema = self.fetch_sample_metadata(sample_id)
        if sample_schema is None:
            self.logger.error(f"Could not fetch metadata for sample: {sample_id}")
            return False
        success = self._save_to_local_database(sample_schema)
        if success and sample_schema.children:
            # Also persist run-level metadata for each child run.
            for run_id in sample_schema.children.split(','):
                run_id = run_id.strip()
                if run_id:
                    run_schema = self.fetch_run_metadata(run_id)
                    if run_schema:
                        self._save_run_to_local_database(run_schema)
        return success

    def _upsert_parquet(self, filename: str, record: Dict[str, Any]) -> None:
        """
        Insert or replace a record (matched on 'key') in a Parquet file
        under <EXEC_ROOT>/DB.

        Shared by the sample and run save paths, which previously
        duplicated this logic verbatim. Raises on I/O failure; callers
        translate that into a boolean result.
        """
        import polars as pl
        import os
        from celline.config import Config

        db_file = os.path.join(Config.EXEC_ROOT, "DB", filename)
        os.makedirs(os.path.dirname(db_file), exist_ok=True)
        new_record = pl.DataFrame([record])
        if os.path.exists(db_file):
            existing_df = pl.read_parquet(db_file)
            # Drop any previous row with the same key, then append (upsert).
            remaining = existing_df.filter(pl.col('key') != record['key'])
            final_df = pl.concat([remaining, new_record])
        else:
            final_df = new_record
        final_df.write_parquet(db_file)

    def _save_to_local_database(self, schema: SampleSchema) -> bool:
        """
        Save sample schema to local Parquet database

        Args:
            schema: SampleSchema to save

        Returns:
            True if successfully saved
        """
        record = {
            'key': schema.key,
            'title': schema.title,
            'organism': schema.organism,
            'tissue': schema.tissue,
            'cell_type': schema.cell_type,
            'library_strategy': schema.library_strategy,
            'library_source': schema.library_source,
            'library_selection': schema.library_selection,
            'platform': schema.platform,
            'instrument': schema.instrument,
            'parent': schema.parent,
            'children': schema.children,
            'species': schema.species,
            'date': schema.date
        }
        try:
            self._upsert_parquet("CUSTOM_SAMPLES.parquet", record)
            self.logger.info(f"Successfully saved sample {schema.key} to local database")
            return True
        except Exception as e:
            self.logger.error(f"Error saving sample {schema.key} to database: {e}")
            return False

    def _save_run_to_local_database(self, schema: RunSchema) -> bool:
        """
        Save run schema to local Parquet database

        Args:
            schema: RunSchema to save

        Returns:
            True if successfully saved
        """
        record = {
            'key': schema.key,
            'title': schema.title,
            'strategy': schema.strategy,
            'raw_link': schema.raw_link,
            'parent': schema.parent,
            'file_type': schema.file_type,
            'file_size': schema.file_size
        }
        try:
            self._upsert_parquet("CUSTOM_RUNS.parquet", record)
            self.logger.debug(f"Successfully saved run {schema.key} to local database")
            return True
        except Exception as e:
            self.logger.error(f"Error saving run {schema.key} to database: {e}")
            return False

    def get_download_urls(self, sample_id: str) -> List[str]:
        """
        Get download URLs for sample data

        Args:
            sample_id: Sample identifier

        Returns:
            List of download URLs (empty when the sample or its runs are unknown)
        """
        sample_schema = self.fetch_sample_metadata(sample_id)
        if not sample_schema or not sample_schema.children:
            return []
        urls: List[str] = []
        for run_id in sample_schema.children.split(','):
            run_id = run_id.strip()
            if not run_id:
                continue
            run_schema = self.fetch_run_metadata(run_id)
            if run_schema and run_schema.raw_link:
                urls.extend(u.strip() for u in run_schema.raw_link.split(',') if u.strip())
        return urls

    def search_samples(self, query: Dict[str, Any]) -> List[SampleSchema]:
        """
        Search samples by various criteria

        Args:
            query: Search parameters posted to the /samples/search endpoint

        Returns:
            List of matching SampleSchema objects (only IDs this handler can process)
        """
        try:
            url = f"{self.api_base_url}/samples/search"
            response = self.session.post(url, json=query, timeout=30)
            response.raise_for_status()
            data = response.json()
            results = []
            for sample_data in data.get('samples', []):
                sample_id = sample_data.get('id', '')
                if sample_id and self.can_handle(sample_id):
                    results.append(self._create_sample_schema(sample_id, sample_data))
            self.logger.info(f"Found {len(results)} samples matching query")
            return results
        except requests.RequestException as e:
            self.logger.error(f"Error searching samples: {e}")
            return []
Step 2: Advanced Handler Features
Authentication Support
class AuthenticatedCustomHandler(CustomDatabaseHandler):
    """Custom handler with authentication support"""

    def __init__(self, api_base_url: str, api_key: Optional[str] = None,
                 username: Optional[str] = None, password: Optional[str] = None):
        # Credentials are stored before the base class builds the session,
        # then attached once the session exists.
        self.api_key = api_key
        self.username = username
        self.password = password
        super().__init__(api_base_url)
        self._setup_authentication()

    def _setup_authentication(self):
        """Attach API-key or basic-auth credentials to the HTTP session."""
        if self.api_key:
            # Bearer-token (API key) authentication takes precedence.
            self.session.headers['Authorization'] = f'Bearer {self.api_key}'
        elif self.username and self.password:
            from requests.auth import HTTPBasicAuth
            self.session.auth = HTTPBasicAuth(self.username, self.password)
        self._validate_authentication()

    def _validate_authentication(self):
        """Probe the auth endpoint; raise only on an explicit 401 rejection."""
        try:
            response = self.session.get(f"{self.api_base_url}/auth/validate", timeout=10)
        except requests.RequestException as e:
            # Network trouble is not proof of bad credentials - warn only.
            self.logger.warning(f"Could not validate authentication: {e}")
            return
        if response.status_code == 200:
            self.logger.info("Authentication validated successfully")
        elif response.status_code == 401:
            self.logger.error("Authentication failed - invalid credentials")
            raise ValueError("Invalid authentication credentials")
        else:
            self.logger.warning(f"Authentication validation returned status {response.status_code}")
Rate Limiting Support
import time
from functools import wraps
class RateLimitedCustomHandler(CustomDatabaseHandler):
    """Custom handler with rate limiting support"""

    def __init__(self, api_base_url: str, requests_per_second: float = 2.0):
        # Rate-limit bookkeeping is initialised before the base constructor
        # so it exists by the time any request helper runs.
        self.requests_per_second = requests_per_second
        self.min_request_interval = 1.0 / requests_per_second
        self.last_request_time = 0.0
        super().__init__(api_base_url)

    def _rate_limited_request(self, method: str, url: str, **kwargs):
        """Issue a request, sleeping first when the previous one was too recent."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_request_interval:
            time.sleep(self.min_request_interval - elapsed)
        response = self.session.request(method, url, **kwargs)
        self.last_request_time = time.time()
        return response

    def fetch_sample_metadata(self, sample_id: str) -> Optional[SampleSchema]:
        """Rate-limited sample metadata fetching"""
        if not self.can_handle(sample_id):
            return None
        # Cached payloads bypass the rate limiter entirely.
        if sample_id in self._metadata_cache:
            return self._create_sample_schema(sample_id, self._metadata_cache[sample_id])
        try:
            endpoint = f"{self.api_base_url}/samples/{sample_id}"
            response = self._rate_limited_request('GET', endpoint, timeout=30)
            response.raise_for_status()
            payload = response.json()
        except requests.RequestException as e:
            self.logger.error(f"Error fetching sample {sample_id}: {e}")
            return None
        self._metadata_cache[sample_id] = payload
        return self._create_sample_schema(sample_id, payload)
Caching and Persistence
import pickle
import hashlib
from pathlib import Path
class CachedCustomHandler(CustomDatabaseHandler):
    """Custom handler with persistent, TTL-bound on-disk caching."""

    def __init__(self, api_base_url: str, cache_dir: Optional[str] = None,
                 cache_ttl: int = 3600):
        """
        Initialize the caching handler.

        Args:
            api_base_url: Base URL for the custom database API
            cache_dir: Directory for cache files (defaults to
                <PROJ_ROOT>/.cache/custom_handler)
            cache_ttl: Cache time-to-live in seconds
        """
        self.cache_ttl = cache_ttl  # Cache time-to-live in seconds
        if cache_dir:
            self.cache_dir = Path(cache_dir)
        else:
            from celline.config import Config
            self.cache_dir = Path(Config.PROJ_ROOT) / ".cache" / "custom_handler"
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        super().__init__(api_base_url)

    def _get_cache_file(self, key: str) -> Path:
        """Return the cache file path for *key* (MD5-hashed for a safe filename)."""
        key_hash = hashlib.md5(key.encode()).hexdigest()
        return self.cache_dir / f"{key_hash}.cache"

    def _is_cache_valid(self, cache_file: Path) -> bool:
        """Check whether *cache_file* exists and is younger than the TTL."""
        # Fix: the original snippet used time.time() without importing `time`,
        # which raised NameError on every cache check.
        import time

        if not cache_file.exists():
            return False
        file_age = time.time() - cache_file.stat().st_mtime
        return file_age < self.cache_ttl

    def _load_from_cache(self, key: str) -> Optional[Dict[str, Any]]:
        """Load cached data for *key*; None when missing, stale, or unreadable."""
        cache_file = self._get_cache_file(key)
        if not self._is_cache_valid(cache_file):
            return None
        try:
            # NOTE(review): pickle is only acceptable because this directory is
            # written solely by this handler; never unpickle untrusted files.
            with open(cache_file, 'rb') as f:
                return pickle.load(f)
        except Exception as e:
            self.logger.warning(f"Error loading cache for {key}: {e}")
            return None

    def _save_to_cache(self, key: str, data: Dict[str, Any]):
        """Persist *data* for *key*; failures are logged, not raised."""
        cache_file = self._get_cache_file(key)
        try:
            with open(cache_file, 'wb') as f:
                pickle.dump(data, f)
        except Exception as e:
            self.logger.warning(f"Error saving cache for {key}: {e}")

    def fetch_sample_metadata(self, sample_id: str) -> Optional[SampleSchema]:
        """Fetch sample metadata, consulting the persistent cache first."""
        if not self.can_handle(sample_id):
            return None
        cached_data = self._load_from_cache(f"sample_{sample_id}")
        if cached_data:
            self.logger.debug(f"Using cached metadata for {sample_id}")
            return self._create_sample_schema(sample_id, cached_data)
        try:
            url = f"{self.api_base_url}/samples/{sample_id}"
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            data = response.json()
            self._save_to_cache(f"sample_{sample_id}", data)
            return self._create_sample_schema(sample_id, data)
        except requests.RequestException as e:
            self.logger.error(f"Error fetching sample {sample_id}: {e}")
            return None
Handler Registration
Step 3: Register Your Handler
# mypackage/handlers/registry.py
from celline.DB.dev.handler import HandleResolver
from .custom_handler import CustomDatabaseHandler
def register_custom_handlers():
    """Register all custom database handlers"""
    # Build the handler and hand it to Celline's resolver in one step.
    HandleResolver.register_handler(
        CustomDatabaseHandler(api_base_url="https://api.mycustomdb.org/v1")
    )
    print("Custom database handlers registered successfully")


# Auto-register when package is imported
register_custom_handlers()
Configuration-Based Registration
# mypackage/config.py
import os
from typing import Dict, Any
class HandlerConfig:
    """Configuration for custom handlers"""

    # Fallback API endpoint used when CUSTOM_DB_API_URL is not set.
    _DEFAULT_API_URL = 'https://api.mycustomdb.org/v1'

    @classmethod
    def get_custom_db_config(cls) -> Dict[str, Any]:
        """Read the custom-database handler settings from the environment."""
        env = os.getenv
        return {
            'api_base_url': env('CUSTOM_DB_API_URL', cls._DEFAULT_API_URL),
            'api_key': env('CUSTOM_DB_API_KEY'),
            'requests_per_second': float(env('CUSTOM_DB_RATE_LIMIT', '2.0')),
            'cache_ttl': int(env('CUSTOM_DB_CACHE_TTL', '3600')),
        }
# mypackage/handlers/factory.py
from .custom_handler import CustomDatabaseHandler
from .authenticated_handler import AuthenticatedCustomHandler
from .rate_limited_handler import RateLimitedCustomHandler
from .cached_handler import CachedCustomHandler
from ..config import HandlerConfig
def create_custom_handler() -> CustomDatabaseHandler:
    """Factory function to create configured handler"""
    config = HandlerConfig.get_custom_db_config()
    base_url = config['api_base_url']
    # Selection priority: authenticated > rate-limited (slow APIs) > cached.
    if config.get('api_key'):
        return AuthenticatedCustomHandler(
            api_base_url=base_url,
            api_key=config['api_key'],
        )
    if config.get('requests_per_second') < 10:
        return RateLimitedCustomHandler(
            api_base_url=base_url,
            requests_per_second=config['requests_per_second'],
        )
    return CachedCustomHandler(
        api_base_url=base_url,
        cache_ttl=config['cache_ttl'],
    )
Testing Database Handlers
Unit Tests
# tests/test_custom_handler.py
import pytest
import responses
from unittest.mock import patch, Mock
from mypackage.handlers.custom_handler import CustomDatabaseHandler
class TestCustomDatabaseHandler:
    """Unit tests for CustomDatabaseHandler."""

    @pytest.fixture
    def handler(self):
        """Create handler instance for testing (skips the network probe)."""
        with patch.object(CustomDatabaseHandler, '_validate_api_connection'):
            return CustomDatabaseHandler("https://api.test.com/v1")

    def test_can_handle_valid_ids(self, handler):
        """Test sample ID validation"""
        assert handler.can_handle("CUSTOM123456")
        assert handler.can_handle("CUSTOM000001")
        assert not handler.can_handle("GSM123456")
        assert not handler.can_handle("CUSTOM12345")  # Too short
        assert not handler.can_handle("CUSTOM1234567")  # Too long
        assert not handler.can_handle("CUSTOMA23456")  # Contains letter

    @responses.activate
    def test_fetch_sample_metadata_success(self, handler):
        """Test successful metadata fetching"""
        sample_id = "CUSTOM123456"
        # Mock API response
        responses.add(
            responses.GET,
            f"https://api.test.com/v1/samples/{sample_id}",
            json={
                'id': sample_id,
                'title': 'Test Sample',
                'organism': 'Homo sapiens',
                'tissue_type': 'brain',
                'library_strategy': 'RNA-Seq',
                'platform': 'Illumina',
                'study_id': 'STUDY001',
                'run_ids': ['RUN001', 'RUN002']
            },
            status=200
        )
        schema = handler.fetch_sample_metadata(sample_id)
        assert schema is not None
        assert schema.key == sample_id
        assert schema.title == 'Test Sample'
        assert schema.organism == 'Homo sapiens'
        assert schema.tissue == 'brain'
        assert schema.children == 'RUN001,RUN002'

    @responses.activate
    def test_fetch_sample_metadata_not_found(self, handler):
        """Test handling of 404 responses"""
        sample_id = "CUSTOM999999"
        responses.add(
            responses.GET,
            f"https://api.test.com/v1/samples/{sample_id}",
            status=404
        )
        schema = handler.fetch_sample_metadata(sample_id)
        assert schema is None

    # Fix: the handler does `import polars as pl` INSIDE the method, so `pl`
    # is not an attribute of the handler module and the original targets
    # ('mypackage.handlers.custom_handler.pl.*') could never resolve.
    # Patch the polars module itself, where the lookup actually happens.
    @patch('polars.read_parquet')
    @patch('polars.DataFrame.write_parquet')
    def test_save_to_local_database(self, mock_write, mock_read, handler):
        """Test saving to local database"""
        from celline.DB.dev.model import SampleSchema
        # Mock existing data (empty key list -> treated as a fresh insert)
        mock_read.return_value = Mock()
        mock_read.return_value.get_column.return_value.to_list.return_value = []
        schema = SampleSchema(
            key="CUSTOM123456",
            title="Test Sample",
            organism="Homo sapiens",
            tissue="brain",
            cell_type=None,
            library_strategy="RNA-Seq",
            library_source=None,
            library_selection=None,
            platform="Illumina",
            instrument=None,
            parent="STUDY001",
            children="RUN001,RUN002",
            species="Homo sapiens",
            date="2024-01-01"
        )
        result = handler._save_to_local_database(schema)
        assert result is True
        mock_write.assert_called_once()
Integration Tests
# tests/integration/test_custom_handler_integration.py
import pytest
from unittest.mock import patch
import tempfile
import os
from mypackage.handlers.custom_handler import CustomDatabaseHandler
class TestCustomHandlerIntegration:
    """End-to-end tests exercising the add() workflow against a temp database."""

    @pytest.fixture
    def temp_db_dir(self):
        """Create temporary database directory"""
        # Fix: the original called shutil.rmtree without importing shutil;
        # TemporaryDirectory handles cleanup (best-effort) automatically.
        with tempfile.TemporaryDirectory() as temp_dir:
            yield temp_dir

    @pytest.mark.integration
    def test_full_sample_addition_workflow(self, temp_db_dir):
        """Test complete sample addition workflow"""
        # Point the handler's database root at the temp directory.
        with patch('celline.config.Config') as mock_config:
            mock_config.EXEC_ROOT = temp_db_dir
            handler = CustomDatabaseHandler("https://api.test.com/v1")
            # Mock the remote fetches; the local persistence path runs for real.
            with patch.object(handler, 'fetch_sample_metadata') as mock_fetch_sample, \
                 patch.object(handler, 'fetch_run_metadata') as mock_fetch_run:
                from celline.DB.dev.model import SampleSchema, RunSchema
                mock_fetch_sample.return_value = SampleSchema(
                    key="CUSTOM123456",
                    title="Integration Test Sample",
                    organism="Homo sapiens",
                    tissue="brain",
                    cell_type=None,
                    library_strategy="RNA-Seq",
                    library_source="TRANSCRIPTOMIC",
                    library_selection="cDNA",
                    platform="Illumina",
                    instrument="HiSeq 2500",
                    parent="STUDY001",
                    children="RUN001",
                    species="Homo sapiens",
                    date="2024-01-01"
                )
                mock_fetch_run.return_value = RunSchema(
                    key="RUN001",
                    title="Test Run",
                    strategy="RNA-Seq",
                    raw_link="https://data.test.com/RUN001.fastq.gz",
                    parent="CUSTOM123456",
                    file_type="fastq",
                    file_size=1000000
                )
                result = handler.add("CUSTOM123456")
                assert result is True
                # Both Parquet files should now exist under <tmp>/DB.
                sample_db = os.path.join(temp_db_dir, "DB", "CUSTOM_SAMPLES.parquet")
                run_db = os.path.join(temp_db_dir, "DB", "CUSTOM_RUNS.parquet")
                assert os.path.exists(sample_db)
                assert os.path.exists(run_db)
                # Verify data content
                import polars as pl
                sample_df = pl.read_parquet(sample_db)
                assert len(sample_df) == 1
                assert sample_df.get_column('key')[0] == "CUSTOM123456"
                run_df = pl.read_parquet(run_db)
                assert len(run_df) == 1
                assert run_df.get_column('key')[0] == "RUN001"
Best Practices
Error Handling
class RobustCustomHandler(CustomDatabaseHandler):
    """Handler with comprehensive error handling"""

    def fetch_sample_metadata(self, sample_id: str) -> Optional[SampleSchema]:
        """Robust metadata fetching with comprehensive error handling."""
        if not self.can_handle(sample_id):
            self.logger.error(f"Handler cannot process sample ID: {sample_id}")
            return None
        try:
            return self._fetch_with_retry(sample_id)
        except Exception as e:
            # Boundary catch-all: log with traceback, never propagate to callers.
            self.logger.exception(f"Unexpected error fetching sample {sample_id}: {e}")
            return None

    def _fetch_with_retry(self, sample_id: str, max_retries: int = 3) -> Optional[SampleSchema]:
        """
        Fetch with exponential backoff retry.

        Retries on 5xx and network errors; 404 and other 4xx responses are
        returned as None without retrying.
        """
        # Fix: the original snippet called time.sleep without importing `time`.
        import time

        for attempt in range(max_retries):
            try:
                url = f"{self.api_base_url}/samples/{sample_id}"
                response = self.session.get(url, timeout=30)
                response.raise_for_status()
                return self._create_sample_schema(sample_id, response.json())
            except requests.HTTPError as e:
                # e.response can be None for synthesized HTTPErrors; treat
                # that like a non-retryable client error.
                status = e.response.status_code if e.response is not None else None
                if status == 404:
                    # Don't retry for 404s
                    self.logger.warning(f"Sample not found: {sample_id}")
                    return None
                if status is not None and status >= 500:
                    # Server error - retry with exponential backoff (1s, 2s, 4s...)
                    if attempt < max_retries - 1:
                        wait_time = 2 ** attempt
                        self.logger.warning(f"Server error, retrying in {wait_time}s...")
                        time.sleep(wait_time)
                        continue
                    self.logger.error(f"Server error persists for {sample_id}: {e}")
                    return None
                # Client error - don't retry
                self.logger.error(f"Client error for {sample_id}: {e}")
                return None
            except requests.RequestException as e:
                # Network error - retry
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    self.logger.warning(f"Network error, retrying in {wait_time}s...")
                    time.sleep(wait_time)
                    continue
                self.logger.error(f"Network error persists for {sample_id}: {e}")
                return None
        return None
Performance Optimization
class OptimizedCustomHandler(CustomDatabaseHandler):
    """Handler optimized for performance"""

    def __init__(self, api_base_url: str, max_connections: int = 10):
        """
        Initialize with a retry-enabled, pooled HTTP adapter.

        Args:
            api_base_url: Base URL for the custom database API
            max_connections: Connection-pool size for the HTTP session
        """
        super().__init__(api_base_url)
        from requests.adapters import HTTPAdapter
        from urllib3.util.retry import Retry

        retry_strategy = Retry(
            total=3,
            status_forcelist=[429, 500, 502, 503, 504],
            # Fix: urllib3 renamed 'method_whitelist' to 'allowed_methods'
            # (deprecated in 1.26, removed in 2.0).
            allowed_methods=["HEAD", "GET", "OPTIONS"],
            backoff_factor=1
        )
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=max_connections,
            pool_maxsize=max_connections
        )
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def batch_fetch_samples(self, sample_ids: List[str]) -> Dict[str, Optional[SampleSchema]]:
        """
        Batch fetch multiple samples for better performance.

        Uses a batch API endpoint when a subclass provides one
        (_supports_batch_fetch / _batch_fetch_api); otherwise falls back
        to concurrent per-sample requests.
        """
        if getattr(self, '_supports_batch_fetch', False):
            return self._batch_fetch_api(sample_ids)
        return self._batch_fetch_concurrent(sample_ids)

    def _batch_fetch_concurrent(self, sample_ids: List[str]) -> Dict[str, Optional[SampleSchema]]:
        """Fetch samples concurrently with a small thread pool."""
        from concurrent.futures import ThreadPoolExecutor, as_completed

        results: Dict[str, Optional[SampleSchema]] = {}
        with ThreadPoolExecutor(max_workers=5) as executor:
            # Submit all requests up front, then collect as they complete.
            futures = {
                executor.submit(self.fetch_sample_metadata, sample_id): sample_id
                for sample_id in sample_ids
            }
            for future in as_completed(futures):
                sample_id = futures[future]
                try:
                    results[sample_id] = future.result()
                except Exception as e:
                    self.logger.error(f"Error fetching {sample_id}: {e}")
                    results[sample_id] = None
        return results
This comprehensive guide covers all aspects of creating robust, efficient, and well-tested database handlers for Celline.