/Custom Function Development

Custom Function Development

Learn how to create custom analysis functions that integrate seamlessly with the Celline framework.

Function Architecture

Base Class Structure

All Celline functions inherit from CellineFunction, which provides the core interface:

      from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional
import argparse

if TYPE_CHECKING:
    from celline import Project

class CellineFunction(ABC):
    """Abstract interface shared by every Celline function.

    Only :meth:`call` is mandatory. The remaining methods are optional
    hooks whose defaults do nothing, return ``None``/an empty list, or
    delegate to :meth:`call`.
    """

    @abstractmethod
    def call(self, project: "Project") -> "Project":
        """Run the function on ``project``; subclasses must implement this."""

    def register(self) -> Optional[str]:
        """Return the function name for CLI registration (``None`` by default)."""
        return None

    def add_cli_args(self, parser: argparse.ArgumentParser) -> None:
        """Add function-specific CLI arguments to ``parser``.

        Optional override; the default adds nothing.
        """

    def cli(self, project: "Project", args: Optional[argparse.Namespace] = None) -> "Project":
        """CLI entry point.

        Optional override; the default ignores ``args`` and simply
        forwards ``project`` to :meth:`call`.
        """
        updated_project = self.call(project)
        return updated_project

    def get_description(self) -> str:
        """Return the description shown in help output."""
        description = "Custom Celline function"
        return description

    def get_usage_examples(self) -> list[str]:
        """Return usage examples for help output (none by default)."""
        return list()

    

Function Lifecycle

Understanding the function execution lifecycle:

      graph TD
    A[Function Creation] --> B[Parameter Validation]
    B --> C[CLI Argument Parsing]
    C --> D[Project Context Setup]
    D --> E["call() Method Execution"]
    E --> F[Resource Cleanup]
    F --> G[Return Updated Project]

    

Creating Your First Function

Step 1: Basic Function Template

      # mypackage/functions/example_function.py
from celline.functions._base import CellineFunction
from celline.log.logger import get_logger
from celline.config import Config
import argparse
import os
from typing import TYPE_CHECKING, Optional, List, Dict, Any

if TYPE_CHECKING:
    from celline import Project

