This is the developer guide for Celline. It provides detailed explanations of creating custom functions, developing plugins, and contributing to the codebase.
## 🎯 Development Overview
Celline is designed with extensibility in mind, and functionality can be extended through the following methods:
- Creating Custom Functions - Adding new analysis capabilities
- Adding Database Handlers - Supporting new data sources
- Extending Execution Backends - Supporting new computing environments
- Developing Web UI Components - Extending the interface
## 🛠️ Development Environment Setup

### 1. Getting the Source Code

```bash
# Clone the repository
git clone https://github.com/YUYA556223/celline.git
cd celline

# Checkout the development branch
git checkout develop
```
### 2. Installing Development Dependencies

```bash
# Development with a UV environment
uv sync --all-extras

# Or development with pip
pip install -e ".[dev]"

# Install development tools
pip install ruff black pytest pytest-cov mypy sphinx
```
### 3. Verifying the Development Environment

```bash
# Run the tests
pytest tests/

# Check code formatting
ruff check src/

# Type checking
mypy src/celline/
```
## 🔧 Custom Function Development

### Basic Function Class
```python
import argparse
import os
from typing import Optional, List, TYPE_CHECKING

from celline.functions._base import CellineFunction
from celline.log.logger import get_logger

if TYPE_CHECKING:
    from celline import Project


class MyCustomFunction(CellineFunction):
    """Example custom analysis function."""

    def __init__(self, parameter1: str, parameter2: int = 10):
        """
        Initialization method

        Args:
            parameter1: Required parameter
            parameter2: Optional parameter
        """
        super().__init__()
        self.parameter1 = parameter1
        self.parameter2 = parameter2
        self.logger = get_logger(__name__)

    def call(self, project: "Project") -> "Project":
        """
        Main processing - required implementation method

        Args:
            project: Celline project instance

        Returns:
            project: Processed project
        """
        self.logger.info(f"Starting custom analysis with {self.parameter1}")
        try:
            # Get samples
            samples = self._get_samples_from_project(project)
            # Process each sample
            for sample_id in samples:
                self.logger.info(f"Processing sample: {sample_id}")
                self._process_sample(project, sample_id)
            self.logger.info("Custom analysis completed successfully")
        except Exception as e:
            self.logger.error(f"Error in custom analysis: {e}")
            raise
        return project

    def _get_samples_from_project(self, project: "Project") -> List[str]:
        """Get the sample list from the project."""
        import toml
        from celline.config import Config

        samples_file = f"{Config.PROJ_ROOT}/samples.toml"
        if not os.path.exists(samples_file):
            return []
        with open(samples_file, "r") as f:
            samples = toml.load(f)
        return list(samples.keys())

    def _process_sample(self, project: "Project", sample_id: str):
        """Process an individual sample."""
        import scanpy as sc
        from celline.utils.path import Path

        # Path setup (replace "dummy_project" with the actual project ID)
        path = Path("dummy_project", sample_id)
        # Load data
        if path.is_counted:
            adata = sc.read_10x_h5(
                f"{path.resources_sample_counted}/outs/filtered_feature_bc_matrix.h5"
            )
            # Custom processing
            result = self._custom_analysis(adata)
            # Save results
            self._save_results(path, result)

    def _custom_analysis(self, adata):
        """Custom analysis logic."""
        import numpy as np

        # Example: detect highly expressed genes (top decile of mean expression)
        mean_expression = np.mean(adata.X.toarray(), axis=0)
        high_expr_genes = adata.var_names[
            mean_expression > np.percentile(mean_expression, 90)
        ]
        return {
            "high_expression_genes": high_expr_genes.tolist(),
            "total_cells": adata.n_obs,
            "total_genes": adata.n_vars,
            "parameter1_used": self.parameter1,
            "parameter2_used": self.parameter2,
        }

    def _save_results(self, path: "Path", result: dict):
        """Save results as JSON."""
        import json

        result_file = f"{path.data_sample}/custom_analysis_results.json"
        with open(result_file, "w") as f:
            json.dump(result, f, indent=2)

    def add_cli_args(self, parser: argparse.ArgumentParser) -> None:
        """Define CLI arguments."""
        parser.add_argument(
            "--parameter1", "-p1",
            required=True,
            help="Required parameter 1",
        )
        parser.add_argument(
            "--parameter2", "-p2",
            type=int,
            default=10,
            help="Optional parameter 2 (default: 10)",
        )

    def cli(self, project: "Project", args: Optional[argparse.Namespace] = None) -> "Project":
        """CLI execution entry point."""
        if args:
            self.parameter1 = args.parameter1
            self.parameter2 = args.parameter2
        return self.call(project)

    def get_description(self) -> str:
        """Function description."""
        return """Custom analysis function example.

        This function demonstrates how to create custom analysis
        functions that integrate with the Celline framework."""

    def get_usage_examples(self) -> List[str]:
        """Usage examples."""
        return [
            "celline run my_custom --parameter1 value1",
            "celline run my_custom --parameter1 value1 --parameter2 20",
        ]
```
### Function Registration

```python
# Register the function with Celline
from celline.cli.registry import get_registry

from my_package.custom_functions import MyCustomFunction


def register_custom_functions():
    """Register custom functions."""
    registry = get_registry()
    registry.register_function(
        name="my_custom",
        class_ref=MyCustomFunction,
        module_path="my_package.custom_functions",
    )


# Execute during package initialization
register_custom_functions()
```
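Once registered, the function is available as `celline run my_custom --parameter1 value1`. It can also be driven directly from Python; a minimal sketch, assuming a project already exists at the hypothetical path `./my-project`:

```python
from celline import Project

from my_package.custom_functions import MyCustomFunction

# Open an existing project and run the custom function against it
project = Project("./my-project")
MyCustomFunction("value1", parameter2=20).call(project)
```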
### Creating Tests

```python
# tests/test_custom_function.py
import argparse
import tempfile

import pytest

from celline import Project
from my_package.custom_functions import MyCustomFunction


class TestMyCustomFunction:
    @pytest.fixture
    def temp_project(self):
        """Temporary project for testing."""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create the configuration file
            setting_content = """
            [project]
            name = "test_project"
            version = "1.0.0"

            [execution]
            system = "multithreading"
            nthread = 1

            [R]
            r_path = "/usr/bin/R"

            [fetch]
            wait_time = 1
            """
            with open(f"{temp_dir}/setting.toml", "w") as f:
                f.write(setting_content)
            # Create the sample file
            with open(f"{temp_dir}/samples.toml", "w") as f:
                f.write('GSM123456 = "Test Sample"\n')
            yield Project(temp_dir, "test_project")

    def test_custom_function_creation(self):
        """Test custom function creation."""
        func = MyCustomFunction("test_param", 5)
        assert func.parameter1 == "test_param"
        assert func.parameter2 == 5

    def test_custom_function_execution(self, temp_project):
        """Test custom function execution."""
        func = MyCustomFunction("test_param", 5)
        # Prepare mock data if needed
        # ...
        result_project = func.call(temp_project)
        assert result_project is not None

    def test_cli_args_parsing(self):
        """Test CLI argument parsing."""
        func = MyCustomFunction("dummy", 1)
        parser = argparse.ArgumentParser()
        func.add_cli_args(parser)
        args = parser.parse_args(["--parameter1", "test_value", "--parameter2", "15"])
        assert args.parameter1 == "test_value"
        assert args.parameter2 == 15
```
## 🗄️ Database Handler Development

### Custom Database Handler
```python
import os
from typing import Optional

import requests

from celline.DB.dev.handler import DatabaseHandler
from celline.DB.dev.model import SampleSchema
from celline.log.logger import get_logger


class CustomDatabaseHandler(DatabaseHandler):
    """Example custom database handler."""

    def __init__(self):
        self.api_base = "https://api.customdb.org/v1"
        self.headers = {"Accept": "application/json"}
        self.logger = get_logger(__name__)

    def can_handle(self, sample_id: str) -> bool:
        """Check whether this handler can process the sample ID."""
        return sample_id.startswith("CUSTOM")

    def fetch_sample_metadata(self, sample_id: str) -> Optional[SampleSchema]:
        """Fetch sample metadata."""
        try:
            response = requests.get(
                f"{self.api_base}/samples/{sample_id}",
                headers=self.headers,
                timeout=30,
            )
            response.raise_for_status()
            data = response.json()
            # Convert to SampleSchema
            return SampleSchema(
                key=data["id"],
                title=data["title"],
                organism=data["organism"],
                library_strategy=data["library_strategy"],
                parent=data["study_id"],
                children=data.get("run_ids", ""),
                # Other required fields
            )
        except requests.RequestException as e:
            self.logger.error(f"Failed to fetch metadata for {sample_id}: {e}")
            return None

    def add(self, sample_id: str) -> bool:
        """Add a sample to the local database."""
        metadata = self.fetch_sample_metadata(sample_id)
        if metadata:
            # Save to the local database
            self._save_to_local_db(metadata)
            return True
        return False

    def _save_to_local_db(self, schema: SampleSchema):
        """Save to the local database."""
        import polars as pl
        from celline.config import Config

        # Load existing data
        db_file = f"{Config.EXEC_ROOT}/DB/CUSTOM_SAMPLES.parquet"
        if os.path.exists(db_file):
            df = pl.read_parquet(db_file)
        else:
            df = pl.DataFrame()

        # Add the new record
        new_record = pl.DataFrame([{
            "key": schema.key,
            "title": schema.title,
            "organism": schema.organism,
            "library_strategy": schema.library_strategy,
            "parent": schema.parent,
            "children": schema.children,
        }])

        # Combine the data and save
        updated_df = pl.concat([df, new_record]) if not df.is_empty() else new_record
        updated_df.write_parquet(db_file)
```
```python
# Register the handler
from celline.DB.dev.handler import HandleResolver


def register_custom_handler():
    """Register the custom handler."""
    custom_handler = CustomDatabaseHandler()
    HandleResolver.register_handler(custom_handler)


register_custom_handler()
```
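After registration, `HandleResolver` can dispatch matching accessions to the new handler. The handler can also be exercised directly as a quick smoke test; a sketch, using the hypothetical accession `CUSTOM0001`:

```python
handler = CustomDatabaseHandler()

# can_handle() is what the resolver uses to select this handler
assert handler.can_handle("CUSTOM0001")

# Fetch metadata from the remote API and persist it to the local parquet store
if handler.add("CUSTOM0001"):
    print("Sample added to the local database")
```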
## 🎨 Web UI Component Development

### Vue.js Component
```vue
<!-- CustomAnalysisPanel.vue -->
<template>
  <div class="custom-analysis-panel">
    <h3>Custom Analysis</h3>

    <div class="parameter-section">
      <h4>Parameters</h4>
      <div class="form-group">
        <label for="parameter1">Parameter 1:</label>
        <input
          id="parameter1"
          v-model="parameters.parameter1"
          type="text"
          placeholder="Enter parameter 1"
          required
        />
      </div>
      <div class="form-group">
        <label for="parameter2">Parameter 2:</label>
        <input
          id="parameter2"
          v-model="parameters.parameter2"
          type="number"
          min="1"
          max="100"
        />
      </div>
    </div>

    <div class="action-section">
      <button
        @click="executeAnalysis"
        :disabled="isRunning || !canExecute"
        class="btn-primary"
      >
        {{ isRunning ? 'Running...' : 'Execute Analysis' }}
      </button>
    </div>

    <div v-if="result" class="result-section">
      <h4>Results</h4>
      <div class="result-content">
        <p>High expression genes found: {{ result.high_expression_genes.length }}</p>
        <p>Total cells: {{ result.total_cells }}</p>
        <p>Total genes: {{ result.total_genes }}</p>
      </div>
    </div>

    <div v-if="error" class="error-section">
      <p class="error-message">{{ error }}</p>
    </div>
  </div>
</template>

<script>
export default {
  name: 'CustomAnalysisPanel',
  data() {
    return {
      parameters: {
        parameter1: '',
        parameter2: 10
      },
      isRunning: false,
      result: null,
      error: null
    };
  },
  computed: {
    canExecute() {
      return this.parameters.parameter1.trim() !== '';
    }
  },
  methods: {
    async executeAnalysis() {
      this.isRunning = true;
      this.error = null;
      try {
        const response = await fetch('/api/functions/my_custom/execute', {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json'
          },
          body: JSON.stringify(this.parameters)
        });
        if (!response.ok) {
          throw new Error(`HTTP error! status: ${response.status}`);
        }
        const data = await response.json();
        if (data.job_id) {
          // Monitor the job
          await this.monitorJob(data.job_id);
        } else {
          this.result = data.result;
        }
      } catch (error) {
        this.error = `Analysis failed: ${error.message}`;
      } finally {
        this.isRunning = false;
      }
    },
    async monitorJob(jobId) {
      const checkStatus = async () => {
        try {
          // Poll the status endpoint defined by the custom router below
          const response = await fetch(`/api/functions/my_custom/status/${jobId}`);
          const status = await response.json();
          if (status.status === 'completed') {
            this.result = status.result;
            return true;
          } else if (status.status === 'failed') {
            this.error = status.message;
            return true;
          }
          return false;
        } catch (error) {
          this.error = `Job monitoring failed: ${error.message}`;
          return true;
        }
      };
      // Polling for job status
      while (!(await checkStatus())) {
        await new Promise(resolve => setTimeout(resolve, 2000));
      }
    }
  }
};
</script>

<style scoped>
.custom-analysis-panel {
  padding: 20px;
  border: 1px solid #ddd;
  border-radius: 8px;
  margin: 10px 0;
}

.form-group {
  margin: 10px 0;
}

.form-group label {
  display: block;
  margin-bottom: 5px;
  font-weight: bold;
}

.form-group input {
  width: 100%;
  padding: 8px;
  border: 1px solid #ccc;
  border-radius: 4px;
}

.btn-primary {
  background-color: #007bff;
  color: white;
  border: none;
  padding: 10px 20px;
  border-radius: 4px;
  cursor: pointer;
  font-size: 16px;
}

.btn-primary:disabled {
  background-color: #6c757d;
  cursor: not-allowed;
}

.result-section {
  margin-top: 20px;
  padding: 15px;
  background-color: #f8f9fa;
  border-radius: 4px;
}

.error-section {
  margin-top: 20px;
  padding: 15px;
  background-color: #f8d7da;
  border: 1px solid #f5c6cb;
  border-radius: 4px;
}

.error-message {
  color: #721c24;
  margin: 0;
}
</style>
```
### Adding FastAPI Endpoints
```python
# celline/api/custom_endpoints.py
import uuid
from datetime import datetime
from typing import Dict, Any

from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel

from celline import Project
from celline.functions.my_custom import MyCustomFunction

router = APIRouter(prefix="/api/functions/my_custom", tags=["custom"])


class CustomAnalysisRequest(BaseModel):
    parameter1: str
    parameter2: int = 10


class CustomAnalysisResponse(BaseModel):
    job_id: str
    status: str


# In-memory job storage (use Redis or similar in real applications)
active_jobs: Dict[str, Dict[str, Any]] = {}


@router.post("/execute", response_model=CustomAnalysisResponse)
async def execute_custom_analysis(
    request: CustomAnalysisRequest,
    background_tasks: BackgroundTasks,
):
    """Execute custom analysis."""
    job_id = str(uuid.uuid4())

    # Initialize job information
    active_jobs[job_id] = {
        "status": "pending",
        "created_at": datetime.now(),
        "progress": 0.0,
        "message": "Analysis queued",
        "result": None,
    }

    # Run in a background task
    background_tasks.add_task(
        run_custom_analysis,
        job_id,
        request.parameter1,
        request.parameter2,
    )
    return CustomAnalysisResponse(job_id=job_id, status="started")


async def run_custom_analysis(job_id: str, param1: str, param2: int):
    """Execute the custom analysis in the background."""
    try:
        # Update job status
        active_jobs[job_id]["status"] = "running"
        active_jobs[job_id]["message"] = "Starting custom analysis"
        active_jobs[job_id]["progress"] = 10.0

        # Get the current project
        project = Project("./")

        # Execute the custom function
        custom_func = MyCustomFunction(param1, param2)
        active_jobs[job_id]["progress"] = 50.0
        active_jobs[job_id]["message"] = "Running analysis"

        # Execute the analysis (runs synchronously)
        result_project = custom_func.call(project)

        # Load the results
        result = load_analysis_result(project)

        # Complete
        active_jobs[job_id]["status"] = "completed"
        active_jobs[job_id]["progress"] = 100.0
        active_jobs[job_id]["message"] = "Analysis completed successfully"
        active_jobs[job_id]["result"] = result
    except Exception as e:
        active_jobs[job_id]["status"] = "failed"
        active_jobs[job_id]["message"] = f"Analysis failed: {str(e)}"


def load_analysis_result(project: Project) -> Dict[str, Any]:
    """Load analysis results."""
    # In a real implementation, locate and read the result file, e.g.
    # f"{Config.PROJ_ROOT}/data/<sample>/custom_analysis_results.json".
    # Simplified here with placeholder values.
    return {
        "high_expression_genes": ["GENE1", "GENE2", "GENE3"],
        "total_cells": 1000,
        "total_genes": 20000,
    }


@router.get("/status/{job_id}")
async def get_job_status(job_id: str):
    """Get job status."""
    if job_id not in active_jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    return active_jobs[job_id]
```

Finally, register the router with the main app:

```python
# celline/api/main.py
from .custom_endpoints import router as custom_router

app.include_router(custom_router)
```
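A minimal client sketch for exercising these endpoints; the server address and the use of `requests` are assumptions, not part of Celline itself:

```python
import time

import requests

BASE = "http://localhost:8000/api/functions/my_custom"  # adjust host/port to your deployment

# Kick off the analysis
resp = requests.post(f"{BASE}/execute", json={"parameter1": "value1", "parameter2": 20})
resp.raise_for_status()
job_id = resp.json()["job_id"]

# Poll the status endpoint until the job finishes
while True:
    status = requests.get(f"{BASE}/status/{job_id}").json()
    if status["status"] in ("completed", "failed"):
        print(status["message"])
        break
    time.sleep(2)
```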
## 📊 Testing and Debugging

### Writing Unit Tests
```python
# tests/test_custom_analysis.py
import json
import os
import tempfile
from unittest.mock import Mock, patch

import pytest

from celline import Project
from my_package.custom_functions import MyCustomFunction


class TestCustomAnalysisIntegration:
    @pytest.fixture
    def mock_scanpy_data(self):
        """Mock of a Scanpy AnnData object."""
        import numpy as np

        mock_adata = Mock()
        mock_adata.n_obs = 1000
        mock_adata.n_vars = 20000
        mock_adata.X.toarray.return_value = np.random.rand(1000, 20000)
        # np.array so boolean masks index var_names correctly
        mock_adata.var_names = np.array([f"GENE_{i}" for i in range(20000)])
        return mock_adata

    @patch("scanpy.read_10x_h5")
    def test_full_analysis_workflow(self, mock_read_h5, mock_scanpy_data):
        """Test the full analysis workflow."""
        # Set up the mock
        mock_read_h5.return_value = mock_scanpy_data

        with tempfile.TemporaryDirectory() as temp_dir:
            # Prepare the test environment
            project = self._setup_test_project(temp_dir)

            # Execute the custom function
            func = MyCustomFunction("test_param", 5)
            result_project = func.call(project)

            # Verify results
            assert result_project is not None

            # Check result files
            result_files = self._find_result_files(temp_dir)
            assert len(result_files) > 0

            # Verify result content
            with open(result_files[0], "r") as f:
                result = json.load(f)
            assert "high_expression_genes" in result
            assert "total_cells" in result
            assert result["parameter1_used"] == "test_param"
            assert result["parameter2_used"] == 5

    def _setup_test_project(self, temp_dir):
        """Set up a test project."""
        # Create the configuration file
        setting_content = """
        [project]
        name = "test_project"
        version = "1.0.0"

        [execution]
        system = "multithreading"
        nthread = 1

        [R]
        r_path = "/usr/bin/R"

        [fetch]
        wait_time = 1
        """
        with open(f"{temp_dir}/setting.toml", "w") as f:
            f.write(setting_content)

        # Create the sample file
        with open(f"{temp_dir}/samples.toml", "w") as f:
            f.write('GSM123456 = "Test Sample"\n')

        # Create the data directory
        data_dir = f"{temp_dir}/data/GSM123456"
        os.makedirs(data_dir, exist_ok=True)

        # Create the resource directory
        resource_dir = f"{temp_dir}/resources/GSM123456/counted/outs"
        os.makedirs(resource_dir, exist_ok=True)

        return Project(temp_dir, "test_project")

    def _find_result_files(self, temp_dir):
        """Find result files."""
        import glob

        return glob.glob(f"{temp_dir}/data/*/custom_analysis_results.json")
```
### Debug Logging

```python
# Set up detailed logging for debugging
import logging

from celline import Project
from celline.log.logger import get_logger

from my_package.custom_functions import MyCustomFunction


def setup_debug_logging():
    """Set up debug logging."""
    # Root logger configuration
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler("celline_debug.log"),
            logging.StreamHandler(),
        ],
    )

    # Celline-specific logger configuration
    celline_logger = get_logger("celline")
    celline_logger.setLevel(logging.DEBUG)

    # Quieten noisy external libraries
    logging.getLogger("requests").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)


# Debug-mode execution
if __name__ == "__main__":
    setup_debug_logging()

    # Test run of the custom function
    logger = get_logger(__name__)
    logger.debug("Starting debug session")
    try:
        project = Project("./test_project")
        func = MyCustomFunction("debug_param", 1)
        func.call(project)
    except Exception:
        logger.exception("Error occurred during debug session")
        raise
```
## 🚀 Contributing

### Creating Pull Requests
```bash
# 1. Clone your forked repository
git clone https://github.com/yourusername/celline.git
cd celline

# 2. Create a new branch
git checkout -b feature/my-new-feature

# 3. Make changes
# Code changes, add tests, etc.

# 4. Run the tests
pytest tests/

# 5. Format the code
ruff format src/
ruff check src/ --fix

# 6. Commit
git add .
git commit -m "Add new custom analysis feature"

# 7. Push
git push origin feature/my-new-feature

# 8. Create a pull request on GitHub
```
### Code Quality Standards

```python
# Coding convention examples
from typing import Any, Dict, List

import pandas as pd

from celline.log.logger import get_logger

logger = get_logger(__name__)


# 1. Use type hints
def process_data(data: List[Dict[str, Any]]) -> pd.DataFrame:
    """Use clear type hints."""
    ...


# 2. Documentation
def complex_function(param1: str, param2: int) -> bool:
    """
    Add detailed documentation for complex functions.

    Args:
        param1: Description of parameter 1
        param2: Description of parameter 2

    Returns:
        Boolean result of processing

    Raises:
        ValueError: When invalid parameters are provided

    Example:
        >>> result = complex_function("test", 42)
        >>> assert result is True
    """
    ...


# 3. Error handling
def safe_function():
    """Handle errors properly."""
    try:
        # Processing
        ...
    except SpecificException as e:  # replace with a concrete exception type
        logger.error(f"Specific error occurred: {e}")
        raise
    except Exception as e:
        logger.exception("Unexpected error occurred")
        raise RuntimeError(f"Function failed: {e}") from e
```
### Documentation

```python
# Docstring writing examples
class NewAnalysisFunction(CellineFunction):
    """
    New analysis function class.

    This function performs XX analysis and generates YY results.

    Attributes:
        param1: Analysis parameter 1
        param2: Analysis parameter 2

    Example:
        >>> func = NewAnalysisFunction("value1", 10)
        >>> result = func.call(project)
        >>> print(result.status)
    """

    def __init__(self, param1: str, param2: int):
        """
        Initialization

        Args:
            param1: Required parameter
            param2: Optional parameter
        """
        pass

    def call(self, project: "Project") -> "Project":
        """
        Main processing

        Args:
            project: Celline project

        Returns:
            Processed project

        Raises:
            CellineException: When processing fails
        """
        pass
```
## 🔧 Debugging and Profiling

### Performance Measurement
```python
import cProfile
import pstats
from functools import wraps


def profile_function(func):
    """Function profiling decorator."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        profiler = cProfile.Profile()
        profiler.enable()
        try:
            result = func(*args, **kwargs)
        finally:
            profiler.disable()
        # Output the results
        stats = pstats.Stats(profiler)
        stats.sort_stats("cumulative")
        stats.print_stats(20)  # Top 20 entries
        return result
    return wrapper


# Usage example
@profile_function
def heavy_analysis_function(data):
    """Heavy processing function."""
    # Processing goes here
    ...
```
### Memory Usage Monitoring

```python
import os
from contextlib import contextmanager

import psutil


@contextmanager
def memory_monitor(description: str = ""):
    """Monitor memory usage of a code block."""
    process = psutil.Process(os.getpid())
    start_memory = process.memory_info().rss / 1024 / 1024  # MB
    print(f"Memory before {description}: {start_memory:.2f} MB")
    try:
        yield
    finally:
        end_memory = process.memory_info().rss / 1024 / 1024  # MB
        diff = end_memory - start_memory
        print(f"Memory after {description}: {end_memory:.2f} MB (diff: {diff:+.2f} MB)")


# Usage example
with memory_monitor("custom analysis"):
    result = heavy_function(large_data)
```
**Success:** We welcome contributions to the Celline ecosystem! If you have questions or suggestions, please use GitHub Issues or Discussions.