Advanced Usage

This document covers Celline's advanced features, including custom workflows, large-scale data processing, and integrated analysis.

🚀 Large-Scale Data Processing

Execution in Cluster Environments

PBS/Torque Cluster

# PBS system configuration
celline config --system PBS --pbs-server my-cluster

# Job resource configuration
cat > pbs_config.toml << EOF
[pbs]
queue = "normal"
walltime = "48:00:00"
memory = "128GB"
ncpus = 32
nodes = 1
EOF

# Large dataset processing
celline run count --config pbs_config.toml
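
To submit the same commands as a batch job rather than running them interactively, they can be wrapped in a PBS job script. This is a minimal sketch assuming a PBS Pro-style scheduler and the resources from pbs_config.toml; on Torque, replace the select line with the nodes/ppn syntax.

#!/bin/bash
#PBS -N celline-analysis
#PBS -q normal
#PBS -l walltime=48:00:00
#PBS -l select=1:ncpus=32:mem=128gb

# Run from the directory the job was submitted from
cd "$PBS_O_WORKDIR"
celline run count --config pbs_config.toml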

Slurm Cluster (Custom Implementation)

#!/bin/bash
#SBATCH --job-name=celline-analysis
#SBATCH --time=24:00:00
#SBATCH --mem=64G
#SBATCH --cpus-per-task=16

module load R/4.3.0 cellranger/7.0.0

# Process multiple samples in parallel
celline run download --nthread 16
celline run count --nthread 16
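
Assuming the script above is saved as celline-analysis.sh (a name chosen here for illustration), submit and monitor it with the standard Slurm commands:

sbatch celline-analysis.sh
squeue -u $USER    # check job status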

Memory-Efficient Processing

# Memory management for large datasets
from celline import Project
from celline.utils.memory import MemoryManager

project = Project("./large-dataset")

# Limit memory usage
with MemoryManager(max_memory="32GB") as mem:
    # Process in chunks
    for chunk in project.iter_samples(chunk_size=10):
        project.process_chunk(chunk)
        mem.clear_cache()

🔬 Custom Workflows

Pipeline Design

from celline import Project, Pipeline
from celline.functions import *

# Custom pipeline definition
class CustomSingleCellPipeline(Pipeline):
    def __init__(self, project: Project, sample_list: list):
        super().__init__(project)
        self.sample_list = sample_list  # sample IDs consumed in stage1

    def run(self):
        # Step-by-step processing
        self.stage1_data_acquisition()
        self.stage2_quality_control()
        self.stage3_analysis()
        self.stage4_integration()
    
    def stage1_data_acquisition(self):
        """Data acquisition stage"""
        self.project.call(Add(self.sample_list))
        self.project.call(Download())
        
    def stage2_quality_control(self):
        """Quality control stage"""
        self.project.call(Count())
        self.project.call(Preprocess())
        
    def stage3_analysis(self):
        """Analysis stage"""
        self.project.call(PredictCelltype())
        self.project.call(Reduce())
        
    def stage4_integration(self):
        """Integration stage"""
        self.project.call(Integrate())

# Pipeline execution
project = Project("./my-analysis")
pipeline = CustomSingleCellPipeline(project, sample_list=["GSE123456"])
pipeline.run()

Conditional Processing

from celline import Project
from celline.functions import *

project = Project("./conditional-analysis")

# Processing branching based on conditions
project.call_if_else(
    condition=lambda p: p.has_spatial_data(),
    true=SpatialAnalysis(),
    false=StandardAnalysis()
)

# Processing based on data type
if project.is_10x_data():
    project.call(Count())
elif project.is_smartseq_data():
    project.call(SmartSeqProcessing())
else:
    project.call(CustomProcessing())

📊 Batch Processing and Automation

Batch Processing of Multiple Projects

#!/bin/bash
# batch_analysis.sh

# Project list file (CSV lines: project_name,gse_id,description; see example below)
PROJECTS="project_list.txt"

while IFS=',' read -r project_name gse_id description; do
    echo "Processing $project_name ($gse_id)"
    
    # Create the project directory and enter it
    mkdir -p "$project_name"
    cd "$project_name" || continue
    
    # Execute analysis pipeline
    celline init "$project_name"
    celline run add "$gse_id"
    celline run download --nthread 8
    celline run count
    celline run preprocess
    celline run predict_celltype
    
    # Archive results
    tar -czf "../${project_name}_results.tar.gz" results/
    
    cd ..
done < "$PROJECTS"
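
The loop reads project_list.txt as comma-separated lines matching the fields in the read statement. A minimal example (the GSE IDs are placeholders):

brain-ctrl,GSE123456,Control cortex samples
brain-dis,GSE789012,Disease cortex samples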

Automation Using Makefile

# Makefile for Celline analysis
# NOTE: recipe lines must be indented with real tabs, not spaces.

# Variable definitions
PROJECT_NAME := my-scrna-analysis
GSE_ID := GSE123456
NTHREAD := 8

# Default target
all: init download count preprocess analyze

# Project initialization
init:
	celline init $(PROJECT_NAME)

# Sample addition
add:
	celline run add $(GSE_ID)

# Data download
download: add
	celline run download --nthread $(NTHREAD)

# Count processing
count: download
	celline run count

# Preprocessing
preprocess: count
	celline run preprocess

# Analysis
analyze: preprocess
	celline run predict_celltype
	celline run reduce
	celline run integrate

# Check results
check:
	celline info
	find results/ -name "*.csv" -o -name "*.png" | head -10

# Cleanup of intermediate files
clean:
	rm -rf resources/*/raw resources/*/tmp

# Complete cleanup
distclean:
	rm -rf resources/ data/ results/

.PHONY: all init add download count preprocess analyze check clean distclean
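
Typical invocations; variables such as NTHREAD can be overridden per run on the command line:

make                         # run the full pipeline
make download NTHREAD=16     # re-run one stage with more threads
make clean                   # remove intermediate files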

🔄 Integrated Analysis

Multi-Dataset Integration

from celline import Project
from celline.functions.integrate import MultiDatasetIntegration

# Multi-dataset integration analysis
integration = MultiDatasetIntegration([
    "dataset1_GSE123456",
    "dataset2_GSE789012", 
    "dataset3_GSE345678"
])

project = Project("./integrated-analysis")
project.call(integration)

# Comparison of batch effect correction methods
methods = ["harmony", "combat", "mnn", "cca"]
for method in methods:
    integration.method = method
    project.call(integration)
    project.save_results(f"integration_{method}")

Meta-Analysis

from celline import Project
from celline.functions.meta import MetaAnalysis

# Meta-analysis execution
meta = MetaAnalysis([
    {"gse": "GSE123456", "condition": "control", "tissue": "brain"},
    {"gse": "GSE789012", "condition": "disease", "tissue": "brain"},
    {"gse": "GSE345678", "condition": "treatment", "tissue": "brain"}
])

project = Project("./meta-analysis")
project.call(meta)

# Statistical analysis of results
meta.perform_differential_analysis()
meta.generate_forest_plots()

🧬 Custom Analysis Functions

Creating New Analysis Functions

from celline.functions._base import CellineFunction
import argparse

class CustomCellTypeClassification(CellineFunction):
    """Custom cell type classification function"""
    
    def __init__(self, model_path: str, confidence_threshold: float = 0.8):
        super().__init__()
        self.model_path = model_path
        self.confidence_threshold = confidence_threshold
    
    def call(self, project):
        """Main processing"""
        import pickle
        
        # Load custom model
        with open(self.model_path, 'rb') as f:
            model = pickle.load(f)
        
        # Process each sample
        for sample_id in project.get_samples():
            adata = project.load_sample_data(sample_id)
            
            # Feature extraction
            features = self.extract_features(adata)
            
            # Execute prediction
            predictions = model.predict(features)
            confidence = model.predict_proba(features)
            
            # Confidence filtering
            high_conf_mask = confidence.max(axis=1) >= self.confidence_threshold
            
            # Save results
            adata.obs['custom_celltype'] = predictions
            adata.obs['celltype_confidence'] = confidence.max(axis=1)
            adata.obs['high_confidence'] = high_conf_mask
            
            project.save_sample_data(sample_id, adata)
        
        return project
    
    def extract_features(self, adata):
        """Feature extraction"""
        # Custom feature calculation
        import numpy as np
        
        # Expression of highly variable genes
        # (assumes highly-variable genes were already flagged upstream, e.g. via scanpy)
        hvg_expr = adata[:, adata.var.highly_variable].X.toarray()
        
        # Pathway scores
        pathway_scores = self.calculate_pathway_scores(adata)
        
        # Concatenation
        features = np.concatenate([hvg_expr, pathway_scores], axis=1)
        return features
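
    def calculate_pathway_scores(self, adata):
        """Per-cell pathway scores used as extra features.

        NOTE: placeholder implementation (an assumption, not part of the
        original example) -- returns zeros so the class runs end to end;
        replace with real gene-set scoring, e.g. scanpy's sc.tl.score_genes.
        """
        import numpy as np
        return np.zeros((adata.n_obs, 1))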
    
    def add_cli_args(self, parser: argparse.ArgumentParser):
        """Add CLI arguments"""
        parser.add_argument('--model-path', required=True,
                          help='Path to the trained classification model')
        parser.add_argument('--confidence-threshold', type=float, default=0.8,
                          help='Confidence threshold for predictions')
    
    def get_description(self):
        return "Custom cell type classification using trained models"

    
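
A usage sketch for the class above; the model path is a hypothetical example, and project.call follows the same pattern as the built-in functions:

from celline import Project

project = Project("./my-analysis")
project.call(CustomCellTypeClassification(
    model_path="models/classifier.pkl",  # hypothetical path to a pickled model
    confidence_threshold=0.9
))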

Registering Analysis Functions

# Register custom function to Celline
from celline.cli.registry import get_registry

registry = get_registry()
registry.register_function(
    name="custom_classify",
    class_ref=CustomCellTypeClassification,
    module_path="my_custom_functions.classification"
)

# Use via CLI
# celline run custom_classify --model-path my_model.pkl

📊 Advanced Data Visualization Features

Custom Plot Creation

from celline.functions._base import CellineFunction
from celline.visualization import CellinePlotter

class AdvancedVisualization(CellineFunction):
    def call(self, project):
        plotter = CellinePlotter(project)
        
        # Integrated UMAP for multiple samples
        plotter.integrated_umap(
            samples=project.get_samples(),
            color_by="celltype",
            split_by="condition"
        )
        
        # Expression heatmap
        plotter.expression_heatmap(
            genes=["CD4", "CD8A", "IL2", "IFNG"],
            group_by="celltype",
            save_path="results/expression_heatmap.pdf"
        )
        
        # Trajectory analysis
        plotter.trajectory_plot(
            root_cell="stem_cell",
            save_path="results/trajectory.pdf"
        )
        
        return project

Interactive Visualization

from celline.interactive import InteractivePlotter

def create_interactive_plots(project):
    plotter = InteractivePlotter(project)
    
    # 3D UMAP
    fig = plotter.plot_3d_umap(
        color_by="celltype",
        hover_data=["sample", "condition", "batch"]
    )
    
    # Interactive gene expression
    expr_fig = plotter.plot_gene_expression(
        genes=["CD4", "CD8A"],
        plot_type="violin"
    )
    
    # Dashboard creation
    dashboard = plotter.create_dashboard([fig, expr_fig])
    dashboard.serve(port=8050)
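
Calling the helper on an existing project builds the figures and serves the dashboard locally on the port configured above:

from celline import Project

project = Project("./my-analysis")
create_interactive_plots(project)  # then open http://localhost:8050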

🔌 API Integration

External Database Integration

from celline.functions._base import CellineFunction
from celline.database import ExternalDBConnector

class CustomDBIntegration(CellineFunction):
    def __init__(self, db_config: dict):
        super().__init__()
        self.db_config = db_config
    
    def call(self, project):
        # Fetch additional information from external database
        connector = ExternalDBConnector(self.db_config)
        
        # Gene annotations depend only on the species, so fetch them once
        gene_annotations = connector.fetch_gene_annotations(
            species=project.get_species()
        )
        project.update_gene_annotations(gene_annotations)

        # Extend per-sample metadata
        for sample_id in project.get_samples():
            extended_meta = connector.fetch_extended_metadata(sample_id)
            project.update_sample_metadata(sample_id, extended_meta)
        
        return project

Machine Learning Pipeline Integration

from celline.functions._base import CellineFunction
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

class MLPipeline(CellineFunction):
    def __init__(self, model_config: dict):
        super().__init__()
        self.model_config = model_config
    
    def call(self, project):
        # Build machine learning pipeline
        pipeline = Pipeline([
            ('scaler', StandardScaler()),
            ('classifier', RandomForestClassifier(**self.model_config))
        ])
        
        # Prepare training data
        X_train, y_train = project.prepare_training_data()
        
        # Train model
        pipeline.fit(X_train, y_train)
        
        # Execute predictions
        for sample_id in project.get_samples():
            X_test = project.prepare_test_data(sample_id)
            predictions = pipeline.predict(X_test)
            project.save_predictions(sample_id, predictions)
        
        # Save model
        project.save_model(pipeline, "trained_classifier.pkl")
        
        return project
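
A usage sketch; the model_config keys are standard scikit-learn RandomForestClassifier parameters, forwarded unchanged:

from celline import Project

project = Project("./ml-analysis")
project.call(MLPipeline({"n_estimators": 500, "max_depth": 20, "n_jobs": -1}))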

🔧 Performance Optimization

Parallel Processing Optimization

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

from celline.functions._base import CellineFunction

class OptimizedProcessing(CellineFunction):
    def call(self, project):
        samples = project.get_samples()
        
        # CPU-intensive tasks use process parallelization
        # (functions and arguments submitted to processes must be picklable)
        with ProcessPoolExecutor(max_workers=8) as executor:
            cpu_futures = [
                executor.submit(self.cpu_intensive_task, sample)
                for sample in samples
            ]
        
        # I/O-intensive tasks use thread parallelization
        with ThreadPoolExecutor(max_workers=16) as executor:
            io_futures = [
                executor.submit(self.io_intensive_task, sample)
                for sample in samples
            ]
        
        # Collect results
        cpu_results = [f.result() for f in cpu_futures]
        io_results = [f.result() for f in io_futures]
        
        return project

Memory Usage Optimization

import gc

from celline.functions._base import CellineFunction
from celline.utils.memory import MemoryProfiler, LazyLoader

class MemoryOptimizedAnalysis(CellineFunction):
    def call(self, project):
        with MemoryProfiler() as profiler:
            # Use lazy loading
            lazy_data = LazyLoader(project.data_path)
            
            # Process in chunks
            for chunk in lazy_data.iter_chunks(chunk_size=1000):
                self.process_chunk(chunk)
                
                # Monitor memory usage
                if profiler.get_memory_usage() > profiler.memory_limit:
                    profiler.clear_cache()
                    gc.collect()
        
        return project

🚨 Error Handling and Recovery

Robust Error Handling

from celline.functions._base import CellineFunction
from celline.utils.exceptions import CellineException
from celline.utils.recovery import RecoveryManager

class RobustAnalysis(CellineFunction):
    def call(self, project):
        recovery = RecoveryManager(project)
        
        try:
            # Execute analysis steps; self.analysis_steps is assumed to be
            # populated by a subclass or the constructor
            for step in self.analysis_steps:
                checkpoint = recovery.create_checkpoint(step.name)
                
                try:
                    step.execute(project)
                    recovery.mark_success(checkpoint)
                except Exception as e:
                    recovery.mark_failure(checkpoint, e)
                    
                    # Attempt automatic recovery
                    if recovery.can_recover(step):
                        recovery.recover_from_checkpoint(checkpoint)
                    else:
                        raise CellineException(f"Step {step.name} failed: {e}") from e
        
        except CellineException as e:
            # Generate detailed error report
            recovery.generate_error_report(e)
            raise
        
        return project
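
Transient failures, such as network errors during downloads, are often worth retrying before falling back to checkpoint recovery. The helper below is a generic Python sketch, independent of Celline's RecoveryManager API:

import time

def with_retries(fn, attempts=3, backoff=2.0):
    """Call fn(); on failure, retry with exponential backoff and re-raise the last error."""
    for i in range(attempts):
        try:
            return fn()
        except Exception:
            if i == attempts - 1:
                raise
            time.sleep(backoff ** i)  # wait 1s, 2s, 4s, ... between attempts

# Example: retry a flaky step before involving the RecoveryManager
# with_retries(lambda: step.execute(project))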

Warning: Before running advanced features in production environments, test them thoroughly and verify correct operation on small datasets.