class ExampleFunction(CellineFunction):
    """
    Example custom analysis function

    This function demonstrates the basic structure and features
    of a Celline custom function. For every sample listed in
    ``samples.toml`` it runs one of three analyses ("expression",
    "quality" or "diversity") and writes the combined results to
    ``<Config.PROJ_ROOT>/results/example_analysis``.
    """

    def __init__(self, analysis_type: str, threshold: float = 0.5, output_format: str = "json"):
        """
        Initialize the function with parameters

        Args:
            analysis_type: Type of analysis to perform
            threshold: Analysis threshold value
            output_format: Output format (json, csv, tsv)

        Raises:
            ValueError: If any parameter fails validation.
        """
        self.analysis_type = analysis_type
        self.threshold = threshold
        self.output_format = output_format
        self.logger = get_logger(__name__)

        # Validate parameters
        self._validate_parameters()

    def _validate_parameters(self):
        """Validate input parameters

        Raises:
            ValueError: If ``analysis_type`` or ``output_format`` is not one
                of the supported values, or ``threshold`` is outside [0, 1].
        """
        valid_analysis_types = ["expression", "quality", "diversity"]
        if self.analysis_type not in valid_analysis_types:
            raise ValueError(f"analysis_type must be one of {valid_analysis_types}")

        if not 0 <= self.threshold <= 1:
            raise ValueError("threshold must be between 0 and 1")

        valid_formats = ["json", "csv", "tsv"]
        if self.output_format not in valid_formats:
            raise ValueError(f"output_format must be one of {valid_formats}")

    def call(self, project: "Project") -> "Project":
        """
        Main execution method

        Processes every sample in turn and saves the combined results.
        Any exception is logged and re-raised, so a single failing sample
        aborts the whole run before anything is saved.

        Args:
            project: Celline project instance

        Returns:
            Updated project instance
        """
        self.logger.info(f"Starting {self.analysis_type} analysis with threshold {self.threshold}")

        try:
            # Get samples from project
            samples = self._get_project_samples(project)
            self.logger.info(f"Found {len(samples)} samples to process")

            # Process each sample
            results = {}
            for sample_id in samples:
                self.logger.info(f"Processing sample: {sample_id}")
                sample_result = self._process_sample(project, sample_id)
                results[sample_id] = sample_result

            # Save combined results
            self._save_results(project, results)

            self.logger.info("Analysis completed successfully")

        except Exception as e:
            self.logger.error(f"Analysis failed: {e}")
            raise

        return project

    def _get_project_samples(self, project: "Project") -> List[str]:
        """Get list of samples from project

        Reads the top-level keys of ``<Config.PROJ_ROOT>/samples.toml``.
        The ``project`` argument is not consulted. Returns an empty list
        (after logging a warning) when the file does not exist.
        """
        import toml

        samples_file = f"{Config.PROJ_ROOT}/samples.toml"
        if not os.path.exists(samples_file):
            self.logger.warning("No samples.toml found")
            return []

        with open(samples_file, 'r') as f:
            samples_data = toml.load(f)

        return list(samples_data.keys())

    def _process_sample(self, project: "Project", sample_id: str) -> Dict[str, Any]:
        """Process individual sample

        Resolves the sample's path, loads whatever data is available for
        it and runs the analysis selected by ``self.analysis_type``.

        Raises:
            ValueError: If ``self.analysis_type`` is not a known analysis,
                or the selected analysis lacks its required input data.
        """
        # Get sample path
        sample_path = self._get_sample_path(project, sample_id)

        # Load sample data
        sample_data = self._load_sample_data(sample_path)

        # Perform analysis based on type
        if self.analysis_type == "expression":
            return self._analyze_expression(sample_data)
        if self.analysis_type == "quality":
            return self._analyze_quality(sample_data)
        if self.analysis_type == "diversity":
            return self._analyze_diversity(sample_data)

        raise ValueError(f"Unknown analysis type: {self.analysis_type}")

    def _get_sample_path(self, project: "Project", sample_id: str) -> "Path":
        """Get path object for sample

        Raises:
            ValueError: If the sample cannot be resolved or the resolved
                sample has no parent project.
        """
        from celline.utils.path import Path
        from celline.DB.dev.handler import HandleResolver

        # Resolve sample to get project information
        resolver = HandleResolver.resolve(sample_id)
        if resolver is None:
            raise ValueError(f"Cannot resolve sample: {sample_id}")

        sample_schema = resolver.sample.search(sample_id)
        if sample_schema.parent is None:
            raise ValueError(f"Sample {sample_id} has no parent project")

        return Path(sample_schema.parent, sample_id)

    def _load_sample_data(self, sample_path: "Path") -> Dict[str, Any]:
        """Load sample data from various sources

        Loading is best-effort: each source is attempted only when the
        corresponding ``sample_path`` flag is set, and a failure to read
        it is logged as a warning instead of being raised.

        Returns:
            Dict that may contain the keys ``count_matrix``, ``cell_types``
            and ``qc_metrics``, depending on what could be loaded.
        """
        data = {}

        # Load count matrix if available
        if sample_path.is_counted:
            try:
                import scanpy as sc
                count_matrix_path = f"{sample_path.resources_sample_counted}/outs/filtered_feature_bc_matrix.h5"
                adata = sc.read_10x_h5(count_matrix_path)
                data['count_matrix'] = adata
                self.logger.debug(f"Loaded count matrix: {adata.n_obs} cells, {adata.n_vars} genes")
            except Exception as e:
                self.logger.warning(f"Could not load count matrix: {e}")

        # Load cell type predictions if available
        if sample_path.is_predicted_celltype:
            try:
                import pandas as pd
                celltype_path = sample_path.data_sample_predicted_celltype
                celltype_data = pd.read_csv(celltype_path, sep='\t')
                data['cell_types'] = celltype_data
                self.logger.debug(f"Loaded cell type predictions: {len(celltype_data)} cells")
            except Exception as e:
                self.logger.warning(f"Could not load cell type predictions: {e}")

        # Load QC data if available
        if sample_path.is_preprocessed:
            try:
                import pandas as pd
                qc_path = f"{sample_path.data_sample}/cell_info.tsv"
                qc_data = pd.read_csv(qc_path, sep='\t')
                data['qc_metrics'] = qc_data
                self.logger.debug(f"Loaded QC metrics: {len(qc_data)} cells")
            except Exception as e:
                self.logger.warning(f"Could not load QC metrics: {e}")

        return data

    def _analyze_expression(self, sample_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze gene expression patterns

        Raises:
            ValueError: If ``sample_data`` has no ``count_matrix`` entry.
        """
        if 'count_matrix' not in sample_data:
            raise ValueError("Count matrix required for expression analysis")

        import numpy as np
        adata = sample_data['count_matrix']

        # Densify once and reuse the array; converting the matrix again for
        # every statistic repeats the same expensive allocation.
        dense_counts = adata.X.toarray()

        # Calculate basic expression statistics
        mean_expression = np.mean(dense_counts, axis=0)
        std_expression = np.std(dense_counts, axis=0)

        # Find highly variable genes
        cv = std_expression / (mean_expression + 1e-12)  # Coefficient of variation
        high_var_threshold = np.percentile(cv, (1 - self.threshold) * 100)
        high_var_genes = adata.var_names[cv > high_var_threshold]

        # Calculate expression metrics
        total_counts_per_cell = np.sum(dense_counts, axis=1)
        genes_per_cell = np.sum(dense_counts > 0, axis=1)

        result = {
            'analysis_type': 'expression',
            'total_cells': adata.n_obs,
            'total_genes': adata.n_vars,
            'highly_variable_genes': len(high_var_genes),
            'highly_variable_gene_list': high_var_genes.tolist(),
            'mean_counts_per_cell': float(np.mean(total_counts_per_cell)),
            'mean_genes_per_cell': float(np.mean(genes_per_cell)),
            'threshold_used': self.threshold
        }

        return result

    def _analyze_quality(self, sample_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze data quality metrics

        An empty QC table yields a ``quality_rate`` of 0.0 instead of a
        division by zero.

        Raises:
            ValueError: If ``sample_data`` has no ``qc_metrics`` entry.
        """
        if 'qc_metrics' not in sample_data:
            raise ValueError("QC metrics required for quality analysis")

        qc_data = sample_data['qc_metrics']

        # Quality thresholds
        min_genes = 200
        max_genes = 5000
        max_mt_percent = 20

        # Calculate quality metrics
        high_quality_cells = qc_data[
            (qc_data['n_genes_by_counts'] >= min_genes) &
            (qc_data['n_genes_by_counts'] <= max_genes) &
            (qc_data['pct_counts_mt'] <= max_mt_percent)
        ]

        total_cells = len(qc_data)
        n_high_quality = len(high_quality_cells)
        # Guard the ratio: a sample with no cells has no meaningful rate.
        quality_rate = n_high_quality / total_cells if total_cells else 0.0

        result = {
            'analysis_type': 'quality',
            'total_cells': total_cells,
            'high_quality_cells': n_high_quality,
            'quality_rate': quality_rate,
            'mean_genes_per_cell': float(qc_data['n_genes_by_counts'].mean()),
            'mean_mt_percent': float(qc_data['pct_counts_mt'].mean()),
            'passes_threshold': bool(quality_rate >= self.threshold)
        }

        return result

    def _analyze_diversity(self, sample_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze cell type diversity

        Computes Shannon and Simpson indices from the value counts of the
        ``cell_type`` column.

        Raises:
            ValueError: If ``sample_data`` has no ``cell_types`` entry.
        """
        if 'cell_types' not in sample_data:
            raise ValueError("Cell type predictions required for diversity analysis")

        # Imported locally like the other optional dependencies; without it
        # the ``np`` references below fail with NameError.
        import numpy as np

        celltype_data = sample_data['cell_types']

        # Calculate cell type diversity
        celltype_counts = celltype_data['cell_type'].value_counts()
        total_cells = len(celltype_data)

        # Shannon diversity index
        proportions = celltype_counts / total_cells
        shannon_diversity = -np.sum(proportions * np.log(proportions + 1e-12))

        # Simpson diversity index
        simpson_diversity = 1 - np.sum(proportions ** 2)

        result = {
            'analysis_type': 'diversity',
            'total_cells': total_cells,
            'unique_cell_types': len(celltype_counts),
            'cell_type_counts': celltype_counts.to_dict(),
            'shannon_diversity': float(shannon_diversity),
            'simpson_diversity': float(simpson_diversity),
            'meets_diversity_threshold': bool(float(shannon_diversity) >= self.threshold)
        }

        return result

    def _save_results(self, project: "Project", results: Dict[str, Dict[str, Any]]):
        """Save analysis results

        JSON output contains both run metadata and per-sample results;
        CSV/TSV output contains one row per sample and no metadata.

        Raises:
            ValueError: If ``self.output_format`` is not json, csv or tsv.
        """
        import json
        import pandas as pd
        from datetime import datetime

        # Create results directory
        results_dir = f"{Config.PROJ_ROOT}/results/example_analysis"
        os.makedirs(results_dir, exist_ok=True)

        # Add metadata
        metadata = {
            'analysis_function': 'ExampleFunction',
            'analysis_type': self.analysis_type,
            'threshold': self.threshold,
            'output_format': self.output_format,
            'timestamp': datetime.now().isoformat(),
            'total_samples': len(results)
        }

        combined_results = {
            'metadata': metadata,
            'sample_results': results
        }

        # Save in requested format
        if self.output_format == "json":
            output_file = f"{results_dir}/results.json"
            with open(output_file, 'w') as f:
                json.dump(combined_results, f, indent=2)

        elif self.output_format in ["csv", "tsv"]:
            separator = "," if self.output_format == "csv" else "\t"
            output_file = f"{results_dir}/results.{self.output_format}"

            # Flatten results for tabular format
            flat_results = []
            for sample_id, sample_result in results.items():
                flat_result = {'sample_id': sample_id}
                flat_result.update(sample_result)
                flat_results.append(flat_result)

            df = pd.DataFrame(flat_results)
            df.to_csv(output_file, sep=separator, index=False)

        else:
            # Reachable only if the attribute was changed after validation;
            # fail clearly instead of hitting an unbound ``output_file``.
            raise ValueError(f"Unsupported output format: {self.output_format}")

        self.logger.info(f"Results saved to: {output_file}")

    # CLI Integration Methods
    def add_cli_args(self, parser: argparse.ArgumentParser) -> None:
        """Add CLI arguments

        Registers the positional ``analysis_type`` plus the ``--threshold``,
        ``--output-format`` and ``--verbose`` options on ``parser``.
        """
        parser.add_argument(
            'analysis_type',
            choices=['expression', 'quality', 'diversity'],
            help='Type of analysis to perform'
        )
        parser.add_argument(
            '--threshold', '-t',
            type=float,
            default=0.5,
            help='Analysis threshold (default: 0.5)'
        )
        parser.add_argument(
            '--output-format', '-f',
            choices=['json', 'csv', 'tsv'],
            default='json',
            help='Output format (default: json)'
        )
        parser.add_argument(
            '--verbose', '-v',
            action='store_true',
            help='Enable verbose logging'
        )

    def cli(self, project: "Project", args: Optional[argparse.Namespace] = None) -> "Project":
        """CLI entry point

        Overwrites the instance parameters with the parsed CLI values,
        re-validates them and then runs :meth:`call`.

        Raises:
            ValueError: If ``args`` is ``None`` or the values are invalid.
        """
        if args is None:
            raise ValueError("CLI arguments required")

        # Update instance parameters from CLI args
        self.analysis_type = args.analysis_type
        self.threshold = args.threshold
        self.output_format = args.output_format

        # Set verbose logging if requested
        if args.verbose:
            self.logger.setLevel("DEBUG")

        # Validate updated parameters
        self._validate_parameters()

        return self.call(project)

    def get_description(self) -> str:
        """Function description"""
        return """Example custom analysis function for Celline.
        
        This function demonstrates how to create custom analysis functions
        that integrate with the Celline framework. It supports multiple
        analysis types and output formats."""

    def get_usage_examples(self) -> List[str]:
        """Usage examples"""
        return [
            "celline run example expression",
            "celline run example quality --threshold 0.8",
            "celline run example diversity --output-format csv",
            "celline run example expression --threshold 0.3 --verbose"
        ]

    

Step 2: Function Registration

Create a registration system for your custom functions:

      # mypackage/registry.py
from celline.cli.registry import get_registry
from .functions.example_function import ExampleFunction

def register_custom_functions():
    """Register every custom function of this package with the Celline CLI registry."""
    cli_registry = get_registry()

    # One (name, class, module path) entry per custom function.
    custom_functions = [
        ("example", ExampleFunction, "mypackage.functions.example_function"),
    ]
    for function_name, function_class, function_module in custom_functions:
        cli_registry.register_function(
            name=function_name,
            class_ref=function_class,
            module_path=function_module,
        )

    print("Custom functions registered successfully")


# Importing this module triggers registration as a side effect.
register_custom_functions()

    

Step 3: Package Structure

Organize your custom functions in a proper package structure:

      mypackage/
├── __init__.py
├── registry.py
├── functions/
│   ├── __init__.py
│   ├── example_function.py
│   ├── advanced_analysis.py
│   └── visualization_function.py
├── utils/
│   ├── __init__.py
│   ├── data_processing.py
│   └── visualization.py
├── tests/
│   ├── __init__.py
│   ├── test_example_function.py
│   └── fixtures/
└── docs/
    ├── example_function.md
    └── api_reference.md

    

Advanced Function Development

Handling Complex Data Workflows

      class AdvancedAnalysisFunction(CellineFunction):
    """Advanced function with complex data workflows"""
    
    def __init__(self, workflow_config: Dict[str, Any]):
        self.workflow_config = workflow_config
        self.intermediate_results = {}
        self.logger = get_logger(__name__)
    
    def call(self, project: "Project") -> "Project":
        """Execute complex workflow with multiple steps"""
        
        # Step 1: Data preparation
        self.logger.info("Step 1: Preparing data")
        prepared_data = self._prepare_data(project)
        self.intermediate_results['prepared_data'] = prepared_data
        
        # Step 2: Initial analysis
        self.logger.info("Step 2: Initial analysis")
        initial_results = self._initial_analysis(prepared_data)
        self.intermediate_results['initial_results'] = initial_results
        
        # Step 3: Advanced processing
        self.logger.info("Step 3: Advanced processing")
        advanced_results = self._advanced_processing(initial_results)
        self.intermediate_results['advanced_results'] = advanced_results
        
        # Step 4: Generate final results
        self.logger.info("Step 4: Generating final results")
        final_results = self._generate_final_results(advanced_results)
        
        # Step 5: Save and cleanup
        self._save_results(project, final_results)
        self._cleanup_intermediate_files()
        
        return project
    
    def _prepare_data(self, project: "Project") -> Dict[str, Any]:
        """Prepare data for analysis"""
        # Complex data preparation logic
        pass
    
    def _initial_analysis(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Perform initial analysis"""
        # Initial analysis logic
        pass
    
    def _advanced_processing(self, initial_results: Dict[str, Any]) -> Dict[str, Any]:
        """Advanced processing step"""
        # Advanced processing logic
        pass
    
    def _generate_final_results(self, processed_data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate final results"""
        # Final result generation
        pass
    
    def _cleanup_intermediate_files(self):
        """Cleanup intermediate files to save space"""
        # Cleanup logic
        pass

    

Integration with External Tools

      class ExternalToolFunction(CellineFunction):
    """Function that integrates with external tools"""
    
    def __init__(self, tool_path: str, tool_params: Dict[str, Any]):
        self.tool_path = tool_path
        self.tool_params = tool_params
        self.logger = get_logger(__name__)
        
        # Validate tool availability
        self._validate_tool()
    
    def _validate_tool(self):
        """Validate external tool availability"""
        import shutil
        
        if not shutil.which(self.tool_path):
            raise RuntimeError(f"External tool not found: {self.tool_path}")
        
        # Check tool version if needed
        result = subprocess.run([self.tool_path, "--version"], 
                              capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f"Cannot check tool version: {self.tool_path}")
        
        self.logger.info(f"External tool validated: {result.stdout.strip()}")
    
    def call(self, project: "Project") -> "Project":
        """Execute function with external tool"""
        samples = self._get_project_samples(project)
        
        for sample_id in samples:
            self.logger.info(f"Processing sample with external tool: {sample_id}")
            
            # Prepare input files
            input_files = self._prepare_input_files(project, sample_id)
            
            # Execute external tool
            output_files = self._execute_tool(input_files)
            
            # Process tool output
            results = self._process_tool_output(output_files)
            
            # Save results
            self._save_sample_results(project, sample_id, results)
        
        return project
    
    def _execute_tool(self, input_files: Dict[str, str]) -> Dict[str, str]:
        """Execute external tool"""
        import subprocess
        import tempfile
        
        # Create temporary directory for output
        output_dir = tempfile.mkdtemp(prefix="celline_external_")
        
        # Build command
        cmd = [self.tool_path]
        cmd.extend(self._build_tool_arguments(input_files, output_dir))
        
        # Execute tool
        self.logger.debug(f"Executing command: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)
        
        if result.returncode != 0:
            raise RuntimeError(f"External tool failed: {result.stderr}")
        
        # Return output file paths
        return self._collect_output_files(output_dir)

    

Parallel Processing Support

      from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from typing import Callable

class ParallelProcessingFunction(CellineFunction):
    """Process samples concurrently with a thread or process pool.

    ``_get_project_samples`` and ``_save_combined_results`` are used but
    not defined in this class; they must be provided by the implementer.
    """

    def __init__(self, n_workers: int = 4, use_processes: bool = False):
        self.logger = get_logger(__name__)
        self.use_processes = use_processes
        self.n_workers = n_workers

    def call(self, project: "Project") -> "Project":
        """Fan the samples out to a pool, gather the results and save them.

        A sample whose task raises is logged and recorded as ``None``;
        the remaining samples are still collected.
        """
        sample_ids = self._get_project_samples(project)

        # Pick the pool type from the configuration.
        if self.use_processes:
            pool_type = ProcessPoolExecutor
        else:
            pool_type = ThreadPoolExecutor

        with pool_type(max_workers=self.n_workers) as pool:
            # Submit everything up front, remembering which sample each
            # future belongs to.
            pending = {}
            for sample_id in sample_ids:
                task = pool.submit(self._process_sample, project, sample_id)
                pending[task] = sample_id

            # Gather results in submission order.
            outcomes = {}
            for task, sample_id in pending.items():
                try:
                    outcomes[sample_id] = task.result()
                    self.logger.info(f"Completed processing: {sample_id}")
                except Exception as e:
                    self.logger.error(f"Failed to process {sample_id}: {e}")
                    outcomes[sample_id] = None

        # Save combined results
        self._save_combined_results(project, outcomes)

        return project

    def _process_sample(self, project: "Project", sample_id: str) -> Dict[str, Any]:
        """Process one sample (placeholder: returns ``None``).

        A real implementation must be safe to run in parallel.
        """
        # Sample processing logic that is thread/process safe
        return None

    

Testing Custom Functions

Unit Test Template

      # tests/test_example_function.py
import pytest
import tempfile
import shutil
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock

from celline import Project
from mypackage.functions.example_function import ExampleFunction

class TestExampleFunction:
    """Unit tests for ExampleFunction using temporary projects and mocks."""

    @pytest.fixture
    def temp_project_dir(self):
        """Create temporary project directory"""
        temp_dir = tempfile.mkdtemp()
        yield temp_dir
        shutil.rmtree(temp_dir, ignore_errors=True)

    @pytest.fixture
    def test_project(self, temp_project_dir):
        """Create test project with sample data"""
        project_path = Path(temp_project_dir)

        # Create project structure
        (project_path / "data").mkdir()
        (project_path / "resources").mkdir()
        (project_path / "results").mkdir()

        # Create configuration files
        setting_content = """
[project]
name = "test_project"
version = "1.0.0"
"""
        (project_path / "setting.toml").write_text(setting_content)

        samples_content = """
GSM123456 = "Test Sample 1"
GSM789012 = "Test Sample 2"
"""
        (project_path / "samples.toml").write_text(samples_content)

        return Project(str(project_path), "test_project")

    def test_function_initialization(self):
        """Test function initialization and parameter validation"""
        # Valid initialization
        func = ExampleFunction("expression", 0.5, "json")
        assert func.analysis_type == "expression"
        assert func.threshold == 0.5
        assert func.output_format == "json"

        # Invalid analysis type
        with pytest.raises(ValueError, match="analysis_type must be one of"):
            ExampleFunction("invalid_type", 0.5, "json")

        # Invalid threshold
        with pytest.raises(ValueError, match="threshold must be between 0 and 1"):
            ExampleFunction("expression", 1.5, "json")

        # Invalid output format
        with pytest.raises(ValueError, match="output_format must be one of"):
            ExampleFunction("expression", 0.5, "invalid_format")

    # patch decorators are applied bottom-up, so the HandleResolver mock is
    # injected first and the Path mock second.
    @patch('celline.utils.path.Path')
    @patch('celline.DB.dev.handler.HandleResolver')
    def test_sample_processing(self, mock_resolver, mock_path, test_project):
        """Test sample processing logic"""
        # Setup mocks
        mock_sample_schema = Mock()
        mock_sample_schema.parent = "test_parent"
        mock_resolver.resolve.return_value.sample.search.return_value = mock_sample_schema

        mock_path_instance = Mock()
        mock_path_instance.is_counted = True
        mock_path_instance.is_predicted_celltype = True
        mock_path_instance.is_preprocessed = True
        mock_path.return_value = mock_path_instance

        # Create function and test
        func = ExampleFunction("expression", 0.5, "json")

        with patch.object(func, '_load_sample_data') as mock_load_data:
            mock_load_data.return_value = {
                'count_matrix': self._create_mock_adata()
            }

            result = func._process_sample(test_project, "GSM123456")

            assert result['analysis_type'] == 'expression'
            assert 'total_cells' in result
            assert 'total_genes' in result

    def _create_mock_adata(self):
        """Create mock AnnData object

        Provides only the attributes the expression analysis reads:
        ``n_obs``, ``n_vars``, ``X.toarray()`` and ``var_names``.
        """
        import numpy as np

        mock_adata = Mock()
        mock_adata.n_obs = 1000
        mock_adata.n_vars = 2000
        mock_adata.X.toarray.return_value = np.random.rand(1000, 2000)
        # Must be a NumPy array: the analysis selects genes with a boolean
        # mask and then calls .tolist(), neither of which a plain list
        # supports.
        mock_adata.var_names = np.array([f"GENE_{i}" for i in range(2000)])

        return mock_adata

    def test_cli_argument_parsing(self):
        """Test CLI argument parsing"""
        import argparse

        func = ExampleFunction("expression", 0.5, "json")
        parser = argparse.ArgumentParser()
        func.add_cli_args(parser)

        # Test valid arguments
        args = parser.parse_args(['expression', '--threshold', '0.8', '--output-format', 'csv'])
        assert args.analysis_type == 'expression'
        assert args.threshold == 0.8
        assert args.output_format == 'csv'

        # Test default values
        args = parser.parse_args(['quality'])
        assert args.analysis_type == 'quality'
        assert args.threshold == 0.5
        assert args.output_format == 'json'

    @patch('builtins.open', create=True)
    @patch('json.dump')
    def test_result_saving(self, mock_json_dump, mock_open, test_project):
        """Test result saving functionality

        Only ``open`` and ``json.dump`` are mocked; the results directory
        is still created on disk by the function under test.
        """
        func = ExampleFunction("expression", 0.5, "json")

        test_results = {
            "GSM123456": {
                "analysis_type": "expression",
                "total_cells": 1000,
                "total_genes": 2000
            }
        }

        func._save_results(test_project, test_results)

        # Verify file operations
        mock_open.assert_called()
        mock_json_dump.assert_called()

    

Integration Tests

      # tests/integration/test_example_function_integration.py
import pytest
import tempfile
import os
from pathlib import Path

from celline import Project
from mypackage.functions.example_function import ExampleFunction

class TestExampleFunctionIntegration:
    """Integration test that runs ExampleFunction against an on-disk project."""

    @pytest.fixture
    def full_test_project(self):
        """Create full test project with real data structure

        The temporary directory is removed after the test, including when
        building the project structure fails part-way through.
        """
        # Imported here because shutil is not among this module's
        # top-level imports; without it the teardown raises NameError.
        import shutil

        temp_dir = tempfile.mkdtemp()
        try:
            project_path = Path(temp_dir)

            # Create complete project structure
            self._create_project_structure(project_path)
            self._create_sample_data(project_path)

            yield Project(str(project_path), "integration_test")
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

    def _create_project_structure(self, project_path: Path):
        """Create realistic project structure"""
        # Create directories
        (project_path / "data" / "GSM123456").mkdir(parents=True)
        (project_path / "resources" / "GSM123456" / "counted" / "outs").mkdir(parents=True)
        (project_path / "results").mkdir()

        # Create configuration files
        (project_path / "setting.toml").write_text("""
[project]
name = "integration_test"
version = "1.0.0"
""")

        (project_path / "samples.toml").write_text("""
GSM123456 = "Integration Test Sample"
""")

    def _create_sample_data(self, project_path: Path):
        """Create realistic sample data files"""
        # Create mock HDF5 file (in real test, use actual data)
        h5_file = project_path / "resources" / "GSM123456" / "counted" / "outs" / "filtered_feature_bc_matrix.h5"
        h5_file.touch()  # In real test, create actual HDF5 data

        # Create QC metrics file
        qc_file = project_path / "data" / "GSM123456" / "cell_info.tsv"
        qc_content = """barcode\tn_genes_by_counts\ttotal_counts\tpct_counts_mt\tinclude
CELL_1\t1500\t5000\t5.0\ttrue
CELL_2\t2000\t8000\t3.2\ttrue
CELL_3\t800\t2500\t15.0\tfalse
"""
        qc_file.write_text(qc_content)

        # Create cell type predictions
        # NOTE(review): the column here is "scpred_prediction", while
        # ExampleFunction's diversity analysis reads a "cell_type" column.
        # Confirm which name real prediction files use before adding a
        # diversity integration test.
        celltype_file = project_path / "data" / "GSM123456" / "predicted_celltype.tsv"
        celltype_content = """cell\tscpred_prediction
GSM123456_1\tT_cell
GSM123456_2\tB_cell
GSM123456_3\tNK_cell
"""
        celltype_file.write_text(celltype_content)

    @pytest.mark.slow
    def test_full_workflow_execution(self, full_test_project):
        """Test complete workflow execution"""
        func = ExampleFunction("quality", 0.7, "json")

        # Execute function
        result_project = func.call(full_test_project)

        # Verify results
        assert result_project is not None

        # Check output files
        # NOTE(review): assumes Project.PROJ_PATH is the same directory as
        # the Config.PROJ_ROOT that ExampleFunction writes to - TODO confirm.
        results_dir = Path(full_test_project.PROJ_PATH) / "results" / "example_analysis"
        assert results_dir.exists()

        results_file = results_dir / "results.json"
        assert results_file.exists()

        # Verify result content
        import json
        with open(results_file) as f:
            results = json.load(f)

        assert "metadata" in results
        assert "sample_results" in results
        assert results["metadata"]["analysis_type"] == "quality"

    

This comprehensive guide covers the essential aspects of creating custom functions for Celline, from basic templates to advanced features and thorough testing strategies.