/Execution Backend Development

Execution Backend Development

Learn how to create custom execution backends to support different computing environments and resource management systems in Celline.

Execution Backend Architecture

Backend Interface

All execution backends must implement the ExecutionBackend interface:

      from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from enum import Enum
import subprocess

class JobStatus(Enum):
    """Job execution status"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class ExecutionBackend(ABC):
    """Base interface for execution backends"""
    
    @abstractmethod
    def submit_job(self, command: str, job_config: Dict[str, Any]) -> str:
        """Submit job for execution and return job ID"""
        pass
    
    @abstractmethod
    def get_job_status(self, job_id: str) -> JobStatus:
        """Get current job status"""
        pass
    
    @abstractmethod
    def get_job_output(self, job_id: str) -> Optional[str]:
        """Get job output logs"""
        pass
    
    @abstractmethod
    def cancel_job(self, job_id: str) -> bool:
        """Cancel running job"""
        pass
    
    @abstractmethod
    def is_available(self) -> bool:
        """Check if backend is available"""
        pass
    
    def get_resource_usage(self, job_id: str) -> Dict[str, Any]:
        """Get resource usage statistics (optional override)"""
        return {}
    
    def cleanup_job(self, job_id: str) -> bool:
        """Cleanup job artifacts (optional override)"""
        return True

    

Job Configuration Schema

Standard job configuration structure:

      from dataclasses import dataclass
from typing import Optional, Dict, List

@dataclass
class JobConfig:
    """Job configuration schema"""
    name: str                           # Job name
    command: str                        # Command to execute
    working_directory: str              # Working directory
    environment: Dict[str, str]         # Environment variables
    resources: 'ResourceConfig'         # Resource requirements
    dependencies: List[str]             # Job dependencies
    timeout: Optional[int]              # Timeout in seconds
    retry_count: int = 0               # Number of retries
    priority: int = 0                  # Job priority
    metadata: Dict[str, Any] = None    # Additional metadata

@dataclass
class ResourceConfig:
    """Resource requirements configuration"""
    cpu_cores: int = 1                 # Number of CPU cores
    memory_gb: float = 1.0             # Memory in GB
    gpu_count: int = 0                 # Number of GPUs
    disk_space_gb: float = 1.0         # Disk space in GB
    walltime_minutes: Optional[int] = None  # Maximum runtime
    queue: Optional[str] = None        # Queue name
    node_type: Optional[str] = None    # Node type specification

    

Local Execution Backend

Basic Local Backend

      # celline/middleware/backends/local_backend.py
import subprocess
import threading
import uuid
import time
import os
import signal
from typing import Dict, Any, Optional
from pathlib import Path
from celline.middleware.backends.base import ExecutionBackend, JobStatus, JobConfig
from celline.log.logger import get_logger

class LocalExecutionBackend(ExecutionBackend):
    """
    Local execution backend for running jobs on the local machine
    
    This backend executes jobs as local processes with resource monitoring
    and job management capabilities.
    """
    
    def __init__(self, max_concurrent_jobs: int = 4, 
                 job_output_dir: str = "./logs/jobs"):
        """
        Initialize local execution backend
        
        Args:
            max_concurrent_jobs: Maximum number of concurrent jobs
            job_output_dir: Directory for job output files
        """
        self.max_concurrent_jobs = max_concurrent_jobs
        self.job_output_dir = Path(job_output_dir)
        self.job_output_dir.mkdir(parents=True, exist_ok=True)
        
        # Job tracking
        self.active_jobs: Dict[str, Dict[str, Any]] = {}
        self.job_processes: Dict[str, subprocess.Popen] = {}
        self.job_threads: Dict[str, threading.Thread] = {}
        
        # Resource monitoring
        self.current_cpu_usage = 0
        self.current_memory_usage = 0.0
        
        self.logger = get_logger(__name__)
        
        # Start resource monitor
        self._start_resource_monitor()
    
    def submit_job(self, command: str, job_config: Dict[str, Any]) -> str:
        """
        Submit job for local execution
        
        Args:
            command: Command to execute
            job_config: Job configuration
            
        Returns:
            Job ID string
        """
        job_id = str(uuid.uuid4())
        
        # Parse job configuration
        config = self._parse_job_config(job_config)
        
        # Check resource availability
        if not self._check_resource_availability(config.resources):
            raise RuntimeError("Insufficient resources for job execution")
        
        # Check concurrent job limit
        running_jobs = sum(1 for job in self.active_jobs.values() 
                          if job['status'] == JobStatus.RUNNING)
        
        if running_jobs >= self.max_concurrent_jobs:
            # Queue the job
            self.active_jobs[job_id] = {
                'status': JobStatus.PENDING,
                'command': command,
                'config': config,
                'submitted_at': time.time(),
                'started_at': None,
                'finished_at': None,
                'output_file': self.job_output_dir / f"{job_id}.log",
                'error_file': self.job_output_dir / f"{job_id}.err"
            }
            
            self.logger.info(f"Job {job_id} queued (concurrent limit reached)")
        else:
            # Execute immediately
            self._execute_job(job_id, command, config)
        
        return job_id
    
    def _parse_job_config(self, job_config: Dict[str, Any]) -> JobConfig:
        """Parse job configuration dictionary"""
        from celline.middleware.backends.schemas import JobConfig, ResourceConfig
        
        # Extract resource configuration
        resources_dict = job_config.get('resources', {})
        resources = ResourceConfig(
            cpu_cores=resources_dict.get('cpu_cores', 1),
            memory_gb=resources_dict.get('memory_gb', 1.0),
            gpu_count=resources_dict.get('gpu_count', 0),
            disk_space_gb=resources_dict.get('disk_space_gb', 1.0),
            walltime_minutes=resources_dict.get('walltime_minutes'),
            queue=resources_dict.get('queue'),
            node_type=resources_dict.get('node_type')
        )
        
        # Create job configuration
        config = JobConfig(
            name=job_config.get('name', 'celline_job'),
            command=job_config.get('command', ''),
            working_directory=job_config.get('working_directory', '.'),
            environment=job_config.get('environment', {}),
            resources=resources,
            dependencies=job_config.get('dependencies', []),
            timeout=job_config.get('timeout'),
            retry_count=job_config.get('retry_count', 0),
            priority=job_config.get('priority', 0),
            metadata=job_config.get('metadata', {})
        )
        
        return config
    
    def _check_resource_availability(self, resources: 'ResourceConfig') -> bool:
        """Check if required resources are available"""
        import psutil
        
        # Check CPU availability
        available_cpu = psutil.cpu_count()
        if resources.cpu_cores > available_cpu:
            self.logger.warning(f"Requested {resources.cpu_cores} CPUs, only {available_cpu} available")
            return False
        
        # Check memory availability
        available_memory = psutil.virtual_memory().available / (1024**3)  # GB
        if resources.memory_gb > available_memory:
            self.logger.warning(f"Requested {resources.memory_gb}GB memory, only {available_memory:.1f}GB available")
            return False
        
        # Check disk space
        available_disk = psutil.disk_usage('.').free / (1024**3)  # GB
        if resources.disk_space_gb > available_disk:
            self.logger.warning(f"Requested {resources.disk_space_gb}GB disk, only {available_disk:.1f}GB available")
            return False
        
        return True
    
    def _execute_job(self, job_id: str, command: str, config: JobConfig):
        """Execute job in background thread"""
        # Initialize job record
        self.active_jobs[job_id] = {
            'status': JobStatus.RUNNING,
            'command': command,
            'config': config,
            'submitted_at': time.time(),
            'started_at': time.time(),
            'finished_at': None,
            'output_file': self.job_output_dir / f"{job_id}.log",
            'error_file': self.job_output_dir / f"{job_id}.err",
            'pid': None,
            'exit_code': None
        }
        
        # Start execution thread
        thread = threading.Thread(
            target=self._run_job_process,
            args=(job_id, command, config),
            daemon=True
        )
        self.job_threads[job_id] = thread
        thread.start()
        
        self.logger.info(f"Job {job_id} started execution")
    
    def _run_job_process(self, job_id: str, command: str, config: JobConfig):
        """Run job process with monitoring"""
        job_info = self.active_jobs[job_id]
        
        try:
            # Prepare environment
            env = os.environ.copy()
            env.update(config.environment)
            
            # Open output files
            with open(job_info['output_file'], 'w') as stdout_file, \
                 open(job_info['error_file'], 'w') as stderr_file:
                
                # Start process
                process = subprocess.Popen(
                    command,
                    shell=True,
                    cwd=config.working_directory,
                    env=env,
                    stdout=stdout_file,
                    stderr=stderr_file,
                    preexec_fn=os.setsid  # Create new process group
                )
                
                # Store process reference
                self.job_processes[job_id] = process
                job_info['pid'] = process.pid
                
                # Wait for completion with timeout
                try:
                    if config.timeout:
                        exit_code = process.wait(timeout=config.timeout)
                    else:
                        exit_code = process.wait()
                    
                    job_info['exit_code'] = exit_code
                    
                    if exit_code == 0:
                        job_info['status'] = JobStatus.COMPLETED
                        self.logger.info(f"Job {job_id} completed successfully")
                    else:
                        job_info['status'] = JobStatus.FAILED
                        self.logger.error(f"Job {job_id} failed with exit code {exit_code}")
                
                except subprocess.TimeoutExpired:
                    # Kill process and its children
                    try:
                        os.killpg(os.getpgid(process.pid), signal.SIGTERM)
                        process.wait(timeout=5)
                    except:
                        os.killpg(os.getpgid(process.pid), signal.SIGKILL)
                    
                    job_info['status'] = JobStatus.FAILED
                    job_info['exit_code'] = -1
                    self.logger.error(f"Job {job_id} terminated due to timeout")
                
        except Exception as e:
            job_info['status'] = JobStatus.FAILED
            job_info['exit_code'] = -1
            self.logger.exception(f"Job {job_id} failed with exception: {e}")
        
        finally:
            # Cleanup
            job_info['finished_at'] = time.time()
            if job_id in self.job_processes:
                del self.job_processes[job_id]
            
            # Try to start queued jobs
            self._start_queued_jobs()
    
    def _start_queued_jobs(self):
        """Start pending jobs if resources are available"""
        running_jobs = sum(1 for job in self.active_jobs.values() 
                          if job['status'] == JobStatus.RUNNING)
        
        if running_jobs >= self.max_concurrent_jobs:
            return
        
        # Find pending jobs sorted by priority and submission time
        pending_jobs = [
            (job_id, job_info) for job_id, job_info in self.active_jobs.items()
            if job_info['status'] == JobStatus.PENDING
        ]
        
        pending_jobs.sort(key=lambda x: (-x[1]['config'].priority, x[1]['submitted_at']))
        
        # Start jobs up to the limit
        jobs_to_start = min(len(pending_jobs), 
                           self.max_concurrent_jobs - running_jobs)
        
        for i in range(jobs_to_start):
            job_id, job_info = pending_jobs[i]
            if self._check_resource_availability(job_info['config'].resources):
                self._execute_job(job_id, job_info['command'], job_info['config'])
    
    def get_job_status(self, job_id: str) -> JobStatus:
        """Get current job status"""
        if job_id not in self.active_jobs:
            raise ValueError(f"Job {job_id} not found")
        
        return self.active_jobs[job_id]['status']
    
    def get_job_output(self, job_id: str) -> Optional[str]:
        """Get job output logs"""
        if job_id not in self.active_jobs:
            return None
        
        job_info = self.active_jobs[job_id]
        
        try:
            # Combine stdout and stderr
            output_lines = []
            
            if job_info['output_file'].exists():
                with open(job_info['output_file'], 'r') as f:
                    output_lines.extend(f.readlines())
            
            if job_info['error_file'].exists():
                with open(job_info['error_file'], 'r') as f:
                    error_lines = f.readlines()
                    if error_lines:
                        output_lines.append("\n--- STDERR ---\n")
                        output_lines.extend(error_lines)
            
            return ''.join(output_lines)
            
        except Exception as e:
            self.logger.error(f"Error reading output for job {job_id}: {e}")
            return None
    
    def cancel_job(self, job_id: str) -> bool:
        """Cancel running job"""
        if job_id not in self.active_jobs:
            return False
        
        job_info = self.active_jobs[job_id]
        
        if job_info['status'] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
            return True
        
        if job_info['status'] == JobStatus.PENDING:
            job_info['status'] = JobStatus.CANCELLED
            job_info['finished_at'] = time.time()
            return True
        
        # Cancel running job
        if job_id in self.job_processes:
            process = self.job_processes[job_id]
            try:
                # Try graceful termination first
                os.killpg(os.getpgid(process.pid), signal.SIGTERM)
                
                # Wait briefly for graceful shutdown
                try:
                    process.wait(timeout=5)
                except subprocess.TimeoutExpired:
                    # Force kill if necessary
                    os.killpg(os.getpgid(process.pid), signal.SIGKILL)
                
                job_info['status'] = JobStatus.CANCELLED
                job_info['finished_at'] = time.time()
                
                self.logger.info(f"Job {job_id} cancelled successfully")
                return True
                
            except Exception as e:
                self.logger.error(f"Error cancelling job {job_id}: {e}")
                return False
        
        return False
    
    def is_available(self) -> bool:
        """Check if local backend is available"""
        return True  # Local backend is always available
    
    def get_resource_usage(self, job_id: str) -> Dict[str, Any]:
        """Get resource usage for job"""
        if job_id not in self.active_jobs:
            return {}
        
        job_info = self.active_jobs[job_id]
        
        if 'pid' not in job_info or job_info['pid'] is None:
            return {}
        
        try:
            import psutil
            process = psutil.Process(job_info['pid'])
            
            return {
                'cpu_percent': process.cpu_percent(),
                'memory_mb': process.memory_info().rss / (1024**2),
                'memory_percent': process.memory_percent(),
                'num_threads': process.num_threads(),
                'status': process.status(),
                'create_time': process.create_time()
            }
            
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return {}
    
    def cleanup_job(self, job_id: str) -> bool:
        """Cleanup job artifacts"""
        if job_id not in self.active_jobs:
            return False
        
        job_info = self.active_jobs[job_id]
        
        try:
            # Remove output files
            if job_info['output_file'].exists():
                job_info['output_file'].unlink()
            
            if job_info['error_file'].exists():
                job_info['error_file'].unlink()
            
            # Remove job from tracking
            del self.active_jobs[job_id]
            
            # Cleanup thread reference
            if job_id in self.job_threads:
                del self.job_threads[job_id]
            
            return True
            
        except Exception as e:
            self.logger.error(f"Error cleaning up job {job_id}: {e}")
            return False
    
    def _start_resource_monitor(self):
        """Start resource monitoring thread"""
        def monitor():
            import psutil
            while True:
                try:
                    self.current_cpu_usage = psutil.cpu_percent(interval=1)
                    self.current_memory_usage = psutil.virtual_memory().percent
                    time.sleep(10)  # Update every 10 seconds
                except Exception as e:
                    self.logger.error(f"Resource monitoring error: {e}")
                    time.sleep(30)
        
        monitor_thread = threading.Thread(target=monitor, daemon=True)
        monitor_thread.start()
    
    def get_system_status(self) -> Dict[str, Any]:
        """Get overall system status"""
        import psutil
        
        running_jobs = sum(1 for job in self.active_jobs.values() 
                          if job['status'] == JobStatus.RUNNING)
        pending_jobs = sum(1 for job in self.active_jobs.values() 
                          if job['status'] == JobStatus.PENDING)
        
        return {
            'backend_type': 'local',
            'running_jobs': running_jobs,
            'pending_jobs': pending_jobs,
            'max_concurrent_jobs': self.max_concurrent_jobs,
            'cpu_usage_percent': self.current_cpu_usage,
            'memory_usage_percent': self.current_memory_usage,
            'available_cpu_cores': psutil.cpu_count(),
            'available_memory_gb': psutil.virtual_memory().available / (1024**3),
            'available_disk_gb': psutil.disk_usage('.').free / (1024**3)
        }

    

PBS/Torque Backend

Professional HPC Backend

      # celline/middleware/backends/pbs_backend.py
import subprocess
import re
import time
from typing import Dict, Any, Optional
from celline.middleware.backends.base import ExecutionBackend, JobStatus
from celline.log.logger import get_logger

class PBSExecutionBackend(ExecutionBackend):
    """
    PBS/Torque execution backend for HPC clusters
    
    This backend submits jobs to PBS/Torque queue management system
    and provides comprehensive job monitoring and resource management.
    """
    
    def __init__(self, default_queue: str = "default", 
                 script_template_path: Optional[str] = None):
        """
        Initialize PBS execution backend
        
        Args:
            default_queue: Default queue for job submission
            script_template_path: Path to custom PBS script template
        """
        self.default_queue = default_queue
        self.script_template_path = script_template_path
        self.logger = get_logger(__name__)
        
        # Job tracking
        self.submitted_jobs: Dict[str, Dict[str, Any]] = {}
        
        # Validate PBS availability
        if not self.is_available():
            raise RuntimeError("PBS/Torque system not available")
    
    def submit_job(self, command: str, job_config: Dict[str, Any]) -> str:
        """
        Submit job to PBS queue
        
        Args:
            command: Command to execute
            job_config: Job configuration
            
        Returns:
            PBS job ID
        """
        # Generate PBS script
        script_content = self._generate_pbs_script(command, job_config)
        
        # Write script to temporary file
        import tempfile
        with tempfile.NamedTemporaryFile(mode='w', suffix='.pbs', delete=False) as f:
            f.write(script_content)
            script_path = f.name
        
        try:
            # Submit job
            result = subprocess.run(
                ['qsub', script_path],
                capture_output=True,
                text=True,
                check=True
            )
            
            # Extract job ID from qsub output
            job_id = result.stdout.strip()
            
            # Store job information
            self.submitted_jobs[job_id] = {
                'command': command,
                'config': job_config,
                'script_path': script_path,
                'submitted_at': time.time(),
                'status': JobStatus.PENDING
            }
            
            self.logger.info(f"Job {job_id} submitted to PBS queue")
            return job_id
            
        except subprocess.CalledProcessError as e:
            self.logger.error(f"Failed to submit PBS job: {e.stderr}")
            raise RuntimeError(f"PBS job submission failed: {e.stderr}")
        
        finally:
            # Cleanup script file
            import os
            try:
                os.unlink(script_path)
            except:
                pass
    
    def _generate_pbs_script(self, command: str, job_config: Dict[str, Any]) -> str:
        """Generate PBS script content"""
        config = self._parse_job_config(job_config)
        
        # PBS script template
        script_lines = [
            "#!/bin/bash",
            "",
            f"#PBS -N {config.name}",
            f"#PBS -q {config.resources.queue or self.default_queue}",
            f"#PBS -l nodes=1:ppn={config.resources.cpu_cores}",
            f"#PBS -l mem={int(config.resources.memory_gb)}gb",
        ]
        
        # Add walltime if specified
        if config.resources.walltime_minutes:
            hours = config.resources.walltime_minutes // 60
            minutes = config.resources.walltime_minutes % 60
            script_lines.append(f"#PBS -l walltime={hours:02d}:{minutes:02d}:00")
        
        # Add GPU request if needed
        if config.resources.gpu_count > 0:
            script_lines.append(f"#PBS -l gpus={config.resources.gpu_count}")
        
        # Add output/error files
        script_lines.extend([
            "#PBS -o ${PBS_JOBNAME}.out",
            "#PBS -e ${PBS_JOBNAME}.err",
            "#PBS -V",  # Export environment variables
            ""
        ])
        
        # Add working directory change
        script_lines.extend([
            f"cd {config.working_directory}",
            ""
        ])
        
        # Add environment variables
        for key, value in config.environment.items():
            script_lines.append(f"export {key}='{value}'")
        
        if config.environment:
            script_lines.append("")
        
        # Add module loads if specified
        modules = job_config.get('modules', [])
        for module in modules:
            script_lines.append(f"module load {module}")
        
        if modules:
            script_lines.append("")
        
        # Add the actual command
        script_lines.extend([
            "# Execute command",
            command,
            "",
            "# Check exit status",
            "exit_code=$?",
            "echo \"Job completed with exit code: $exit_code\"",
            "exit $exit_code"
        ])
        
        return "\n".join(script_lines)
    
    def get_job_status(self, job_id: str) -> JobStatus:
        """Get PBS job status"""
        try:
            result = subprocess.run(
                ['qstat', '-f', job_id],
                capture_output=True,
                text=True,
                check=True
            )
            
            # Parse qstat output
            status_match = re.search(r'job_state = (\w)', result.stdout)
            if status_match:
                pbs_status = status_match.group(1)
                return self._convert_pbs_status(pbs_status)
            
            return JobStatus.PENDING
            
        except subprocess.CalledProcessError:
            # Job might be completed and removed from queue
            # Check if we have it in our tracking
            if job_id in self.submitted_jobs:
                # Try to get completion status from output files
                return self._check_completion_status(job_id)
            
            return JobStatus.FAILED
    
    def _convert_pbs_status(self, pbs_status: str) -> JobStatus:
        """Convert PBS status to JobStatus"""
        status_map = {
            'Q': JobStatus.PENDING,    # Queued
            'R': JobStatus.RUNNING,    # Running
            'C': JobStatus.COMPLETED,  # Completed
            'E': JobStatus.COMPLETED,  # Exiting
            'H': JobStatus.PENDING,    # Held
            'S': JobStatus.PENDING,    # Suspended
            'T': JobStatus.RUNNING,    # Transitioning
            'W': JobStatus.PENDING,    # Waiting
            'X': JobStatus.FAILED      # Finished (failed)
        }
        
        return status_map.get(pbs_status, JobStatus.FAILED)
    
    def _check_completion_status(self, job_id: str) -> JobStatus:
        """Check job completion status from output files"""
        if job_id not in self.submitted_jobs:
            return JobStatus.FAILED
        
        job_info = self.submitted_jobs[job_id]
        job_name = job_info['config'].get('name', 'celline_job')
        
        # Check error file for PBS errors
        error_file = f"{job_name}.err"
        if os.path.exists(error_file):
            try:
                with open(error_file, 'r') as f:
                    error_content = f.read()
                    if error_content.strip():
                        return JobStatus.FAILED
            except:
                pass
        
        # Check output file for completion
        output_file = f"{job_name}.out"
        if os.path.exists(output_file):
            try:
                with open(output_file, 'r') as f:
                    output_content = f.read()
                    # Look for our exit code marker
                    if "Job completed with exit code: 0" in output_content:
                        return JobStatus.COMPLETED
                    elif "Job completed with exit code:" in output_content:
                        return JobStatus.FAILED
            except:
                pass
        
        # Default to completed if no error indicators
        return JobStatus.COMPLETED
    
    def get_job_output(self, job_id: str) -> Optional[str]:
        """Get PBS job output"""
        if job_id not in self.submitted_jobs:
            return None
        
        job_info = self.submitted_jobs[job_id]
        job_name = job_info['config'].get('name', 'celline_job')
        
        output_lines = []
        
        # Read output file
        output_file = f"{job_name}.out"
        if os.path.exists(output_file):
            try:
                with open(output_file, 'r') as f:
                    output_lines.extend(f.readlines())
            except Exception as e:
                output_lines.append(f"Error reading output file: {e}\n")
        
        # Read error file
        error_file = f"{job_name}.err"
        if os.path.exists(error_file):
            try:
                with open(error_file, 'r') as f:
                    error_lines = f.readlines()
                    if error_lines:
                        output_lines.append("\n--- STDERR ---\n")
                        output_lines.extend(error_lines)
            except Exception as e:
                output_lines.append(f"Error reading error file: {e}\n")
        
        return ''.join(output_lines) if output_lines else None
    
    def cancel_job(self, job_id: str) -> bool:
        """Cancel PBS job"""
        try:
            subprocess.run(
                ['qdel', job_id],
                capture_output=True,
                text=True,
                check=True
            )
            
            # Update job status
            if job_id in self.submitted_jobs:
                self.submitted_jobs[job_id]['status'] = JobStatus.CANCELLED
            
            self.logger.info(f"PBS job {job_id} cancelled")
            return True
            
        except subprocess.CalledProcessError as e:
            self.logger.error(f"Failed to cancel PBS job {job_id}: {e.stderr}")
            return False
    
    def is_available(self) -> bool:
        """Check if PBS system is available"""
        try:
            subprocess.run(['qstat', '--version'], 
                         capture_output=True, check=True)
            return True
        except (subprocess.CalledProcessError, FileNotFoundError):
            return False
    
    def get_resource_usage(self, job_id: str) -> Dict[str, Any]:
        """Get PBS job resource usage"""
        try:
            result = subprocess.run(
                ['qstat', '-f', job_id],
                capture_output=True,
                text=True,
                check=True
            )
            
            # Parse resource usage from qstat output
            usage_info = {}
            
            # Extract resource information
            for line in result.stdout.split('\n'):
                line = line.strip()
                
                if 'resources_used.cput' in line:
                    cput_match = re.search(r'resources_used\.cput = (.+)', line)
                    if cput_match:
                        usage_info['cpu_time'] = cput_match.group(1)
                
                elif 'resources_used.mem' in line:
                    mem_match = re.search(r'resources_used\.mem = (.+)', line)
                    if mem_match:
                        usage_info['memory_used'] = mem_match.group(1)
                
                elif 'resources_used.walltime' in line:
                    wall_match = re.search(r'resources_used\.walltime = (.+)', line)
                    if wall_match:
                        usage_info['walltime_used'] = wall_match.group(1)
            
            return usage_info
            
        except subprocess.CalledProcessError:
            return {}
    
    def get_queue_status(self) -> Dict[str, Any]:
        """Get PBS queue status"""
        try:
            result = subprocess.run(
                ['qstat', '-Q'],
                capture_output=True,
                text=True,
                check=True
            )
            
            queue_info = {}
            lines = result.stdout.strip().split('\n')[2:]  # Skip header
            
            for line in lines:
                parts = line.split()
                if len(parts) >= 6:
                    queue_name = parts[0]
                    max_jobs = parts[1]
                    queued = parts[2]
                    running = parts[3]
                    
                    queue_info[queue_name] = {
                        'max_jobs': max_jobs,
                        'queued_jobs': queued,
                        'running_jobs': running,
                        'status': parts[5]
                    }
            
            return queue_info
            
        except subprocess.CalledProcessError:
            return {}

    

SLURM Backend

Modern HPC Backend

      # celline/middleware/backends/slurm_backend.py
import subprocess
import json
import time
from typing import Dict, Any, Optional
from celline.middleware.backends.base import ExecutionBackend, JobStatus
from celline.log.logger import get_logger

class SLURMExecutionBackend(ExecutionBackend):
    """
    SLURM execution backend for modern HPC clusters
    
    This backend provides comprehensive integration with SLURM workload manager
    including advanced resource management and job array support.
    """
    
    def __init__(self, default_partition: str = "compute", 
                 account: Optional[str] = None):
        """
        Initialize SLURM execution backend
        
        Args:
            default_partition: Default partition for job submission
            account: SLURM account for billing
        """
        self.default_partition = default_partition
        self.account = account
        self.logger = get_logger(__name__)
        
        # Job tracking
        self.submitted_jobs: Dict[str, Dict[str, Any]] = {}
        
        # Validate SLURM availability
        if not self.is_available():
            raise RuntimeError("SLURM system not available")
    
    def submit_job(self, command: str, job_config: Dict[str, Any]) -> str:
        """
        Submit job to SLURM
        
        Args:
            command: Command to execute
            job_config: Job configuration
            
        Returns:
            SLURM job ID
        """
        # Build sbatch command
        sbatch_args = self._build_sbatch_args(job_config)
        
        # Create batch script content
        script_content = self._generate_slurm_script(command, job_config)
        
        try:
            # Submit job using sbatch
            process = subprocess.Popen(
                ['sbatch'] + sbatch_args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            
            stdout, stderr = process.communicate(input=script_content)
            
            if process.returncode != 0:
                raise RuntimeError(f"sbatch failed: {stderr}")
            
            # Extract job ID from sbatch output
            # Format: "Submitted batch job 12345"
            import re
            job_match = re.search(r'Submitted batch job (\d+)', stdout)
            if not job_match:
                raise RuntimeError(f"Could not parse job ID from: {stdout}")
            
            job_id = job_match.group(1)
            
            # Store job information
            self.submitted_jobs[job_id] = {
                'command': command,
                'config': job_config,
                'submitted_at': time.time(),
                'status': JobStatus.PENDING
            }
            
            self.logger.info(f"Job {job_id} submitted to SLURM")
            return job_id
            
        except Exception as e:
            self.logger.error(f"Failed to submit SLURM job: {e}")
            raise RuntimeError(f"SLURM job submission failed: {e}")
    
    def _build_sbatch_args(self, job_config: Dict[str, Any]) -> list:
        """Build sbatch command arguments"""
        config = self._parse_job_config(job_config)
        args = []
        
        # Job name
        args.extend(['--job-name', config.name])
        
        # Partition
        partition = config.resources.queue or self.default_partition
        args.extend(['--partition', partition])
        
        # Account
        if self.account:
            args.extend(['--account', self.account])
        
        # Resource requirements
        args.extend(['--nodes', '1'])
        args.extend(['--ntasks-per-node', '1'])
        args.extend(['--cpus-per-task', str(config.resources.cpu_cores)])
        args.extend(['--mem', f'{int(config.resources.memory_gb * 1024)}M'])
        
        # Walltime
        if config.resources.walltime_minutes:
            args.extend(['--time', f'{config.resources.walltime_minutes}'])
        
        # GPU requirements
        if config.resources.gpu_count > 0:
            args.extend(['--gres', f'gpu:{config.resources.gpu_count}'])
        
        # Output files
        args.extend(['--output', f'{config.name}_%j.out'])
        args.extend(['--error', f'{config.name}_%j.err'])
        
        # Additional SLURM options
        slurm_options = job_config.get('slurm_options', {})
        for key, value in slurm_options.items():
            if value is True:
                args.append(f'--{key}')
            elif value is not False:
                args.extend([f'--{key}', str(value)])
        
        return args
    
    def _generate_slurm_script(self, command: str, job_config: Dict[str, Any]) -> str:
        """Generate SLURM batch script content"""
        config = self._parse_job_config(job_config)
        
        script_lines = [
            "#!/bin/bash",
            "",
            "# SLURM job script generated by Celline",
            ""
        ]
        
        # Add working directory change
        script_lines.extend([
            f"cd {config.working_directory}",
            ""
        ])
        
        # Add environment variables
        for key, value in config.environment.items():
            script_lines.append(f"export {key}='{value}'")
        
        if config.environment:
            script_lines.append("")
        
        # Add module loads
        modules = job_config.get('modules', [])
        for module in modules:
            script_lines.append(f"module load {module}")
        
        if modules:
            script_lines.append("")
        
        # Add conda environment activation if specified
        conda_env = job_config.get('conda_env')
        if conda_env:
            script_lines.extend([
                f"source activate {conda_env}",
                ""
            ])
        
        # Add the actual command
        script_lines.extend([
            "# Execute command",
            f"echo \"Starting job at $(date)\"",
            f"echo \"Running on node: $SLURMD_NODENAME\"",
            f"echo \"Working directory: $(pwd)\"",
            "",
            command,
            "",
            "# Check exit status",
            "exit_code=$?",
            "echo \"Job completed at $(date) with exit code: $exit_code\"",
            "exit $exit_code"
        ])
        
        return "\n".join(script_lines)
    
    def get_job_status(self, job_id: str) -> JobStatus:
        """Get SLURM job status using scontrol"""
        try:
            result = subprocess.run(
                ['scontrol', 'show', 'job', job_id],
                capture_output=True,
                text=True,
                check=True
            )
            
            # Parse scontrol output for JobState
            import re
            state_match = re.search(r'JobState=(\w+)', result.stdout)
            if state_match:
                slurm_state = state_match.group(1)
                return self._convert_slurm_status(slurm_state)
            
            return JobStatus.PENDING
            
        except subprocess.CalledProcessError:
            # Job might be completed and removed from scontrol
            # Try sacct for historical data
            return self._get_historical_status(job_id)
    
    def _convert_slurm_status(self, slurm_state: str) -> JobStatus:
        """Convert SLURM status to JobStatus"""
        status_map = {
            'PENDING': JobStatus.PENDING,
            'CONFIGURING': JobStatus.PENDING,
            'RUNNING': JobStatus.RUNNING,
            'SUSPENDED': JobStatus.RUNNING,
            'COMPLETING': JobStatus.RUNNING,
            'COMPLETED': JobStatus.COMPLETED,
            'CANCELLED': JobStatus.CANCELLED,
            'FAILED': JobStatus.FAILED,
            'TIMEOUT': JobStatus.FAILED,
            'PREEMPTED': JobStatus.FAILED,
            'NODE_FAIL': JobStatus.FAILED,
            'BOOT_FAIL': JobStatus.FAILED,
            'DEADLINE': JobStatus.FAILED,
            'OUT_OF_MEMORY': JobStatus.FAILED
        }
        
        return status_map.get(slurm_state, JobStatus.FAILED)
    
    def _get_historical_status(self, job_id: str) -> JobStatus:
        """Get job status from SLURM accounting using sacct"""
        try:
            result = subprocess.run(
                ['sacct', '-j', job_id, '--format=State', '--noheader', '--parsable2'],
                capture_output=True,
                text=True,
                check=True
            )
            
            if result.stdout.strip():
                slurm_state = result.stdout.strip().split('\n')[0]
                return self._convert_slurm_status(slurm_state)
            
            return JobStatus.FAILED
            
        except subprocess.CalledProcessError:
            return JobStatus.FAILED
    
    def get_job_output(self, job_id: str) -> Optional[str]:
        """Get SLURM job output"""
        if job_id not in self.submitted_jobs:
            return None
        
        job_info = self.submitted_jobs[job_id]
        job_name = job_info['config'].get('name', 'celline_job')
        
        output_lines = []
        
        # SLURM output files include job ID
        output_file = f"{job_name}_{job_id}.out"
        error_file = f"{job_name}_{job_id}.err"
        
        # Read output file
        import os
        if os.path.exists(output_file):
            try:
                with open(output_file, 'r') as f:
                    output_lines.extend(f.readlines())
            except Exception as e:
                output_lines.append(f"Error reading output file: {e}\n")
        
        # Read error file
        if os.path.exists(error_file):
            try:
                with open(error_file, 'r') as f:
                    error_lines = f.readlines()
                    if error_lines:
                        output_lines.append("\n--- STDERR ---\n")
                        output_lines.extend(error_lines)
            except Exception as e:
                output_lines.append(f"Error reading error file: {e}\n")
        
        return ''.join(output_lines) if output_lines else None
    
    def cancel_job(self, job_id: str) -> bool:
        """Cancel SLURM job"""
        try:
            subprocess.run(
                ['scancel', job_id],
                capture_output=True,
                text=True,
                check=True
            )
            
            # Update job status
            if job_id in self.submitted_jobs:
                self.submitted_jobs[job_id]['status'] = JobStatus.CANCELLED
            
            self.logger.info(f"SLURM job {job_id} cancelled")
            return True
            
        except subprocess.CalledProcessError as e:
            self.logger.error(f"Failed to cancel SLURM job {job_id}: {e.stderr}")
            return False
    
    def is_available(self) -> bool:
        """Check if SLURM system is available"""
        try:
            subprocess.run(['sinfo', '--version'], 
                         capture_output=True, check=True)
            return True
        except (subprocess.CalledProcessError, FileNotFoundError):
            return False
    
    def get_resource_usage(self, job_id: str) -> Dict[str, Any]:
        """Get SLURM job resource usage using sacct"""
        try:
            result = subprocess.run([
                'sacct', '-j', job_id, 
                '--format=JobID,CPUTime,MaxRSS,MaxVMSize,Elapsed,State,ExitCode',
                '--units=M', '--parsable2', '--noheader'
            ], capture_output=True, text=True, check=True)
            
            if not result.stdout.strip():
                return {}
            
            # Parse sacct output
            lines = result.stdout.strip().split('\n')
            main_job_line = lines[0]  # First line is main job
            
            fields = main_job_line.split('|')
            if len(fields) >= 7:
                return {
                    'job_id': fields[0],
                    'cpu_time': fields[1],
                    'max_rss_mb': fields[2],
                    'max_vmsize_mb': fields[3],
                    'elapsed_time': fields[4],
                    'state': fields[5],
                    'exit_code': fields[6]
                }
            
            return {}
            
        except subprocess.CalledProcessError:
            return {}
    
    def get_partition_info(self) -> Dict[str, Any]:
        """Get SLURM partition information"""
        try:
            result = subprocess.run([
                'sinfo', '--format=%P,%A,%l,%N', '--noheader'
            ], capture_output=True, text=True, check=True)
            
            partition_info = {}
            
            for line in result.stdout.strip().split('\n'):
                parts = line.split(',')
                if len(parts) >= 4:
                    partition = parts[0].rstrip('*')  # Remove default indicator
                    avail_status = parts[1]
                    time_limit = parts[2]
                    nodes = parts[3]
                    
                    partition_info[partition] = {
                        'availability': avail_status,
                        'time_limit': time_limit,
                        'nodes': nodes
                    }
            
            return partition_info
            
        except subprocess.CalledProcessError:
            return {}
    
    def submit_job_array(self, commands: list, job_config: Dict[str, Any]) -> str:
        """Submit SLURM job array"""
        array_size = len(commands)
        if array_size == 0:
            raise ValueError("No commands provided for job array")
        
        # Modify job config for array
        array_config = job_config.copy()
        array_config['slurm_options'] = array_config.get('slurm_options', {})
        array_config['slurm_options']['array'] = f'0-{array_size-1}'
        
        # Create command array script
        script_content = self._generate_array_script(commands, job_config)
        
        # Submit array job
        job_id = self.submit_job("", array_config)  # Empty command, using script
        
        return job_id
    
    def _generate_array_script(self, commands: list, job_config: Dict[str, Any]) -> str:
        """Generate SLURM job array script"""
        config = self._parse_job_config(job_config)
        
        script_lines = [
            "#!/bin/bash",
            "",
            "# SLURM job array script generated by Celline",
            ""
        ]
        
        # Add working directory and environment setup
        script_lines.extend([
            f"cd {config.working_directory}",
            ""
        ])
        
        # Add environment variables
        for key, value in config.environment.items():
            script_lines.append(f"export {key}='{value}'")
        
        if config.environment:
            script_lines.append("")
        
        # Create command array
        script_lines.append("# Command array")
        script_lines.append("commands=(")
        for cmd in commands:
            # Escape commands properly
            escaped_cmd = cmd.replace('"', '\\"')
            script_lines.append(f'    "{escaped_cmd}"')
        script_lines.append(")")
        script_lines.append("")
        
        # Execute command based on array index
        script_lines.extend([
            "# Execute command for this array task",
            "echo \"Array task $SLURM_ARRAY_TASK_ID starting at $(date)\"",
            'eval "${commands[$SLURM_ARRAY_TASK_ID]}"',
            "exit_code=$?",
            "echo \"Array task $SLURM_ARRAY_TASK_ID completed at $(date) with exit code: $exit_code\"",
            "exit $exit_code"
        ])
        
        return "\n".join(script_lines)

    

Cloud Backend (AWS Batch)

Cloud-Native Backend

      # celline/middleware/backends/aws_batch_backend.py
import boto3
import time
import json
from typing import Dict, Any, Optional
from celline.middleware.backends.base import ExecutionBackend, JobStatus
from celline.log.logger import get_logger

class AWSBatchExecutionBackend(ExecutionBackend):
    """
    AWS Batch execution backend for cloud computing
    
    This backend provides integration with AWS Batch for scalable
    containerized job execution in the cloud.
    """
    
    def __init__(self, job_queue: str, job_definition: str,
                 region: str = 'us-east-1'):
        """
        Initialize AWS Batch backend
        
        Args:
            job_queue: AWS Batch job queue name
            job_definition: AWS Batch job definition name
            region: AWS region
        """
        self.job_queue = job_queue
        self.job_definition = job_definition
        self.region = region
        
        # Initialize AWS Batch client
        self.batch_client = boto3.client('batch', region_name=region)
        self.logs_client = boto3.client('logs', region_name=region)
        
        self.logger = get_logger(__name__)
        
        # Job tracking
        self.submitted_jobs: Dict[str, Dict[str, Any]] = {}
        
        # Validate AWS Batch availability
        if not self.is_available():
            raise RuntimeError("AWS Batch not available")
    
    def submit_job(self, command: str, job_config: Dict[str, Any]) -> str:
        """
        Submit job to AWS Batch
        
        Args:
            command: Command to execute
            job_config: Job configuration
            
        Returns:
            AWS Batch job ID
        """
        config = self._parse_job_config(job_config)
        
        # Build job parameters
        job_params = {
            'jobName': config.name,
            'jobQueue': self.job_queue,
            'jobDefinition': self.job_definition,
            'parameters': {
                'command': command,
                'workingDirectory': config.working_directory
            }
        }
        
        # Add environment variables
        if config.environment:
            env_list = [
                {'name': key, 'value': value}
                for key, value in config.environment.items()
            ]
            job_params['containerOverrides'] = {
                'environment': env_list
            }
        
        # Add resource requirements
        if hasattr(config.resources, 'cpu_cores') or hasattr(config.resources, 'memory_gb'):
            container_overrides = job_params.get('containerOverrides', {})
            
            if config.resources.cpu_cores > 1:
                container_overrides['vcpus'] = config.resources.cpu_cores
            
            if config.resources.memory_gb > 1:
                container_overrides['memory'] = int(config.resources.memory_gb * 1024)
            
            job_params['containerOverrides'] = container_overrides
        
        # Add job dependencies
        if config.dependencies:
            job_params['dependsOn'] = [
                {'jobId': dep_id} for dep_id in config.dependencies
            ]
        
        # Add timeout
        if config.timeout:
            job_params['timeout'] = {'attemptDurationSeconds': config.timeout}
        
        # Add retry configuration
        if config.retry_count > 0:
            job_params['retryStrategy'] = {'attempts': config.retry_count + 1}
        
        try:
            # Submit job
            response = self.batch_client.submit_job(**job_params)
            job_id = response['jobId']
            
            # Store job information
            self.submitted_jobs[job_id] = {
                'command': command,
                'config': job_config,
                'submitted_at': time.time(),
                'status': JobStatus.PENDING,
                'log_group': None,
                'log_stream': None
            }
            
            self.logger.info(f"Job {job_id} submitted to AWS Batch")
            return job_id
            
        except Exception as e:
            self.logger.error(f"Failed to submit AWS Batch job: {e}")
            raise RuntimeError(f"AWS Batch job submission failed: {e}")
    
    def get_job_status(self, job_id: str) -> JobStatus:
        """Get AWS Batch job status"""
        try:
            response = self.batch_client.describe_jobs(jobs=[job_id])
            
            if not response['jobs']:
                return JobStatus.FAILED
            
            job = response['jobs'][0]
            batch_status = job['status']
            
            # Update log information if available
            if job_id in self.submitted_jobs:
                job_info = self.submitted_jobs[job_id]
                if 'logStreamName' in job.get('attempts', [{}])[0].get('taskProperties', {}):
                    task_props = job['attempts'][0]['taskProperties']
                    job_info['log_group'] = '/aws/batch/job'
                    job_info['log_stream'] = task_props['logStreamName']
            
            return self._convert_batch_status(batch_status)
            
        except Exception as e:
            self.logger.error(f"Error getting job status for {job_id}: {e}")
            return JobStatus.FAILED
    
    def _convert_batch_status(self, batch_status: str) -> JobStatus:
        """Convert AWS Batch status to JobStatus"""
        status_map = {
            'SUBMITTED': JobStatus.PENDING,
            'PENDING': JobStatus.PENDING,
            'RUNNABLE': JobStatus.PENDING,
            'STARTING': JobStatus.PENDING,
            'RUNNING': JobStatus.RUNNING,
            'SUCCEEDED': JobStatus.COMPLETED,
            'FAILED': JobStatus.FAILED
        }
        
        return status_map.get(batch_status, JobStatus.FAILED)
    
    def get_job_output(self, job_id: str) -> Optional[str]:
        """Get AWS Batch job output from CloudWatch Logs"""
        if job_id not in self.submitted_jobs:
            return None
        
        job_info = self.submitted_jobs[job_id]
        
        # Check if log information is available
        if not job_info.get('log_group') or not job_info.get('log_stream'):
            # Try to get log information from job description
            try:
                response = self.batch_client.describe_jobs(jobs=[job_id])
                if response['jobs']:
                    job = response['jobs'][0]
                    if job.get('attempts'):
                        task_props = job['attempts'][0].get('taskProperties', {})
                        if 'logStreamName' in task_props:
                            job_info['log_group'] = '/aws/batch/job'
                            job_info['log_stream'] = task_props['logStreamName']
            except:
                pass
        
        if not job_info.get('log_group') or not job_info.get('log_stream'):
            return None
        
        try:
            # Get logs from CloudWatch
            response = self.logs_client.get_log_events(
                logGroupName=job_info['log_group'],
                logStreamName=job_info['log_stream']
            )
            
            log_lines = []
            for event in response['events']:
                timestamp = time.strftime('%Y-%m-%d %H:%M:%S', 
                                        time.localtime(event['timestamp'] / 1000))
                log_lines.append(f"[{timestamp}] {event['message']}")
            
            return '\n'.join(log_lines)
            
        except Exception as e:
            self.logger.error(f"Error retrieving logs for job {job_id}: {e}")
            return None
    
    def cancel_job(self, job_id: str) -> bool:
        """Cancel AWS Batch job"""
        try:
            self.batch_client.cancel_job(
                jobId=job_id,
                reason='Cancelled by user'
            )
            
            # Update job status
            if job_id in self.submitted_jobs:
                self.submitted_jobs[job_id]['status'] = JobStatus.CANCELLED
            
            self.logger.info(f"AWS Batch job {job_id} cancelled")
            return True
            
        except Exception as e:
            self.logger.error(f"Failed to cancel AWS Batch job {job_id}: {e}")
            return False
    
    def is_available(self) -> bool:
        """Check if AWS Batch is available"""
        try:
            # Test by listing job queues
            self.batch_client.describe_job_queues(maxResults=1)
            return True
        except Exception:
            return False
    
    def get_resource_usage(self, job_id: str) -> Dict[str, Any]:
        """Get AWS Batch job resource usage"""
        try:
            response = self.batch_client.describe_jobs(jobs=[job_id])
            
            if not response['jobs']:
                return {}
            
            job = response['jobs'][0]
            
            # Extract resource information
            usage_info = {
                'status': job['status'],
                'created_at': job['createdAt'],
                'started_at': job.get('startedAt'),
                'stopped_at': job.get('stoppedAt')
            }
            
            # Add container information if available
            if 'container' in job:
                container = job['container']
                usage_info.update({
                    'vcpus': container.get('vcpus'),
                    'memory': container.get('memory'),
                    'job_role_arn': container.get('jobRoleArn'),
                    'exit_code': container.get('exitCode'),
                    'reason': container.get('reason')
                })
            
            return usage_info
            
        except Exception as e:
            self.logger.error(f"Error getting resource usage for job {job_id}: {e}")
            return {}
    
    def get_queue_status(self) -> Dict[str, Any]:
        """Get AWS Batch queue status"""
        try:
            response = self.batch_client.describe_job_queues(
                jobQueues=[self.job_queue]
            )
            
            if not response['jobQueues']:
                return {}
            
            queue = response['jobQueues'][0]
            
            return {
                'queue_name': queue['jobQueueName'],
                'state': queue['state'],
                'status': queue['status'],
                'priority': queue['priority'],
                'compute_environments': queue['computeEnvironmentOrder']
            }
            
        except Exception as e:
            self.logger.error(f"Error getting queue status: {e}")
            return {}
    
    def create_job_definition(self, definition_config: Dict[str, Any]) -> str:
        """Create AWS Batch job definition"""
        try:
            response = self.batch_client.register_job_definition(**definition_config)
            job_def_arn = response['jobDefinitionArn']
            
            self.logger.info(f"Created job definition: {job_def_arn}")
            return job_def_arn
            
        except Exception as e:
            self.logger.error(f"Failed to create job definition: {e}")
            raise RuntimeError(f"Job definition creation failed: {e}")

    

Backend Registration and Factory

Backend Registry

      # celline/middleware/backends/registry.py
from typing import Dict, Type, Optional
from celline.middleware.backends.base import ExecutionBackend
from celline.log.logger import get_logger

class BackendRegistry:
    """Registry for execution backends"""
    
    def __init__(self):
        self.backends: Dict[str, Type[ExecutionBackend]] = {}
        self.logger = get_logger(__name__)
    
    def register_backend(self, name: str, backend_class: Type[ExecutionBackend]):
        """Register execution backend"""
        self.backends[name] = backend_class
        self.logger.info(f"Registered execution backend: {name}")
    
    def get_backend(self, name: str) -> Optional[Type[ExecutionBackend]]:
        """Get backend class by name"""
        return self.backends.get(name)
    
    def list_backends(self) -> list:
        """List available backend names"""
        return list(self.backends.keys())
    
    def create_backend(self, name: str, **kwargs) -> ExecutionBackend:
        """Create backend instance"""
        backend_class = self.get_backend(name)
        if not backend_class:
            raise ValueError(f"Unknown backend: {name}")
        
        return backend_class(**kwargs)

# Global registry instance
_backend_registry = BackendRegistry()

def get_backend_registry() -> BackendRegistry:
    """Get global backend registry"""
    return _backend_registry

def register_default_backends():
    """Register default execution backends"""
    from .local_backend import LocalExecutionBackend
    from .pbs_backend import PBSExecutionBackend
    from .slurm_backend import SLURMExecutionBackend
    from .aws_batch_backend import AWSBatchExecutionBackend
    
    registry = get_backend_registry()
    
    registry.register_backend("local", LocalExecutionBackend)
    registry.register_backend("pbs", PBSExecutionBackend)
    registry.register_backend("slurm", SLURMExecutionBackend)
    registry.register_backend("aws_batch", AWSBatchExecutionBackend)

# Auto-register default backends
register_default_backends()

    

Backend Factory

      # celline/middleware/backends/factory.py
from typing import Dict, Any, Optional
from celline.middleware.backends.registry import get_backend_registry
from celline.middleware.backends.base import ExecutionBackend
from celline.config import Config
from celline.log.logger import get_logger

class BackendFactory:
    """Factory for creating execution backends"""
    
    def __init__(self):
        self.logger = get_logger(__name__)
        self._cached_backends: Dict[str, ExecutionBackend] = {}
    
    def create_backend(self, backend_config: Dict[str, Any]) -> ExecutionBackend:
        """
        Create execution backend from configuration
        
        Args:
            backend_config: Backend configuration dictionary
            
        Returns:
            ExecutionBackend instance
        """
        backend_type = backend_config.get('type')
        if not backend_type:
            raise ValueError("Backend type not specified")
        
        # Check cache first
        cache_key = self._get_cache_key(backend_config)
        if cache_key in self._cached_backends:
            return self._cached_backends[cache_key]
        
        # Get backend class
        registry = get_backend_registry()
        backend_class = registry.get_backend(backend_type)
        
        if not backend_class:
            raise ValueError(f"Unknown backend type: {backend_type}")
        
        # Extract backend-specific configuration
        backend_params = backend_config.get('params', {})
        
        try:
            # Create backend instance
            backend = backend_class(**backend_params)
            
            # Validate backend availability
            if not backend.is_available():
                raise RuntimeError(f"Backend {backend_type} is not available")
            
            # Cache the backend
            self._cached_backends[cache_key] = backend
            
            self.logger.info(f"Created {backend_type} execution backend")
            return backend
            
        except Exception as e:
            self.logger.error(f"Failed to create {backend_type} backend: {e}")
            raise
    
    def _get_cache_key(self, backend_config: Dict[str, Any]) -> str:
        """Generate cache key for backend configuration"""
        import hashlib
        import json
        
        # Create deterministic hash of configuration
        config_str = json.dumps(backend_config, sort_keys=True)
        return hashlib.md5(config_str.encode()).hexdigest()
    
    def get_default_backend(self) -> ExecutionBackend:
        """Get default backend based on system configuration"""
        # Check configuration for default backend
        execution_config = getattr(Config, 'EXECUTION', {})
        backend_config = execution_config.get('backend', {})
        
        if not backend_config:
            # Auto-detect available backend
            backend_config = self._auto_detect_backend()
        
        return self.create_backend(backend_config)
    
    def _auto_detect_backend(self) -> Dict[str, Any]:
        """Auto-detect best available backend"""
        registry = get_backend_registry()
        
        # Priority order for backend selection
        backend_priorities = [
            ('slurm', {}),
            ('pbs', {}),
            ('local', {'max_concurrent_jobs': 2})
        ]
        
        for backend_type, default_params in backend_priorities:
            backend_class = registry.get_backend(backend_type)
            if backend_class:
                try:
                    # Test backend availability
                    test_backend = backend_class(**default_params)
                    if test_backend.is_available():
                        self.logger.info(f"Auto-detected {backend_type} backend")
                        return {
                            'type': backend_type,
                            'params': default_params
                        }
                except Exception:
                    continue
        
        # Fallback to local backend
        self.logger.warning("No suitable backend found, using local backend")
        return {
            'type': 'local',
            'params': {'max_concurrent_jobs': 1}
        }
    
    def list_available_backends(self) -> Dict[str, bool]:
        """List all backends and their availability"""
        registry = get_backend_registry()
        availability = {}
        
        for backend_name in registry.list_backends():
            backend_class = registry.get_backend(backend_name)
            try:
                # Create minimal instance to test availability
                backend = backend_class()
                availability[backend_name] = backend.is_available()
            except Exception:
                availability[backend_name] = False
        
        return availability

# Global factory instance
_backend_factory = BackendFactory()

def get_backend_factory() -> BackendFactory:
    """Get global backend factory"""
    return _backend_factory

    

This comprehensive guide covers all aspects of creating custom execution backends for Celline, from basic local execution to advanced cloud and HPC integration, with proper resource management, monitoring, and error handling.