Custom Function Development
Learn how to create custom analysis functions that integrate seamlessly with the Celline framework.
Function Architecture
Base Class Structure
All Celline functions inherit from `CellineFunction`,
which provides the core interface:
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Optional
import argparse
if TYPE_CHECKING:
from celline import Project
class CellineFunction(ABC):
    """Abstract contract shared by every Celline function.

    Subclasses must implement :meth:`call`. The remaining methods are
    optional hooks with defaults defined here.
    """

    @abstractmethod
    def call(self, project: "Project") -> "Project":
        """Run the function against ``project``; subclasses must implement this."""

    def register(self) -> Optional[str]:
        """Return the function name for CLI registration (``None`` by default)."""
        return None

    def add_cli_args(self, parser: argparse.ArgumentParser) -> None:
        """Optional hook for adding CLI arguments to ``parser``; adds none by default."""
        return None

    def cli(self, project: "Project", args: Optional[argparse.Namespace] = None) -> "Project":
        """CLI entry point; the default ignores ``args`` and delegates to :meth:`call`."""
        outcome = self.call(project)
        return outcome

    def get_description(self) -> str:
        """Return the description shown in help output."""
        description = "Custom Celline function"
        return description

    def get_usage_examples(self) -> list[str]:
        """Return usage examples for help output (empty by default)."""
        return list()
