This is the complete reference for Celline's Python API: detailed explanations of the classes, functions, and methods for calling Celline's functionality directly from Python code.
🎯 API Overview
Celline's Python API consists of the following main components:
- Project: Project management and workflow control
- Functions: Analysis functionality implementation
- Data Handlers: Data types and I/O operations
- Database: Metadata and sample information management
- Visualization: Plotting and interactive features
🏗️ Core Classes
Project
The main class that manages projects and controls analysis workflows.
from celline import Project
# Project creation
project = Project(project_dir="./my-project", proj_name="example")
# Basic usage
project.call(function_instance)
project.parallelize(4)
project.singularize()
Constructor
def __init__(
    self,
    project_dir: str,
    proj_name: str = "",
    r_path: str = ""
) -> None:
Parameters:
- project_dir (str): Path to the project directory
- proj_name (str, optional): Project name (default: directory name)
- r_path (str, optional): R path (default: auto-detection)
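For example, to pin the project to a specific R installation instead of relying on auto-detection (the path below is illustrative):

```python
from celline import Project

# Explicit constructor arguments; replace the R path with your own installation
project = Project(
    project_dir="./my-project",
    proj_name="example",
    r_path="/usr/local/bin/R",
)
```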
Main Methods
call(func, wait_for_complete=True)
Executes Celline functions.
from celline.functions.add import Add
from celline.functions.download import Download
# Sample addition
add_func = Add([Add.SampleInfo(id="GSE123456", title="Sample 1")])
project.call(add_func)
# Data download
download_func = Download()
project.call(download_func)
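The wait_for_complete flag in the signature implies a call can also return before the job finishes; a sketch under that assumption, waiting explicitly via ThreadObservable (documented below):

```python
from celline.middleware import ThreadObservable

# Assumption: wait_for_complete=False dispatches the job without blocking
project.call(download_func, wait_for_complete=False)

# ... other work ...

ThreadObservable.watch()  # block until outstanding jobs complete
```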
parallelize(njobs) / singularize()
Controls parallel execution.
# Execute with 4 parallel processes
project.parallelize(4)
# Return to single thread
project.singularize()
call_if_else(condition, true, false)
Controls execution with conditional branching.
project.call_if_else(
    condition=lambda p: len(p.get_samples()) > 10,
    true=HighThroughputProcessing(),
    false=StandardProcessing()
)
Execution Environment Control
# Multithreading execution
project.useMultiThreading()
# PBS cluster execution
project.usePBS("cluster-name")
Seurat Data Objects
# Obtaining Seurat objects
seurat = project.seurat(
    project_id="GSE123456",
    sample_id="GSM789012",
    identifier="seurat.seurat",
    via_seurat_disk=False
)
# Direct loading from file path
seurat = project.seurat_from_rawpath("/path/to/seurat.rds")
🔬 Analysis Function Classes
Base Class: CellineFunction
The base class for all analysis functions.
from celline.functions._base import CellineFunction
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from celline import Project

class CustomFunction(CellineFunction):
    def __init__(self, param1: str, param2: int):
        super().__init__()
        self.param1 = param1
        self.param2 = param2

    def call(self, project: "Project"):
        # Custom processing implementation
        print(f"Processing with {self.param1}, {self.param2}")
        return project

    def get_description(self) -> str:
        return "Custom analysis function"
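Once defined, the custom function is invoked like any built-in one:

```python
# Run the custom function defined above (argument values are illustrative)
custom = CustomFunction(param1="demo", param2=42)
project.call(custom)
```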
Main Analysis Functions
Add - Sample Addition
from celline.functions.add import Add
import polars as pl
# Using SampleInfo
samples = [
    Add.SampleInfo(id="GSE123456", title="Dataset 1"),
    Add.SampleInfo(id="GSM789012", title="Sample 1")
]
add_func = Add(samples)
# Using DataFrame
df = pl.DataFrame({
    "id": ["GSE123456", "GSM789012"],
    "title": ["Dataset 1", "Sample 1"]
})
add_func = Add(df)
project.call(add_func)
Download - Data Download
from celline.functions.download import Download
# Basic download
download = Download()
project.call(download)
# Download with callbacks
def on_complete(sample_id):
    print(f"Downloaded: {sample_id}")

def on_error(error):
    print(f"Error: {error}")
download = Download(then=on_complete, catch=on_error)
project.call(download)
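Downloads combine naturally with the parallel-execution controls shown earlier; a short sketch:

```python
# Sketch: fetch data with 4 parallel workers, then return to a single thread
project.parallelize(4)
project.call(Download())
project.singularize()
```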
Preprocess - Preprocessing
from celline.functions.preprocess import Preprocess
# Basic preprocessing
preprocess = Preprocess()
project.call(preprocess)
# Target specific cell types
preprocess = Preprocess(target_celltype=["Neuron", "Astrocyte"])
project.call(preprocess)
Other Main Functions
from celline.functions import *
# Count processing
project.call(Count())
# Seurat object creation
project.call(CreateSeuratObject())
# Cell type prediction
project.call(PredictCelltype())
# Dimensionality reduction
project.call(Reduce())
# Data integration
project.call(Integrate())
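The base-class pattern above has call() return the Project, which suggests multi-step pipelines can be chained; a sketch, assuming Project.call() likewise returns the project instance:

```python
# Hypothetical pipeline chain (assumes Project.call() returns the Project)
(
    project
    .call(Count())
    .call(CreateSeuratObject())
    .call(PredictCelltype())
    .call(Reduce())
)
```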
🗄️ Database API
Database Handlers
from celline.DB.dev.handler import HandleResolver
from celline.DB.dev.model import SampleSchema, RunSchema
# Sample ID resolution
resolver = HandleResolver.resolve("GSM123456")
if resolver:
    sample_schema = resolver.sample.search("GSM123456")
    print(f"Sample title: {sample_schema.title}")
    print(f"Parent study: {sample_schema.parent}")
Database Models
SampleSchema
from celline.DB.model.sra_gsm import SRA_GSM
# GSM sample search
gsm_model = SRA_GSM()
sample = gsm_model.search("GSM123456")
print(f"Title: {sample.title}")
print(f"Organism: {sample.organism}")
print(f"Library strategy: {sample.library_strategy}")
Database Synchronization
from celline.functions.sync_DB import SyncDB
# Database synchronization
sync = SyncDB()
project.call(sync)
📊 Data Handling
Seurat Data Operations
from celline.data import Seurat
# Seurat object loading
seurat = Seurat("/path/to/seurat.rds", via_seurat_disk=True)
# Data retrieval
expression_matrix = seurat.get_expression()
metadata = seurat.get_metadata()
variable_features = seurat.get_variable_features()
Data Format Conversion
from celline.utils.serialization import NamedTupleAndPolarsStructure
from celline.functions.add import Add
import polars as pl
# NamedTuple and DataFrame interconversion
data_converter = NamedTupleAndPolarsStructure[Add.SampleInfo]
# DataFrame to NamedTuple list
df = pl.DataFrame({"id": ["GSM1", "GSM2"], "title": ["S1", "S2"]})
sample_list = data_converter.deserialize(df, Add.SampleInfo)
# NamedTuple list to DataFrame
samples = [Add.SampleInfo("GSM1", "S1"), Add.SampleInfo("GSM2", "S2")]
df = data_converter.serialize(samples)
🔧 Utilities
Path Management
from celline.utils.path import Path
# Project path management
path = Path("GSE123456", "GSM789012")
# Directory preparation
path.prepare()
# Retrieving various paths
print(f"Raw data: {path.resources_sample_raw}")
print(f"Counted data: {path.resources_sample_counted}")
print(f"Results: {path.data_sample}")
# Status checking
if path.is_downloaded:
    print("Data has been downloaded")
if path.is_counted:
    print("Data has been counted")
Configuration Management
from celline.config import Config, Setting
# Project configuration
print(f"Project root: {Config.PROJ_ROOT}")
print(f"Execution root: {Config.EXEC_ROOT}")
# Execution settings
print(f"Project name: {Setting.name}")
print(f"R path: {Setting.r_path}")
print(f"Thread count: {Setting.nthread}")
print(f"Execution system: {Setting.system}")
# Configuration saving
Setting.name = "new-project"
Setting.nthread = 8
Setting.flush() # Save to setting.toml
🎭 Interactive API
Web API Server
from celline.api.main import app
import uvicorn
# FastAPI application startup
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
API Endpoints
import requests
# Get project information
response = requests.get("http://localhost:8000/api/project")
project_info = response.json()
# Add samples
add_request = {
    "sample_ids": ["GSM123456", "GSM789012"]
}
response = requests.post("http://localhost:8000/api/samples/add",
                         json=add_request)
job_info = response.json()
# Check job status
job_id = job_info["job_id"]
response = requests.get(f"http://localhost:8000/api/jobs/{job_id}")
status = response.json()
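A simple way to wait for a job is to poll the status endpoint; in this sketch the "status" field and its values are assumptions about the response schema:

```python
import time
import requests

# Poll the job until it leaves the running state; field names are illustrative
while True:
    status = requests.get(f"http://localhost:8000/api/jobs/{job_id}").json()
    if status.get("status") in ("completed", "failed"):
        break
    time.sleep(5)
```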
🧵 Parallel Processing and Thread Management
ThreadObservable
from celline.middleware import ThreadObservable
# Parallel job configuration
ThreadObservable.set_jobs(4)
# Parallel execution of shell commands
shell_files = [
    "script1.sh",
    "script2.sh",
    "script3.sh"
]
ThreadObservable.call_shell(shell_files)
# Wait for completion
ThreadObservable.watch()
Server System Management
from celline.server import ServerSystem
# Multithreading execution
ServerSystem.useMultiThreading()
# PBS cluster execution
ServerSystem.usePBS("my-cluster")
# Check current configuration
print(f"Current system: {ServerSystem.current_system}")
print(f"Cluster server: {ServerSystem.cluster_server_name}")
🔍 Logging and Debugging
Logging
from celline.log.logger import get_logger
# Get logger
logger = get_logger(__name__)
# Log output
logger.info("Processing started")
logger.warning("Low memory warning")
logger.error("Processing failed")
# Contextual logging
logger.info("Processing sample", extra={
"sample_id": "GSM123456",
"step": "preprocessing"
})
📝 Custom Function Creation Example
Complete Custom Function
from celline.functions._base import CellineFunction
from celline.log.logger import get_logger
import argparse
from typing import Optional, List

class AdvancedQualityControl(CellineFunction):
    """Advanced quality control function"""

    def __init__(self,
                 min_genes: int = 200,
                 max_genes: int = 5000,
                 max_mito_pct: float = 20.0,
                 doublet_threshold: float = 0.3):
        super().__init__()
        self.min_genes = min_genes
        self.max_genes = max_genes
        self.max_mito_pct = max_mito_pct
        self.doublet_threshold = doublet_threshold
        self.logger = get_logger(__name__)

    def call(self, project):
        """Main processing"""
        self.logger.info("Starting advanced quality control")
        # get_samples_from_project is a project-specific helper (defined elsewhere)
        samples = self.get_samples_from_project(project)
        for sample_id in samples:
            self.logger.info(f"Processing sample: {sample_id}")
            self.process_sample(project, sample_id)
        self.logger.info("Advanced quality control completed")
        return project

    def process_sample(self, project, sample_id):
        """Individual sample processing"""
        import scanpy as sc

        # Data loading (load_sample_data is a project-specific helper)
        adata = self.load_sample_data(project, sample_id)

        # Quality metrics calculation
        adata.var['mt'] = adata.var_names.str.startswith('MT-')
        sc.pp.calculate_qc_metrics(adata, percent_top=None,
                                   log1p=False, inplace=True)

        # Filtering
        sc.pp.filter_cells(adata, min_genes=self.min_genes)
        sc.pp.filter_genes(adata, min_cells=3)

        # Remove outlier cells
        adata = adata[adata.obs.n_genes_by_counts < self.max_genes, :]
        adata = adata[adata.obs.pct_counts_mt < self.max_mito_pct, :]

        # Doublet detection
        doublet_scores = self.detect_doublets(adata)
        adata.obs['doublet_score'] = doublet_scores
        adata = adata[doublet_scores < self.doublet_threshold, :]

        # Save results and generate QC report
        # (save_sample_data and generate_qc_report are project-specific helpers)
        self.save_sample_data(project, sample_id, adata)
        self.generate_qc_report(project, sample_id, adata)

    def detect_doublets(self, adata):
        """Doublet detection"""
        import scrublet as scr
        scrub = scr.Scrublet(adata.X)
        doublet_scores, predicted_doublets = scrub.scrub_doublets(verbose=False)
        return doublet_scores

    def add_cli_args(self, parser: argparse.ArgumentParser):
        """Add CLI arguments"""
        parser.add_argument('--min-genes', type=int, default=200,
                            help='Minimum number of genes per cell')
        parser.add_argument('--max-genes', type=int, default=5000,
                            help='Maximum number of genes per cell')
        parser.add_argument('--max-mito-pct', type=float, default=20.0,
                            help='Maximum mitochondrial gene percentage')
        parser.add_argument('--doublet-threshold', type=float, default=0.3,
                            help='Doublet score threshold')

    def cli(self, project, args: Optional[argparse.Namespace] = None):
        """CLI entry point"""
        if args:
            self.min_genes = args.min_genes
            self.max_genes = args.max_genes
            self.max_mito_pct = args.max_mito_pct
            self.doublet_threshold = args.doublet_threshold
        return self.call(project)

    def get_description(self) -> str:
        return """Advanced quality control with customizable thresholds.
        Performs comprehensive QC including gene count filtering,
        mitochondrial gene percentage filtering, and doublet detection."""

    def get_usage_examples(self) -> List[str]:
        return [
            "celline run advanced_qc",
            "celline run advanced_qc --min-genes 300 --max-genes 6000",
            "celline run advanced_qc --max-mito-pct 15 --doublet-threshold 0.25"
        ]
# Usage example
qc = AdvancedQualityControl(min_genes=300, max_genes=6000)
project.call(qc)
Info: For detailed explanations of individual functions, see the Functions Reference section.