Database Handler Development

Learn how to create custom database handlers to support new data sources and repositories in Celline.

Database Handler Architecture

Handler Interface

All database handlers must implement the DatabaseHandler interface:

from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any
from celline.DB.dev.model import SampleSchema, RunSchema

class DatabaseHandler(ABC):
    """Base interface for database handlers"""
    
    @abstractmethod
    def can_handle(self, sample_id: str) -> bool:
        """Check if this handler can process the given sample ID"""
        pass
    
    @abstractmethod
    def fetch_sample_metadata(self, sample_id: str) -> Optional[SampleSchema]:
        """Fetch sample metadata from the database"""
        pass
    
    @abstractmethod
    def fetch_run_metadata(self, run_id: str) -> Optional[RunSchema]:
        """Fetch run metadata from the database"""
        pass
    
    @abstractmethod
    def add(self, sample_id: str, force_search: bool = False) -> bool:
        """Add sample to local database"""
        pass
    
    def validate_sample_id(self, sample_id: str) -> bool:
        """Validate sample ID format (optional override)"""
        return True
    
    def get_download_urls(self, sample_id: str) -> List[str]:
        """Get download URLs for sample data (optional override)"""
        return []
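
Handlers are selected through their can_handle check: conceptually, resolution is a first-match loop over the registered handlers. The resolve_handler sketch below illustrates the idea; it is not Celline's actual HandleResolver API:

from typing import List, Optional

def resolve_handler(sample_id: str,
                    handlers: List[DatabaseHandler]) -> Optional[DatabaseHandler]:
    """Return the first registered handler that accepts the sample ID."""
    for handler in handlers:
        if handler.can_handle(sample_id):
            return handler
    return None  # no registered handler recognizes this ID format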


Data Models

Celline handlers exchange metadata through two core data models:

from dataclasses import dataclass
from typing import Optional

@dataclass
class SampleSchema:
    """Sample metadata schema"""
    key: str                    # Primary identifier
    title: Optional[str]        # Sample title/description
    organism: Optional[str]     # Organism name
    tissue: Optional[str]       # Tissue type
    cell_type: Optional[str]    # Cell type
    library_strategy: Optional[str]  # Sequencing strategy
    library_source: Optional[str]   # Library source
    library_selection: Optional[str] # Library selection
    platform: Optional[str]    # Sequencing platform
    instrument: Optional[str]   # Sequencing instrument
    parent: Optional[str]       # Parent study/project ID
    children: Optional[str]     # Child run IDs (comma-separated)
    species: Optional[str]      # Species name
    date: Optional[str]         # Sample date

@dataclass
class RunSchema:
    """Run metadata schema"""
    key: str                    # Primary identifier
    title: Optional[str]        # Run title
    strategy: Optional[str]     # Sequencing strategy
    raw_link: Optional[str]     # Download links (comma-separated)
    parent: Optional[str]       # Parent sample ID
    file_type: Optional[str]    # File type (fastq, bam, etc.)
    file_size: Optional[int]    # File size in bytes


Creating a Custom Handler

Step 1: Basic Handler Template

# mypackage/handlers/custom_handler.py
import requests
import re
from typing import Optional, List, Dict, Any
from celline.DB.dev.handler import DatabaseHandler
from celline.DB.dev.model import SampleSchema, RunSchema
from celline.log.logger import get_logger

class CustomDatabaseHandler(DatabaseHandler):
    """
    Custom database handler for MyCustomDB
    
    This handler supports sample IDs in the format: CUSTOM[0-9]{6}
    Example: CUSTOM123456
    """
    
    def __init__(self, api_base_url: str = "https://api.mycustomdb.org/v1"):
        """
        Initialize the handler
        
        Args:
            api_base_url: Base URL for the custom database API
        """
        self.api_base_url = api_base_url.rstrip('/')
        self.session = requests.Session()
        self.session.headers.update({
            'Accept': 'application/json',
            'User-Agent': 'Celline-CustomHandler/1.0'
        })
        self.logger = get_logger(__name__)
        
        # Cache for metadata to avoid repeated API calls
        self._metadata_cache: Dict[str, Any] = {}
        
        # Validate API connection
        self._validate_api_connection()
    
    def _validate_api_connection(self):
        """Validate connection to the custom database API"""
        try:
            response = self.session.get(f"{self.api_base_url}/status", timeout=10)
            response.raise_for_status()
            self.logger.info("Custom database API connection validated")
        except requests.RequestException as e:
            self.logger.warning(f"Cannot connect to custom database API: {e}")
    
    def can_handle(self, sample_id: str) -> bool:
        """
        Check if this handler can process the sample ID
        
        Args:
            sample_id: Sample identifier to check
            
        Returns:
            True if this handler can process the sample ID
        """
        # Custom ID pattern: CUSTOM followed by 6 digits
        pattern = r'^CUSTOM\d{6}$'
        return bool(re.match(pattern, sample_id))
    
    def validate_sample_id(self, sample_id: str) -> bool:
        """
        Validate sample ID format
        
        Args:
            sample_id: Sample identifier to validate
            
        Returns:
            True if sample ID is valid
        """
        if not self.can_handle(sample_id):
            return False
        
        # Additional validation if needed
        numeric_part = sample_id[6:]  # Remove 'CUSTOM' prefix
        return numeric_part.isdigit() and len(numeric_part) == 6
    
    def fetch_sample_metadata(self, sample_id: str) -> Optional[SampleSchema]:
        """
        Fetch sample metadata from the custom database
        
        Args:
            sample_id: Sample identifier
            
        Returns:
            SampleSchema object or None if not found
        """
        if not self.can_handle(sample_id):
            self.logger.error(f"Cannot handle sample ID: {sample_id}")
            return None
        
        # Check cache first
        if sample_id in self._metadata_cache:
            self.logger.debug(f"Using cached metadata for {sample_id}")
            return self._create_sample_schema(sample_id, self._metadata_cache[sample_id])
        
        try:
            # Fetch from API
            url = f"{self.api_base_url}/samples/{sample_id}"
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            
            data = response.json()
            
            # Cache the result
            self._metadata_cache[sample_id] = data
            
            # Convert to SampleSchema
            return self._create_sample_schema(sample_id, data)
            
        except requests.HTTPError as e:
            if e.response.status_code == 404:
                self.logger.warning(f"Sample not found: {sample_id}")
            else:
                self.logger.error(f"HTTP error fetching sample {sample_id}: {e}")
            return None
        except requests.RequestException as e:
            self.logger.error(f"Network error fetching sample {sample_id}: {e}")
            return None
        except (KeyError, ValueError) as e:
            self.logger.error(f"Invalid response format for sample {sample_id}: {e}")
            return None
    
    def _create_sample_schema(self, sample_id: str, data: Dict[str, Any]) -> SampleSchema:
        """
        Create SampleSchema from API response data
        
        Args:
            sample_id: Sample identifier
            data: Raw API response data
            
        Returns:
            SampleSchema object
        """
        return SampleSchema(
            key=sample_id,
            title=data.get('title', ''),
            organism=data.get('organism', ''),
            tissue=data.get('tissue_type', ''),
            cell_type=data.get('cell_type', ''),
            library_strategy=data.get('library_strategy', ''),
            library_source=data.get('library_source', ''),
            library_selection=data.get('library_selection', ''),
            platform=data.get('platform', ''),
            instrument=data.get('instrument_model', ''),
            parent=data.get('study_id', ''),
            children=','.join(data.get('run_ids', [])),
            species=data.get('species', data.get('organism', '')),
            date=data.get('collection_date', '')
        )
    
    def fetch_run_metadata(self, run_id: str) -> Optional[RunSchema]:
        """
        Fetch run metadata from the custom database
        
        Args:
            run_id: Run identifier
            
        Returns:
            RunSchema object or None if not found
        """
        try:
            url = f"{self.api_base_url}/runs/{run_id}"
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            
            data = response.json()
            
            return self._create_run_schema(run_id, data)
            
        except requests.RequestException as e:
            self.logger.error(f"Error fetching run {run_id}: {e}")
            return None
    
    def _create_run_schema(self, run_id: str, data: Dict[str, Any]) -> RunSchema:
        """
        Create RunSchema from API response data
        
        Args:
            run_id: Run identifier
            data: Raw API response data
            
        Returns:
            RunSchema object
        """
        # Extract download URLs
        file_urls = data.get('file_urls', [])
        if isinstance(file_urls, list):
            raw_links = ','.join(file_urls)
        else:
            raw_links = str(file_urls)
        
        return RunSchema(
            key=run_id,
            title=data.get('title', ''),
            strategy=data.get('library_strategy', ''),
            raw_link=raw_links,
            parent=data.get('sample_id', ''),
            file_type=data.get('file_type', 'fastq'),
            file_size=data.get('file_size', 0)
        )
    
    def add(self, sample_id: str, force_search: bool = False) -> bool:
        """
        Add sample to local database
        
        Args:
            sample_id: Sample identifier to add
            force_search: Force refetch even if already cached
            
        Returns:
            True if successfully added
        """
        self.logger.info(f"Adding sample {sample_id} to local database")
        
        # Clear cache if force search
        if force_search and sample_id in self._metadata_cache:
            del self._metadata_cache[sample_id]
        
        # Fetch sample metadata
        sample_schema = self.fetch_sample_metadata(sample_id)
        if sample_schema is None:
            self.logger.error(f"Could not fetch metadata for sample: {sample_id}")
            return False
        
        # Save to local database
        success = self._save_to_local_database(sample_schema)
        
        if success:
            # Also fetch and save run metadata
            if sample_schema.children:
                run_ids = sample_schema.children.split(',')
                for run_id in run_ids:
                    run_id = run_id.strip()
                    if run_id:
                        run_schema = self.fetch_run_metadata(run_id)
                        if run_schema:
                            self._save_run_to_local_database(run_schema)
        
        return success
    
    def _save_to_local_database(self, schema: SampleSchema) -> bool:
        """
        Save sample schema to local Parquet database
        
        Args:
            schema: SampleSchema to save
            
        Returns:
            True if successfully saved
        """
        try:
            import polars as pl
            import os
            from celline.config import Config
            
            # Define database file path
            db_file = os.path.join(Config.EXEC_ROOT, "DB", "CUSTOM_SAMPLES.parquet")
            
            # Create directory if it doesn't exist
            os.makedirs(os.path.dirname(db_file), exist_ok=True)
            
            # Convert schema to dictionary
            sample_data = {
                'key': schema.key,
                'title': schema.title,
                'organism': schema.organism,
                'tissue': schema.tissue,
                'cell_type': schema.cell_type,
                'library_strategy': schema.library_strategy,
                'library_source': schema.library_source,
                'library_selection': schema.library_selection,
                'platform': schema.platform,
                'instrument': schema.instrument,
                'parent': schema.parent,
                'children': schema.children,
                'species': schema.species,
                'date': schema.date
            }
            
            # Load existing data or create new DataFrame
            if os.path.exists(db_file):
                existing_df = pl.read_parquet(db_file)
                
                # Check if sample already exists
                existing_keys = existing_df.get_column('key').to_list()
                if schema.key in existing_keys:
                    # Update existing record
                    updated_df = existing_df.filter(pl.col('key') != schema.key)
                    new_record = pl.DataFrame([sample_data])
                    final_df = pl.concat([updated_df, new_record])
                else:
                    # Add new record
                    new_record = pl.DataFrame([sample_data])
                    final_df = pl.concat([existing_df, new_record])
            else:
                # Create new database
                final_df = pl.DataFrame([sample_data])
            
            # Save to file
            final_df.write_parquet(db_file)
            
            self.logger.info(f"Successfully saved sample {schema.key} to local database")
            return True
            
        except Exception as e:
            self.logger.error(f"Error saving sample {schema.key} to database: {e}")
            return False
    
    def _save_run_to_local_database(self, schema: RunSchema) -> bool:
        """
        Save run schema to local Parquet database
        
        Args:
            schema: RunSchema to save
            
        Returns:
            True if successfully saved
        """
        try:
            import polars as pl
            import os
            from celline.config import Config
            
            # Define database file path
            db_file = os.path.join(Config.EXEC_ROOT, "DB", "CUSTOM_RUNS.parquet")
            
            # Create directory if it doesn't exist
            os.makedirs(os.path.dirname(db_file), exist_ok=True)
            
            # Convert schema to dictionary
            run_data = {
                'key': schema.key,
                'title': schema.title,
                'strategy': schema.strategy,
                'raw_link': schema.raw_link,
                'parent': schema.parent,
                'file_type': schema.file_type,
                'file_size': schema.file_size
            }
            
            # Load existing data or create new DataFrame
            if os.path.exists(db_file):
                existing_df = pl.read_parquet(db_file)
                
                # Check if run already exists
                existing_keys = existing_df.get_column('key').to_list()
                if schema.key in existing_keys:
                    # Update existing record
                    updated_df = existing_df.filter(pl.col('key') != schema.key)
                    new_record = pl.DataFrame([run_data])
                    final_df = pl.concat([updated_df, new_record])
                else:
                    # Add new record
                    new_record = pl.DataFrame([run_data])
                    final_df = pl.concat([existing_df, new_record])
            else:
                # Create new database
                final_df = pl.DataFrame([run_data])
            
            # Save to file
            final_df.write_parquet(db_file)
            
            self.logger.debug(f"Successfully saved run {schema.key} to local database")
            return True
            
        except Exception as e:
            self.logger.error(f"Error saving run {schema.key} to database: {e}")
            return False
    
    def get_download_urls(self, sample_id: str) -> List[str]:
        """
        Get download URLs for sample data
        
        Args:
            sample_id: Sample identifier
            
        Returns:
            List of download URLs
        """
        sample_schema = self.fetch_sample_metadata(sample_id)
        if not sample_schema or not sample_schema.children:
            return []
        
        urls = []
        run_ids = sample_schema.children.split(',')
        
        for run_id in run_ids:
            run_id = run_id.strip()
            if run_id:
                run_schema = self.fetch_run_metadata(run_id)
                if run_schema and run_schema.raw_link:
                    run_urls = run_schema.raw_link.split(',')
                    urls.extend([url.strip() for url in run_urls if url.strip()])
        
        return urls
    
    def search_samples(self, query: Dict[str, Any]) -> List[SampleSchema]:
        """
        Search samples by various criteria
        
        Args:
            query: Search parameters
            
        Returns:
            List of matching SampleSchema objects
        """
        try:
            url = f"{self.api_base_url}/samples/search"
            response = self.session.post(url, json=query, timeout=30)
            response.raise_for_status()
            
            data = response.json()
            samples = data.get('samples', [])
            
            results = []
            for sample_data in samples:
                sample_id = sample_data.get('id', '')
                if sample_id and self.can_handle(sample_id):
                    schema = self._create_sample_schema(sample_id, sample_data)
                    results.append(schema)
            
            self.logger.info(f"Found {len(results)} samples matching query")
            return results
            
        except requests.RequestException as e:
            self.logger.error(f"Error searching samples: {e}")
            return []
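
With the class in place, the handler can be exercised directly. A minimal usage sketch, assuming the placeholder API above is reachable:

from mypackage.handlers.custom_handler import CustomDatabaseHandler

handler = CustomDatabaseHandler(api_base_url="https://api.mycustomdb.org/v1")

sample_id = "CUSTOM123456"
if handler.can_handle(sample_id) and handler.add(sample_id):
    print(f"Added {sample_id}")
    print(f"Download URLs: {handler.get_download_urls(sample_id)}")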


Step 2: Advanced Handler Features

Authentication Support

# mypackage/handlers/authenticated_handler.py
import requests
from typing import Optional

from .custom_handler import CustomDatabaseHandler

class AuthenticatedCustomHandler(CustomDatabaseHandler):
    """Custom handler with authentication support"""
    
    def __init__(self, api_base_url: str, api_key: Optional[str] = None, 
                 username: Optional[str] = None, password: Optional[str] = None):
        self.api_key = api_key
        self.username = username
        self.password = password
        
        super().__init__(api_base_url)
        
        # Setup authentication
        self._setup_authentication()
    
    def _setup_authentication(self):
        """Setup authentication for API requests"""
        if self.api_key:
            # API key authentication
            self.session.headers.update({
                'Authorization': f'Bearer {self.api_key}'
            })
        elif self.username and self.password:
            # Basic authentication
            from requests.auth import HTTPBasicAuth
            self.session.auth = HTTPBasicAuth(self.username, self.password)
        
        # Validate authentication
        self._validate_authentication()
    
    def _validate_authentication(self):
        """Validate authentication credentials"""
        try:
            response = self.session.get(f"{self.api_base_url}/auth/validate", timeout=10)
            if response.status_code == 200:
                self.logger.info("Authentication validated successfully")
            elif response.status_code == 401:
                self.logger.error("Authentication failed - invalid credentials")
                raise ValueError("Invalid authentication credentials")
            else:
                self.logger.warning(f"Authentication validation returned status {response.status_code}")
        except requests.RequestException as e:
            self.logger.warning(f"Could not validate authentication: {e}")


Rate Limiting Support

# mypackage/handlers/rate_limited_handler.py
import time
from typing import Optional

import requests

from celline.DB.dev.model import SampleSchema
from .custom_handler import CustomDatabaseHandler

class RateLimitedCustomHandler(CustomDatabaseHandler):
    """Custom handler with rate limiting support"""
    
    def __init__(self, api_base_url: str, requests_per_second: float = 2.0):
        self.requests_per_second = requests_per_second
        self.min_request_interval = 1.0 / requests_per_second
        self.last_request_time = 0.0
        
        super().__init__(api_base_url)
    
    def _rate_limited_request(self, method: str, url: str, **kwargs):
        """Make rate-limited request"""
        # Wait if necessary to respect rate limit
        current_time = time.time()
        time_since_last = current_time - self.last_request_time
        
        if time_since_last < self.min_request_interval:
            sleep_time = self.min_request_interval - time_since_last
            time.sleep(sleep_time)
        
        # Make request
        response = self.session.request(method, url, **kwargs)
        self.last_request_time = time.time()
        
        return response
    
    def fetch_sample_metadata(self, sample_id: str) -> Optional[SampleSchema]:
        """Rate-limited sample metadata fetching"""
        if not self.can_handle(sample_id):
            return None
        
        # Check cache first
        if sample_id in self._metadata_cache:
            return self._create_sample_schema(sample_id, self._metadata_cache[sample_id])
        
        try:
            url = f"{self.api_base_url}/samples/{sample_id}"
            response = self._rate_limited_request('GET', url, timeout=30)
            response.raise_for_status()
            
            data = response.json()
            self._metadata_cache[sample_id] = data
            
            return self._create_sample_schema(sample_id, data)
            
        except requests.RequestException as e:
            self.logger.error(f"Error fetching sample {sample_id}: {e}")
            return None
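
The throttle itself is plain interval bookkeeping. Extracted on its own it behaves as follows (a standalone illustration, not part of the handler):

import time

class Throttle:
    """Minimal version of the rate limiter used above."""

    def __init__(self, requests_per_second: float):
        self.min_interval = 1.0 / requests_per_second
        self.last = 0.0

    def wait(self):
        elapsed = time.time() - self.last
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last = time.time()

throttle = Throttle(2.0)
start = time.time()
for _ in range(3):
    throttle.wait()          # first call passes immediately, then 0.5s gaps
print(f"3 calls took {time.time() - start:.2f}s")  # ~1.0s at 2 req/s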


Caching and Persistence

# mypackage/handlers/cached_handler.py
import hashlib
import pickle
import time
from pathlib import Path
from typing import Any, Dict, Optional

import requests

from celline.DB.dev.model import SampleSchema
from .custom_handler import CustomDatabaseHandler

class CachedCustomHandler(CustomDatabaseHandler):
    """Custom handler with persistent caching"""
    
    def __init__(self, api_base_url: str, cache_dir: Optional[str] = None, 
                 cache_ttl: int = 3600):
        self.cache_ttl = cache_ttl  # Cache time-to-live in seconds
        
        if cache_dir:
            self.cache_dir = Path(cache_dir)
        else:
            from celline.config import Config
            self.cache_dir = Path(Config.PROJ_ROOT) / ".cache" / "custom_handler"
        
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        
        super().__init__(api_base_url)
    
    def _get_cache_file(self, key: str) -> Path:
        """Get cache file path for given key"""
        # Create hash of key for filename
        key_hash = hashlib.md5(key.encode()).hexdigest()
        return self.cache_dir / f"{key_hash}.cache"
    
    def _is_cache_valid(self, cache_file: Path) -> bool:
        """Check if cache file is still valid"""
        if not cache_file.exists():
            return False
        
        # Check age
        file_age = time.time() - cache_file.stat().st_mtime
        return file_age < self.cache_ttl
    
    def _load_from_cache(self, key: str) -> Optional[Dict[str, Any]]:
        """Load data from cache"""
        cache_file = self._get_cache_file(key)
        
        if not self._is_cache_valid(cache_file):
            return None
        
        try:
            with open(cache_file, 'rb') as f:
                return pickle.load(f)
        except Exception as e:
            self.logger.warning(f"Error loading cache for {key}: {e}")
            return None
    
    def _save_to_cache(self, key: str, data: Dict[str, Any]):
        """Save data to cache"""
        cache_file = self._get_cache_file(key)
        
        try:
            with open(cache_file, 'wb') as f:
                pickle.dump(data, f)
        except Exception as e:
            self.logger.warning(f"Error saving cache for {key}: {e}")
    
    def fetch_sample_metadata(self, sample_id: str) -> Optional[SampleSchema]:
        """Fetch with persistent caching"""
        if not self.can_handle(sample_id):
            return None
        
        # Try cache first
        cached_data = self._load_from_cache(f"sample_{sample_id}")
        if cached_data:
            self.logger.debug(f"Using cached metadata for {sample_id}")
            return self._create_sample_schema(sample_id, cached_data)
        
        # Fetch from API
        try:
            url = f"{self.api_base_url}/samples/{sample_id}"
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            
            data = response.json()
            
            # Save to cache
            self._save_to_cache(f"sample_{sample_id}", data)
            
            return self._create_sample_schema(sample_id, data)
            
        except requests.RequestException as e:
            self.logger.error(f"Error fetching sample {sample_id}: {e}")
            return None
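
A usage sketch with a one-hour TTL follows. Because pickle.load executes arbitrary code from cache files, keep the cache directory private to the project:

cached_handler = CachedCustomHandler(
    api_base_url="https://api.mycustomdb.org/v1",
    cache_ttl=3600,  # seconds; entries older than this are refetched
)

# The first call hits the API and writes a cache file;
# the second is served from disk until the TTL expires.
meta = cached_handler.fetch_sample_metadata("CUSTOM123456")
meta_again = cached_handler.fetch_sample_metadata("CUSTOM123456")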


Handler Registration

Step 3: Register Your Handler

# mypackage/handlers/registry.py
from celline.DB.dev.handler import HandleResolver
from .custom_handler import CustomDatabaseHandler

def register_custom_handlers():
    """Register all custom database handlers"""
    
    # Create handler instance
    custom_handler = CustomDatabaseHandler(
        api_base_url="https://api.mycustomdb.org/v1"
    )
    
    # Register with Celline
    HandleResolver.register_handler(custom_handler)
    
    print("Custom database handlers registered successfully")

# Auto-register when package is imported
register_custom_handlers()


Configuration-Based Registration

# mypackage/config.py
import os
from typing import Dict, Any

class HandlerConfig:
    """Configuration for custom handlers"""
    
    @classmethod
    def get_custom_db_config(cls) -> Dict[str, Any]:
        """Get configuration for custom database handler"""
        return {
            'api_base_url': os.getenv('CUSTOM_DB_API_URL', 'https://api.mycustomdb.org/v1'),
            'api_key': os.getenv('CUSTOM_DB_API_KEY'),
            'requests_per_second': float(os.getenv('CUSTOM_DB_RATE_LIMIT', '2.0')),
            'cache_ttl': int(os.getenv('CUSTOM_DB_CACHE_TTL', '3600'))
        }

# mypackage/handlers/factory.py
from .custom_handler import CustomDatabaseHandler
from .authenticated_handler import AuthenticatedCustomHandler
from .rate_limited_handler import RateLimitedCustomHandler
from .cached_handler import CachedCustomHandler
from ..config import HandlerConfig

def create_custom_handler() -> CustomDatabaseHandler:
    """Factory function to create configured handler"""
    config = HandlerConfig.get_custom_db_config()
    
    # Choose handler based on configuration
    if config.get('api_key'):
        return AuthenticatedCustomHandler(
            api_base_url=config['api_base_url'],
            api_key=config['api_key']
        )
    elif config.get('requests_per_second') < 10:
        return RateLimitedCustomHandler(
            api_base_url=config['api_base_url'],
            requests_per_second=config['requests_per_second']
        )
    else:
        return CachedCustomHandler(
            api_base_url=config['api_base_url'],
            cache_ttl=config['cache_ttl']
        )


Testing Database Handlers

Unit Tests

# tests/test_custom_handler.py
import pytest
import responses
from unittest.mock import patch

from mypackage.handlers.custom_handler import CustomDatabaseHandler

class TestCustomDatabaseHandler:
    
    @pytest.fixture
    def handler(self):
        """Create handler instance for testing"""
        with patch.object(CustomDatabaseHandler, '_validate_api_connection'):
            return CustomDatabaseHandler("https://api.test.com/v1")
    
    def test_can_handle_valid_ids(self, handler):
        """Test sample ID validation"""
        assert handler.can_handle("CUSTOM123456")
        assert handler.can_handle("CUSTOM000001")
        assert not handler.can_handle("GSM123456")
        assert not handler.can_handle("CUSTOM12345")  # Too short
        assert not handler.can_handle("CUSTOM1234567")  # Too long
        assert not handler.can_handle("CUSTOMA23456")  # Contains letter
    
    @responses.activate
    def test_fetch_sample_metadata_success(self, handler):
        """Test successful metadata fetching"""
        sample_id = "CUSTOM123456"
        
        # Mock API response
        responses.add(
            responses.GET,
            f"https://api.test.com/v1/samples/{sample_id}",
            json={
                'id': sample_id,
                'title': 'Test Sample',
                'organism': 'Homo sapiens',
                'tissue_type': 'brain',
                'library_strategy': 'RNA-Seq',
                'platform': 'Illumina',
                'study_id': 'STUDY001',
                'run_ids': ['RUN001', 'RUN002']
            },
            status=200
        )
        
        # Test fetch
        schema = handler.fetch_sample_metadata(sample_id)
        
        assert schema is not None
        assert schema.key == sample_id
        assert schema.title == 'Test Sample'
        assert schema.organism == 'Homo sapiens'
        assert schema.tissue == 'brain'
        assert schema.children == 'RUN001,RUN002'
    
    @responses.activate
    def test_fetch_sample_metadata_not_found(self, handler):
        """Test handling of 404 responses"""
        sample_id = "CUSTOM999999"
        
        responses.add(
            responses.GET,
            f"https://api.test.com/v1/samples/{sample_id}",
            status=404
        )
        
        schema = handler.fetch_sample_metadata(sample_id)
        assert schema is None
    
    # polars and os are imported inside _save_to_local_database, so patch them
    # at their source modules rather than as attributes of the handler module.
    @patch('os.makedirs')
    @patch('os.path.exists', return_value=False)
    @patch('polars.DataFrame.write_parquet')
    def test_save_to_local_database(self, mock_write, mock_exists, mock_makedirs, handler):
        """Test saving a new sample when no database file exists yet"""
        from celline.DB.dev.model import SampleSchema
        
        # Create test schema
        schema = SampleSchema(
            key="CUSTOM123456",
            title="Test Sample",
            organism="Homo sapiens",
            tissue="brain",
            cell_type=None,
            library_strategy="RNA-Seq",
            library_source=None,
            library_selection=None,
            platform="Illumina",
            instrument=None,
            parent="STUDY001",
            children="RUN001,RUN002",
            species="Homo sapiens",
            date="2024-01-01"
        )
        
        # Test save
        result = handler._save_to_local_database(schema)
        
        assert result is True
        mock_write.assert_called_once()
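
pytest's parametrize keeps the ID-format cases in one table. An equivalent, more compact version of the validation test (placed inside TestCustomDatabaseHandler):

    @pytest.mark.parametrize("sample_id,expected", [
        ("CUSTOM123456", True),
        ("CUSTOM000001", True),
        ("GSM123456", False),       # wrong prefix
        ("CUSTOM12345", False),     # too short
        ("CUSTOM1234567", False),   # too long
        ("CUSTOMA23456", False),    # contains a letter
    ])
    def test_can_handle_parametrized(self, handler, sample_id, expected):
        assert handler.can_handle(sample_id) is expected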


Integration Tests

# tests/integration/test_custom_handler_integration.py
import pytest
from unittest.mock import patch
import os
import shutil
import tempfile

from mypackage.handlers.custom_handler import CustomDatabaseHandler

class TestCustomHandlerIntegration:
    
    @pytest.fixture
    def temp_db_dir(self):
        """Create temporary database directory"""
        temp_dir = tempfile.mkdtemp()
        yield temp_dir
        shutil.rmtree(temp_dir, ignore_errors=True)
    
    @pytest.mark.integration
    def test_full_sample_addition_workflow(self, temp_db_dir):
        """Test complete sample addition workflow"""
        # Setup handler with temporary database
        with patch('celline.config.Config') as mock_config, \
             patch.object(CustomDatabaseHandler, '_validate_api_connection'):
            mock_config.EXEC_ROOT = temp_db_dir
            
            # Construct without the real status-endpoint check
            handler = CustomDatabaseHandler("https://api.test.com/v1")
            
            # Mock API responses
            with patch.object(handler, 'fetch_sample_metadata') as mock_fetch_sample, \
                 patch.object(handler, 'fetch_run_metadata') as mock_fetch_run:
                
                # Setup mock responses
                from celline.DB.dev.model import SampleSchema, RunSchema
                
                mock_fetch_sample.return_value = SampleSchema(
                    key="CUSTOM123456",
                    title="Integration Test Sample",
                    organism="Homo sapiens",
                    tissue="brain",
                    cell_type=None,
                    library_strategy="RNA-Seq",
                    library_source="TRANSCRIPTOMIC",
                    library_selection="cDNA",
                    platform="Illumina",
                    instrument="HiSeq 2500",
                    parent="STUDY001",
                    children="RUN001",
                    species="Homo sapiens",
                    date="2024-01-01"
                )
                
                mock_fetch_run.return_value = RunSchema(
                    key="RUN001",
                    title="Test Run",
                    strategy="RNA-Seq",
                    raw_link="https://data.test.com/RUN001.fastq.gz",
                    parent="CUSTOM123456",
                    file_type="fastq",
                    file_size=1000000
                )
                
                # Test addition
                result = handler.add("CUSTOM123456")
                
                assert result is True
                
                # Verify database files were created
                sample_db = os.path.join(temp_db_dir, "DB", "CUSTOM_SAMPLES.parquet")
                run_db = os.path.join(temp_db_dir, "DB", "CUSTOM_RUNS.parquet")
                
                assert os.path.exists(sample_db)
                assert os.path.exists(run_db)
                
                # Verify data content
                import polars as pl
                
                sample_df = pl.read_parquet(sample_db)
                assert len(sample_df) == 1
                assert sample_df.get_column('key')[0] == "CUSTOM123456"
                
                run_df = pl.read_parquet(run_db)
                assert len(run_df) == 1
                assert run_df.get_column('key')[0] == "RUN001"
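
The @pytest.mark.integration marker should be registered so that pytest -m integration (or -m "not integration") selects tests cleanly and pytest does not warn about unknown markers. A minimal conftest.py using the standard hook:

# tests/conftest.py
def pytest_configure(config):
    # Register the custom marker used by the integration tests
    config.addinivalue_line(
        "markers", "integration: tests that exercise external services"
    )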


Best Practices

Error Handling

import time
from typing import Optional

import requests

from celline.DB.dev.model import SampleSchema
from .custom_handler import CustomDatabaseHandler

class RobustCustomHandler(CustomDatabaseHandler):
    """Handler with comprehensive error handling"""
    
    def fetch_sample_metadata(self, sample_id: str) -> Optional[SampleSchema]:
        """Robust metadata fetching with comprehensive error handling"""
        if not self.can_handle(sample_id):
            self.logger.error(f"Handler cannot process sample ID: {sample_id}")
            return None
        
        try:
            return self._fetch_with_retry(sample_id)
        except Exception as e:
            self.logger.exception(f"Unexpected error fetching sample {sample_id}: {e}")
            return None
    
    def _fetch_with_retry(self, sample_id: str, max_retries: int = 3) -> Optional[SampleSchema]:
        """Fetch with exponential backoff retry"""
        for attempt in range(max_retries):
            try:
                url = f"{self.api_base_url}/samples/{sample_id}"
                response = self.session.get(url, timeout=30)
                response.raise_for_status()
                
                data = response.json()
                return self._create_sample_schema(sample_id, data)
                
            except requests.HTTPError as e:
                if e.response.status_code == 404:
                    # Don't retry for 404s
                    self.logger.warning(f"Sample not found: {sample_id}")
                    return None
                elif e.response.status_code >= 500:
                    # Server error - retry
                    if attempt < max_retries - 1:
                        wait_time = 2 ** attempt  # Exponential backoff
                        self.logger.warning(f"Server error, retrying in {wait_time}s...")
                        time.sleep(wait_time)
                        continue
                    else:
                        self.logger.error(f"Server error persists for {sample_id}: {e}")
                        return None
                else:
                    # Client error - don't retry
                    self.logger.error(f"Client error for {sample_id}: {e}")
                    return None
            except requests.RequestException as e:
                # Network error - retry
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt
                    self.logger.warning(f"Network error, retrying in {wait_time}s...")
                    time.sleep(wait_time)
                    continue
                else:
                    self.logger.error(f"Network error persists for {sample_id}: {e}")
                    return None
        
        return None
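
The manual retry loop above makes the backoff policy explicit; the HTTPAdapter-based Retry configuration in the next section achieves similar behavior declaratively at the transport layer.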


Performance Optimization

from typing import Dict, List, Optional

from celline.DB.dev.model import SampleSchema
from .custom_handler import CustomDatabaseHandler

class OptimizedCustomHandler(CustomDatabaseHandler):
    """Handler optimized for performance"""
    
    def __init__(self, api_base_url: str, max_connections: int = 10):
        super().__init__(api_base_url)
        
        # Configure session for performance
        from requests.adapters import HTTPAdapter
        from urllib3.util.retry import Retry
        
        retry_strategy = Retry(
            total=3,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"],  # was 'method_whitelist' in urllib3 < 1.26
            backoff_factor=1
        )
        
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=max_connections,
            pool_maxsize=max_connections
        )
        
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
    
    def batch_fetch_samples(self, sample_ids: List[str]) -> Dict[str, Optional[SampleSchema]]:
        """Batch fetch multiple samples for better performance"""
        # Prefer a server-side batch endpoint when the subclass provides one
        # (a _batch_fetch_api implementation); otherwise fan out client-side.
        if getattr(self, '_supports_batch_fetch', False):
            return self._batch_fetch_api(sample_ids)
        else:
            return self._batch_fetch_concurrent(sample_ids)
    
    def _batch_fetch_concurrent(self, sample_ids: List[str]) -> Dict[str, Optional[SampleSchema]]:
        """Concurrent fetching of multiple samples"""
        from concurrent.futures import ThreadPoolExecutor, as_completed
        
        results = {}
        
        with ThreadPoolExecutor(max_workers=5) as executor:
            # Submit all requests
            futures = {
                executor.submit(self.fetch_sample_metadata, sample_id): sample_id
                for sample_id in sample_ids
            }
            
            # Collect results
            for future in as_completed(futures):
                sample_id = futures[future]
                try:
                    result = future.result()
                    results[sample_id] = result
                except Exception as e:
                    self.logger.error(f"Error fetching {sample_id}: {e}")
                    results[sample_id] = None
        
        return results
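
Batch fetching pays off when adding whole studies at once. A usage sketch:

handler = OptimizedCustomHandler("https://api.mycustomdb.org/v1")

sample_ids = [f"CUSTOM{n:06d}" for n in range(1, 6)]
results = handler.batch_fetch_samples(sample_ids)

found = {sid: schema for sid, schema in results.items() if schema is not None}
print(f"Fetched {len(found)}/{len(sample_ids)} samples")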


Together, these patterns cover the full lifecycle of a Celline database handler: implementing the interface, layering on authentication, rate limiting, and caching, registering the handler, and testing it at both the unit and integration level.