Function Lifecycle
Understanding the function execution lifecycle:
graph TD
A[Function Creation] --> B[Parameter Validation]
B --> C[CLI Argument Parsing]
C --> D[Project Context Setup]
D --> E[call() Method Execution]
E --> F[Resource Cleanup]
F --> G[Return Updated Project]
Creating Your First Function
Step 1: Basic Function Template
# mypackage/functions/example_function.py
from celline.functions._base import CellineFunction
from celline.log.logger import get_logger
from celline.config import Config
import argparse
import os
from typing import TYPE_CHECKING, Optional, List, Dict, Any
if TYPE_CHECKING:
from celline import Project
class ExampleFunction(CellineFunction):
    """
    Example custom analysis function

    This function demonstrates the basic structure and features
    of a Celline custom function: parameter validation, per-sample
    processing, result persistence and CLI integration.
    """

    def __init__(self, analysis_type: str, threshold: float = 0.5, output_format: str = "json"):
        """
        Initialize the function with parameters

        Args:
            analysis_type: Type of analysis to perform
                ("expression", "quality" or "diversity")
            threshold: Analysis threshold value, between 0 and 1
            output_format: Output format (json, csv, tsv)

        Raises:
            ValueError: If any parameter fails validation.
        """
        self.analysis_type = analysis_type
        self.threshold = threshold
        self.output_format = output_format
        self.logger = get_logger(__name__)

        # Validate parameters
        self._validate_parameters()

    def _validate_parameters(self):
        """Validate input parameters

        Raises:
            ValueError: If ``analysis_type``, ``threshold`` or
                ``output_format`` is outside its accepted values.
        """
        valid_analysis_types = ["expression", "quality", "diversity"]
        if self.analysis_type not in valid_analysis_types:
            raise ValueError(f"analysis_type must be one of {valid_analysis_types}")

        if not 0 <= self.threshold <= 1:
            raise ValueError("threshold must be between 0 and 1")

        valid_formats = ["json", "csv", "tsv"]
        if self.output_format not in valid_formats:
            raise ValueError(f"output_format must be one of {valid_formats}")

    def call(self, project: "Project") -> "Project":
        """
        Main execution method

        Processes every sample listed for the project and writes the
        combined results. Any failure is logged and re-raised.

        Args:
            project: Celline project instance

        Returns:
            Updated project instance
        """
        self.logger.info(f"Starting {self.analysis_type} analysis with threshold {self.threshold}")

        try:
            # Get samples from project
            samples = self._get_project_samples(project)
            self.logger.info(f"Found {len(samples)} samples to process")

            # Process each sample
            results = {}
            for sample_id in samples:
                self.logger.info(f"Processing sample: {sample_id}")
                sample_result = self._process_sample(project, sample_id)
                results[sample_id] = sample_result

            # Save combined results
            self._save_results(project, results)

            self.logger.info("Analysis completed successfully")

        except Exception as e:
            # Log for the operator, then let the caller see the original error.
            self.logger.error(f"Analysis failed: {e}")
            raise

        return project

    def _get_project_samples(self, project: "Project") -> List[str]:
        """Get list of samples from project

        Reads the keys of ``samples.toml`` under ``Config.PROJ_ROOT``.
        Returns an empty list (with a warning) when the file is missing.
        """
        import toml

        samples_file = f"{Config.PROJ_ROOT}/samples.toml"
        if not os.path.exists(samples_file):
            self.logger.warning("No samples.toml found")
            return []

        # TOML documents are UTF-8 by specification.
        with open(samples_file, 'r', encoding='utf-8') as f:
            samples_data = toml.load(f)

        return list(samples_data.keys())

    def _process_sample(self, project: "Project", sample_id: str) -> Dict[str, Any]:
        """Process individual sample

        Resolves the sample path, loads whatever data is available and
        runs the analysis selected by ``self.analysis_type``.

        Raises:
            ValueError: If ``self.analysis_type`` is not a known analysis.
        """
        # Get sample path
        sample_path = self._get_sample_path(project, sample_id)

        # Load sample data
        sample_data = self._load_sample_data(sample_path)

        # Perform analysis based on type
        analyzers = {
            "expression": self._analyze_expression,
            "quality": self._analyze_quality,
            "diversity": self._analyze_diversity,
        }
        analyzer = analyzers.get(self.analysis_type)
        if analyzer is None:
            raise ValueError(f"Unknown analysis type: {self.analysis_type}")

        return analyzer(sample_data)

    def _get_sample_path(self, project: "Project", sample_id: str) -> "Path":
        """Get path object for sample

        Raises:
            ValueError: If the sample cannot be resolved or has no parent.
        """
        from celline.utils.path import Path
        from celline.DB.dev.handler import HandleResolver

        # Resolve sample to get project information
        resolver = HandleResolver.resolve(sample_id)
        if resolver is None:
            raise ValueError(f"Cannot resolve sample: {sample_id}")

        sample_schema = resolver.sample.search(sample_id)
        if sample_schema.parent is None:
            raise ValueError(f"Sample {sample_id} has no parent project")

        return Path(sample_schema.parent, sample_id)

    def _load_sample_data(self, sample_path: "Path") -> Dict[str, Any]:
        """Load sample data from various sources

        Loading is best-effort: each source is attempted only when the
        corresponding ``sample_path`` flag is set, and a failure is logged
        as a warning instead of aborting. The returned dict may contain
        any subset of ``count_matrix``, ``cell_types`` and ``qc_metrics``.
        """
        data = {}

        # Load count matrix if available
        if sample_path.is_counted:
            try:
                import scanpy as sc
                count_matrix_path = f"{sample_path.resources_sample_counted}/outs/filtered_feature_bc_matrix.h5"
                adata = sc.read_10x_h5(count_matrix_path)
                data['count_matrix'] = adata
                self.logger.debug(f"Loaded count matrix: {adata.n_obs} cells, {adata.n_vars} genes")
            except Exception as e:
                self.logger.warning(f"Could not load count matrix: {e}")

        # Load cell type predictions if available
        if sample_path.is_predicted_celltype:
            try:
                import pandas as pd
                celltype_path = sample_path.data_sample_predicted_celltype
                celltype_data = pd.read_csv(celltype_path, sep='\t')
                data['cell_types'] = celltype_data
                self.logger.debug(f"Loaded cell type predictions: {len(celltype_data)} cells")
            except Exception as e:
                self.logger.warning(f"Could not load cell type predictions: {e}")

        # Load QC data if available
        if sample_path.is_preprocessed:
            try:
                import pandas as pd
                qc_path = f"{sample_path.data_sample}/cell_info.tsv"
                qc_data = pd.read_csv(qc_path, sep='\t')
                data['qc_metrics'] = qc_data
                self.logger.debug(f"Loaded QC metrics: {len(qc_data)} cells")
            except Exception as e:
                self.logger.warning(f"Could not load QC metrics: {e}")

        return data

    def _analyze_expression(self, sample_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze gene expression patterns

        Raises:
            ValueError: If ``sample_data`` has no ``count_matrix`` entry.
        """
        if 'count_matrix' not in sample_data:
            raise ValueError("Count matrix required for expression analysis")

        import numpy as np

        adata = sample_data['count_matrix']

        # Densify once and reuse; every statistic below needs the same array.
        dense = adata.X.toarray()

        # Calculate basic expression statistics
        mean_expression = np.mean(dense, axis=0)
        std_expression = np.std(dense, axis=0)

        # Find highly variable genes
        cv = std_expression / (mean_expression + 1e-12)  # Coefficient of variation
        high_var_threshold = np.percentile(cv, (1 - self.threshold) * 100)
        high_var_genes = adata.var_names[cv > high_var_threshold]

        # Calculate expression metrics
        total_counts_per_cell = np.sum(dense, axis=1)
        genes_per_cell = np.sum(dense > 0, axis=1)

        result = {
            'analysis_type': 'expression',
            'total_cells': adata.n_obs,
            'total_genes': adata.n_vars,
            'highly_variable_genes': len(high_var_genes),
            'highly_variable_gene_list': high_var_genes.tolist(),
            'mean_counts_per_cell': float(np.mean(total_counts_per_cell)),
            'mean_genes_per_cell': float(np.mean(genes_per_cell)),
            'threshold_used': self.threshold
        }

        return result

    def _analyze_quality(self, sample_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze data quality metrics

        Raises:
            ValueError: If ``sample_data`` has no ``qc_metrics`` entry.
        """
        if 'qc_metrics' not in sample_data:
            raise ValueError("QC metrics required for quality analysis")

        qc_data = sample_data['qc_metrics']

        # Quality thresholds
        min_genes = 200
        max_genes = 5000
        max_mt_percent = 20

        # Calculate quality metrics
        high_quality_cells = qc_data[
            (qc_data['n_genes_by_counts'] >= min_genes) &
            (qc_data['n_genes_by_counts'] <= max_genes) &
            (qc_data['pct_counts_mt'] <= max_mt_percent)
        ]

        total_cells = len(qc_data)
        n_high_quality = len(high_quality_cells)
        # An empty QC table has no passing cells; avoid dividing by zero.
        quality_rate = n_high_quality / total_cells if total_cells else 0.0

        result = {
            'analysis_type': 'quality',
            'total_cells': total_cells,
            'high_quality_cells': n_high_quality,
            'quality_rate': quality_rate,
            'mean_genes_per_cell': float(qc_data['n_genes_by_counts'].mean()),
            'mean_mt_percent': float(qc_data['pct_counts_mt'].mean()),
            'passes_threshold': quality_rate >= self.threshold
        }

        return result

    def _analyze_diversity(self, sample_data: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze cell type diversity

        Computes Shannon and Simpson indices from the ``cell_type`` column
        of the predictions table.

        Raises:
            ValueError: If ``sample_data`` has no ``cell_types`` entry.
        """
        if 'cell_types' not in sample_data:
            raise ValueError("Cell type predictions required for diversity analysis")

        # numpy is imported per method in this class; it was missing here.
        import numpy as np

        celltype_data = sample_data['cell_types']

        # Calculate cell type diversity
        celltype_counts = celltype_data['cell_type'].value_counts()
        total_cells = len(celltype_data)

        # Shannon diversity index (epsilon keeps log() finite)
        proportions = celltype_counts / total_cells
        shannon_diversity = -np.sum(proportions * np.log(proportions + 1e-12))

        # Simpson diversity index
        simpson_diversity = 1 - np.sum(proportions ** 2)

        result = {
            'analysis_type': 'diversity',
            'total_cells': total_cells,
            'unique_cell_types': len(celltype_counts),
            'cell_type_counts': celltype_counts.to_dict(),
            'shannon_diversity': float(shannon_diversity),
            'simpson_diversity': float(simpson_diversity),
            'meets_diversity_threshold': float(shannon_diversity) >= self.threshold
        }

        return result

    def _save_results(self, project: "Project", results: Dict[str, Dict[str, Any]]):
        """Save analysis results

        Writes to ``results/example_analysis`` under ``Config.PROJ_ROOT``.
        JSON output includes run metadata; CSV/TSV output is one row per
        sample without metadata.
        """
        import json
        import pandas as pd
        from datetime import datetime

        # Create results directory
        results_dir = f"{Config.PROJ_ROOT}/results/example_analysis"
        os.makedirs(results_dir, exist_ok=True)

        # Add metadata
        metadata = {
            'analysis_function': 'ExampleFunction',
            'analysis_type': self.analysis_type,
            'threshold': self.threshold,
            'output_format': self.output_format,
            'timestamp': datetime.now().isoformat(),
            'total_samples': len(results)
        }

        combined_results = {
            'metadata': metadata,
            'sample_results': results
        }

        # Save in requested format
        if self.output_format == "json":
            output_file = f"{results_dir}/results.json"
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(combined_results, f, indent=2)

        elif self.output_format in ["csv", "tsv"]:
            separator = "," if self.output_format == "csv" else "\t"
            output_file = f"{results_dir}/results.{self.output_format}"

            # Flatten results for tabular format
            flat_results = []
            for sample_id, sample_result in results.items():
                flat_result = {'sample_id': sample_id}
                flat_result.update(sample_result)
                flat_results.append(flat_result)

            df = pd.DataFrame(flat_results)
            df.to_csv(output_file, sep=separator, index=False)

        self.logger.info(f"Results saved to: {output_file}")

    # CLI Integration Methods

    def add_cli_args(self, parser: argparse.ArgumentParser) -> None:
        """Add CLI arguments"""
        parser.add_argument(
            'analysis_type',
            choices=['expression', 'quality', 'diversity'],
            help='Type of analysis to perform'
        )
        parser.add_argument(
            '--threshold', '-t',
            type=float,
            default=0.5,
            help='Analysis threshold (default: 0.5)'
        )
        parser.add_argument(
            '--output-format', '-f',
            choices=['json', 'csv', 'tsv'],
            default='json',
            help='Output format (default: json)'
        )
        parser.add_argument(
            '--verbose', '-v',
            action='store_true',
            help='Enable verbose logging'
        )

    def cli(self, project: "Project", args: Optional[argparse.Namespace] = None) -> "Project":
        """CLI entry point

        Copies the parsed arguments onto the instance, re-validates them
        and runs :meth:`call`.

        Raises:
            ValueError: If ``args`` is ``None`` or the values are invalid.
        """
        if args is None:
            raise ValueError("CLI arguments required")

        # Update instance parameters from CLI args
        self.analysis_type = args.analysis_type
        self.threshold = args.threshold
        self.output_format = args.output_format

        # Set verbose logging if requested
        if args.verbose:
            self.logger.setLevel("DEBUG")

        # Validate updated parameters
        self._validate_parameters()

        return self.call(project)

    def get_description(self) -> str:
        """Function description"""
        return """Example custom analysis function for Celline.
This function demonstrates how to create custom analysis functions
that integrate with the Celline framework. It supports multiple
analysis types and output formats."""

    def get_usage_examples(self) -> List[str]:
        """Usage examples"""
        return [
            "celline run example expression",
            "celline run example quality --threshold 0.8",
            "celline run example diversity --output-format csv",
            "celline run example expression --threshold 0.3 --verbose"
        ]
Step 2: Function Registration
Create a registration system for your custom functions:
# mypackage/registry.py
from celline.cli.registry import get_registry
from .functions.example_function import ExampleFunction
def register_custom_functions():
    """Register every custom function of this package with the Celline CLI registry."""
    # CLI name -> (implementing class, dotted module path)
    function_specs = {
        "example": (ExampleFunction, "mypackage.functions.example_function"),
    }

    cli_registry = get_registry()
    for cli_name, (function_class, module_name) in function_specs.items():
        cli_registry.register_function(
            name=cli_name,
            class_ref=function_class,
            module_path=module_name,
        )

    print("Custom functions registered successfully")


# Registration runs as a side effect of importing this module.
register_custom_functions()
Step 3: Package Structure
Organize your custom functions in a proper package structure:
mypackage/
├── __init__.py
├── registry.py
├── functions/
│ ├── __init__.py
│ ├── example_function.py
│ ├── advanced_analysis.py
│ └── visualization_function.py
├── utils/
│ ├── __init__.py
│ ├── data_processing.py
│ └── visualization.py
├── tests/
│ ├── __init__.py
│ ├── test_example_function.py
│ └── fixtures/
└── docs/
├── example_function.md
└── api_reference.md
Advanced Function Development
Handling Complex Data Workflows
class AdvancedAnalysisFunction(CellineFunction):
    """Template for a function that runs a multi-step data workflow.

    The step methods are placeholders that currently return ``None``.
    """

    def __init__(self, workflow_config: Dict[str, Any]):
        """Store the workflow configuration and set up an empty result cache."""
        self.logger = get_logger(__name__)
        self.intermediate_results = {}
        self.workflow_config = workflow_config

    def call(self, project: "Project") -> "Project":
        """Run the workflow steps in order and return ``project``.

        The output of each of the first three steps is fed to the next
        step and kept in ``self.intermediate_results``.
        """
        # (log message, cache key, step callable) for the chained steps
        chained_steps = (
            ("Step 1: Preparing data", 'prepared_data', self._prepare_data),
            ("Step 2: Initial analysis", 'initial_results', self._initial_analysis),
            ("Step 3: Advanced processing", 'advanced_results', self._advanced_processing),
        )

        payload = project
        for message, cache_key, step in chained_steps:
            self.logger.info(message)
            payload = step(payload)
            self.intermediate_results[cache_key] = payload

        # The final step is not cached; its output goes straight to disk.
        self.logger.info("Step 4: Generating final results")
        final_results = self._generate_final_results(payload)

        # Step 5: persist, then drop intermediate artefacts
        self._save_results(project, final_results)
        self._cleanup_intermediate_files()

        return project

    def _prepare_data(self, project: "Project") -> Dict[str, Any]:
        """Prepare data for analysis (placeholder)."""
        # Data preparation logic goes here
        return None

    def _initial_analysis(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Perform the initial analysis (placeholder)."""
        # Initial analysis logic goes here
        return None

    def _advanced_processing(self, initial_results: Dict[str, Any]) -> Dict[str, Any]:
        """Run the advanced processing step (placeholder)."""
        # Advanced processing logic goes here
        return None

    def _generate_final_results(self, processed_data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate the final results (placeholder)."""
        # Final result generation goes here
        return None

    def _cleanup_intermediate_files(self):
        """Remove intermediate files to save space (placeholder)."""
        # Cleanup logic goes here
        return None
Integration with External Tools
class ExternalToolFunction(CellineFunction):
    """Function that integrates with external tools"""

    def __init__(self, tool_path: str, tool_params: Dict[str, Any]):
        """Store the tool location and parameters, then verify the tool runs.

        Raises:
            RuntimeError: If the tool is not found or ``--version`` fails.
        """
        self.tool_path = tool_path
        self.tool_params = tool_params
        self.logger = get_logger(__name__)

        # Validate tool availability
        self._validate_tool()

    def _validate_tool(self):
        """Validate external tool availability

        Raises:
            RuntimeError: If ``self.tool_path`` is not found by
                ``shutil.which`` or its ``--version`` call exits non-zero.
        """
        import shutil
        # Imported here because this method runs from __init__, before
        # _execute_tool (the only other place that imports it) is ever called.
        import subprocess

        if not shutil.which(self.tool_path):
            raise RuntimeError(f"External tool not found: {self.tool_path}")

        # Check tool version if needed
        result = subprocess.run([self.tool_path, "--version"],
                                capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f"Cannot check tool version: {self.tool_path}")

        self.logger.info(f"External tool validated: {result.stdout.strip()}")

    def call(self, project: "Project") -> "Project":
        """Execute function with external tool

        For each project sample: prepare inputs, run the tool, process its
        output and save the per-sample results.
        """
        samples = self._get_project_samples(project)

        for sample_id in samples:
            self.logger.info(f"Processing sample with external tool: {sample_id}")

            # Prepare input files
            input_files = self._prepare_input_files(project, sample_id)

            # Execute external tool
            output_files = self._execute_tool(input_files)

            # Process tool output
            results = self._process_tool_output(output_files)

            # Save results
            self._save_sample_results(project, sample_id, results)

        return project

    def _execute_tool(self, input_files: Dict[str, str]) -> Dict[str, str]:
        """Execute external tool

        Runs the tool with an argument list (no shell) and writes output
        to a fresh temporary directory.

        Raises:
            RuntimeError: If the tool exits with a non-zero status.
        """
        import subprocess
        import tempfile

        # Create temporary directory for output
        # NOTE(review): this directory is never removed in this class —
        # confirm the caller cleans it up once the outputs are consumed.
        output_dir = tempfile.mkdtemp(prefix="celline_external_")

        # Build command
        cmd = [self.tool_path]
        cmd.extend(self._build_tool_arguments(input_files, output_dir))

        # Execute tool
        self.logger.debug(f"Executing command: {' '.join(cmd)}")
        result = subprocess.run(cmd, capture_output=True, text=True)

        if result.returncode != 0:
            raise RuntimeError(f"External tool failed: {result.stderr}")

        # Return output file paths
        return self._collect_output_files(output_dir)
Parallel Processing Support
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from typing import Callable
class ParallelProcessingFunction(CellineFunction):
    """Template for a function that fans sample processing out to a worker pool."""

    def __init__(self, n_workers: int = 4, use_processes: bool = False):
        """Record the pool size and whether to use processes instead of threads."""
        self.logger = get_logger(__name__)
        self.use_processes = use_processes
        self.n_workers = n_workers

    def call(self, project: "Project") -> "Project":
        """Process every sample in a pool, then save the combined results.

        A sample whose task raises is logged and recorded as ``None``.
        """
        sample_ids = self._get_project_samples(project)

        # Pick the pool type from the configuration
        if self.use_processes:
            pool_factory = ProcessPoolExecutor
        else:
            pool_factory = ThreadPoolExecutor

        with pool_factory(max_workers=self.n_workers) as pool:
            # Queue one task per sample, remembering which sample it belongs to
            pending = {}
            for queued_id in sample_ids:
                task = pool.submit(self._process_sample, project, queued_id)
                pending[task] = queued_id

            # Gather outcomes in submission order
            outcomes = {}
            for task, sample_id in pending.items():
                try:
                    outcomes[sample_id] = task.result()
                    self.logger.info(f"Completed processing: {sample_id}")
                except Exception as e:
                    self.logger.error(f"Failed to process {sample_id}: {e}")
                    outcomes[sample_id] = None

        # Persist everything in one go
        self._save_combined_results(project, outcomes)

        return project

    def _process_sample(self, project: "Project", sample_id: str) -> Dict[str, Any]:
        """Process a single sample (placeholder; must be safe to run in parallel)."""
        # Thread/process-safe sample processing logic goes here
        return None
Testing Custom Functions
Unit Test Template
# tests/test_example_function.py
import pytest
import tempfile
import shutil
from pathlib import Path
from unittest.mock import Mock, patch, MagicMock
from celline import Project
from mypackage.functions.example_function import ExampleFunction
class TestExampleFunction:
    """Unit tests for ``ExampleFunction``."""

    @pytest.fixture
    def temp_project_dir(self):
        """Create temporary project directory and remove it after the test"""
        temp_dir = tempfile.mkdtemp()
        yield temp_dir
        shutil.rmtree(temp_dir, ignore_errors=True)

    @pytest.fixture
    def test_project(self, temp_project_dir):
        """Create test project with sample data"""
        project_path = Path(temp_project_dir)

        # Create project structure
        (project_path / "data").mkdir()
        (project_path / "resources").mkdir()
        (project_path / "results").mkdir()

        # Create configuration files
        setting_content = """
[project]
name = "test_project"
version = "1.0.0"
"""
        (project_path / "setting.toml").write_text(setting_content)

        samples_content = """
GSM123456 = "Test Sample 1"
GSM789012 = "Test Sample 2"
"""
        (project_path / "samples.toml").write_text(samples_content)

        return Project(str(project_path), "test_project")

    def test_function_initialization(self):
        """Test function initialization and parameter validation"""
        # Valid initialization
        func = ExampleFunction("expression", 0.5, "json")
        assert func.analysis_type == "expression"
        assert func.threshold == 0.5
        assert func.output_format == "json"

        # Invalid analysis type
        with pytest.raises(ValueError, match="analysis_type must be one of"):
            ExampleFunction("invalid_type", 0.5, "json")

        # Invalid threshold
        with pytest.raises(ValueError, match="threshold must be between 0 and 1"):
            ExampleFunction("expression", 1.5, "json")

        # Invalid output format
        with pytest.raises(ValueError, match="output_format must be one of"):
            ExampleFunction("expression", 0.5, "invalid_format")

    @patch('celline.utils.path.Path')
    @patch('celline.DB.dev.handler.HandleResolver')
    def test_sample_processing(self, mock_resolver, mock_path, test_project):
        """Test sample processing logic"""
        # Setup mocks
        mock_sample_schema = Mock()
        mock_sample_schema.parent = "test_parent"
        mock_resolver.resolve.return_value.sample.search.return_value = mock_sample_schema

        mock_path_instance = Mock()
        mock_path_instance.is_counted = True
        mock_path_instance.is_predicted_celltype = True
        mock_path_instance.is_preprocessed = True
        mock_path.return_value = mock_path_instance

        # Create function and test
        func = ExampleFunction("expression", 0.5, "json")

        with patch.object(func, '_load_sample_data') as mock_load_data:
            mock_load_data.return_value = {
                'count_matrix': self._create_mock_adata()
            }

            result = func._process_sample(test_project, "GSM123456")

            assert result['analysis_type'] == 'expression'
            assert 'total_cells' in result
            assert 'total_genes' in result

    def _create_mock_adata(self):
        """Create mock AnnData object

        Exposes only what the expression analysis touches: ``n_obs``,
        ``n_vars``, ``X.toarray()`` and ``var_names``.
        """
        import numpy as np

        mock_adata = Mock()
        mock_adata.n_obs = 1000
        mock_adata.n_vars = 2000
        # Seeded so the test sees the same matrix on every run.
        mock_adata.X.toarray.return_value = np.random.default_rng(0).random((1000, 2000))
        # Must be an array, not a list: the analysis indexes var_names with
        # a boolean mask and then calls .tolist() on the selection.
        mock_adata.var_names = np.array([f"GENE_{i}" for i in range(2000)])

        return mock_adata

    def test_cli_argument_parsing(self):
        """Test CLI argument parsing"""
        import argparse

        func = ExampleFunction("expression", 0.5, "json")
        parser = argparse.ArgumentParser()
        func.add_cli_args(parser)

        # Test valid arguments
        args = parser.parse_args(['expression', '--threshold', '0.8', '--output-format', 'csv'])
        assert args.analysis_type == 'expression'
        assert args.threshold == 0.8
        assert args.output_format == 'csv'

        # Test default values
        args = parser.parse_args(['quality'])
        assert args.analysis_type == 'quality'
        assert args.threshold == 0.5
        assert args.output_format == 'json'

    @patch('builtins.open', create=True)
    @patch('json.dump')
    def test_result_saving(self, mock_json_dump, mock_open, test_project):
        """Test result saving functionality"""
        func = ExampleFunction("expression", 0.5, "json")

        test_results = {
            "GSM123456": {
                "analysis_type": "expression",
                "total_cells": 1000,
                "total_genes": 2000
            }
        }

        func._save_results(test_project, test_results)

        # Verify file operations
        mock_open.assert_called()
        mock_json_dump.assert_called()
Integration Tests
# tests/integration/test_example_function_integration.py
import pytest
import tempfile
import os
from pathlib import Path
from celline import Project
from mypackage.functions.example_function import ExampleFunction
class TestExampleFunctionIntegration:
    """Integration tests that run ``ExampleFunction`` against an on-disk project."""

    @pytest.fixture
    def full_test_project(self):
        """Create full test project with real data structure

        Yields a ``Project`` rooted in a temporary directory, which is
        removed afterwards even if setup fails part-way.
        """
        # Imported here because this snippet's import list has no shutil.
        import shutil

        temp_dir = tempfile.mkdtemp()
        try:
            project_path = Path(temp_dir)

            # Create complete project structure
            self._create_project_structure(project_path)
            self._create_sample_data(project_path)

            yield Project(str(project_path), "integration_test")
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)

    def _create_project_structure(self, project_path: Path):
        """Create realistic project structure"""
        # Create directories
        (project_path / "data" / "GSM123456").mkdir(parents=True)
        (project_path / "resources" / "GSM123456" / "counted" / "outs").mkdir(parents=True)
        (project_path / "results").mkdir()

        # Create configuration files
        (project_path / "setting.toml").write_text("""
[project]
name = "integration_test"
version = "1.0.0"
""")

        (project_path / "samples.toml").write_text("""
GSM123456 = "Integration Test Sample"
""")

    def _create_sample_data(self, project_path: Path):
        """Create realistic sample data files"""
        # Create mock HDF5 file (in real test, use actual data)
        h5_file = project_path / "resources" / "GSM123456" / "counted" / "outs" / "filtered_feature_bc_matrix.h5"
        h5_file.touch()  # In real test, create actual HDF5 data

        # Create QC metrics file
        qc_file = project_path / "data" / "GSM123456" / "cell_info.tsv"
        qc_content = """barcode\tn_genes_by_counts\ttotal_counts\tpct_counts_mt\tinclude
CELL_1\t1500\t5000\t5.0\ttrue
CELL_2\t2000\t8000\t3.2\ttrue
CELL_3\t800\t2500\t15.0\tfalse
"""
        qc_file.write_text(qc_content)

        # Create cell type predictions
        # NOTE(review): ExampleFunction's diversity analysis reads a
        # 'cell_type' column, while this file provides 'scpred_prediction'.
        # Only the quality analysis is exercised below — confirm the
        # expected column name before adding a diversity test.
        celltype_file = project_path / "data" / "GSM123456" / "predicted_celltype.tsv"
        celltype_content = """cell\tscpred_prediction
GSM123456_1\tT_cell
GSM123456_2\tB_cell
GSM123456_3\tNK_cell
"""
        celltype_file.write_text(celltype_content)

    @pytest.mark.slow
    def test_full_workflow_execution(self, full_test_project):
        """Test complete workflow execution"""
        func = ExampleFunction("quality", 0.7, "json")

        # Execute function
        result_project = func.call(full_test_project)

        # Verify results
        assert result_project is not None

        # Check output files
        results_dir = Path(full_test_project.PROJ_PATH) / "results" / "example_analysis"
        assert results_dir.exists()

        results_file = results_dir / "results.json"
        assert results_file.exists()

        # Verify result content
        import json
        with open(results_file) as f:
            results = json.load(f)

        assert "metadata" in results
        assert "sample_results" in results
        assert results["metadata"]["analysis_type"] == "quality"
This comprehensive guide covers the essential aspects of creating custom functions for Celline, from basic templates to advanced features and thorough testing strategies.