Execution Backend Development
Learn how to create custom execution backends to support different computing environments and resource management systems in Celline.
Execution Backend Architecture
Backend Interface
All execution backends must implement the ExecutionBackend interface:
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, Optional
class JobStatus(Enum):
"""Job execution status"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class ExecutionBackend(ABC):
"""Base interface for execution backends"""
@abstractmethod
def submit_job(self, command: str, job_config: Dict[str, Any]) -> str:
"""Submit job for execution and return job ID"""
pass
@abstractmethod
def get_job_status(self, job_id: str) -> JobStatus:
"""Get current job status"""
pass
@abstractmethod
def get_job_output(self, job_id: str) -> Optional[str]:
"""Get job output logs"""
pass
@abstractmethod
def cancel_job(self, job_id: str) -> bool:
"""Cancel running job"""
pass
@abstractmethod
def is_available(self) -> bool:
"""Check if backend is available"""
pass
def get_resource_usage(self, job_id: str) -> Dict[str, Any]:
"""Get resource usage statistics (optional override)"""
return {}
def cleanup_job(self, job_id: str) -> bool:
"""Cleanup job artifacts (optional override)"""
return True
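As a minimal sketch of this contract, the toy backend below implements the five abstract methods without executing anything. It is illustrative only and relies on the classes and imports shown above; the class name is hypothetical.
# Minimal illustrative skeleton: a toy backend that records submissions
# without actually executing anything (uses the classes defined above)
class EchoBackend(ExecutionBackend):
    """Toy backend used only to illustrate the required methods"""

    def __init__(self):
        self._jobs: Dict[str, JobStatus] = {}

    def submit_job(self, command: str, job_config: Dict[str, Any]) -> str:
        job_id = f"echo-{len(self._jobs)}"
        self._jobs[job_id] = JobStatus.COMPLETED  # pretend the job ran instantly
        return job_id

    def get_job_status(self, job_id: str) -> JobStatus:
        return self._jobs.get(job_id, JobStatus.FAILED)

    def get_job_output(self, job_id: str) -> Optional[str]:
        return None

    def cancel_job(self, job_id: str) -> bool:
        return self._jobs.pop(job_id, None) is not None

    def is_available(self) -> bool:
        return True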
Job Configuration Schema
Standard job configuration structure:
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class JobConfig:
"""Job configuration schema"""
name: str # Job name
command: str # Command to execute
working_directory: str # Working directory
environment: Dict[str, str] # Environment variables
resources: 'ResourceConfig' # Resource requirements
dependencies: List[str] # Job dependencies
timeout: Optional[int] # Timeout in seconds
retry_count: int = 0 # Number of retries
priority: int = 0 # Job priority
    metadata: Dict[str, Any] = field(default_factory=dict)  # Additional metadata
@dataclass
class ResourceConfig:
"""Resource requirements configuration"""
cpu_cores: int = 1 # Number of CPU cores
memory_gb: float = 1.0 # Memory in GB
gpu_count: int = 0 # Number of GPUs
disk_space_gb: float = 1.0 # Disk space in GB
walltime_minutes: Optional[int] = None # Maximum runtime
queue: Optional[str] = None # Queue name
node_type: Optional[str] = None # Node type specification
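In practice, the backends below receive this configuration as a plain dictionary (the command to run is passed separately to submit_job) and parse it into the dataclasses above. A sketch of such a dictionary, with placeholder values:
# Illustrative job configuration dictionary, using the keys read by the
# backends below; the sample name, paths, and queue are placeholders
example_job_config = {
    "name": "count_sample1",
    "working_directory": "./results/sample1",
    "environment": {"OMP_NUM_THREADS": "8"},
    "resources": {
        "cpu_cores": 8,
        "memory_gb": 32.0,
        "walltime_minutes": 240,
        "queue": "short",
    },
    "timeout": 240 * 60,  # seconds
    "retry_count": 1,
    "priority": 5,
    "metadata": {"sample_id": "sample1"},
}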
Local Execution Backend
Basic Local Backend
# celline/middleware/backends/local_backend.py
import subprocess
import threading
import uuid
import time
import os
import signal
from typing import Dict, Any, Optional
from pathlib import Path
from celline.middleware.backends.base import ExecutionBackend, JobStatus
from celline.middleware.backends.schemas import JobConfig, ResourceConfig
from celline.log.logger import get_logger
class LocalExecutionBackend(ExecutionBackend):
"""
Local execution backend for running jobs on the local machine
This backend executes jobs as local processes with resource monitoring
and job management capabilities.
"""
def __init__(self, max_concurrent_jobs: int = 4,
job_output_dir: str = "./logs/jobs"):
"""
Initialize local execution backend
Args:
max_concurrent_jobs: Maximum number of concurrent jobs
job_output_dir: Directory for job output files
"""
self.max_concurrent_jobs = max_concurrent_jobs
self.job_output_dir = Path(job_output_dir)
self.job_output_dir.mkdir(parents=True, exist_ok=True)
# Job tracking
self.active_jobs: Dict[str, Dict[str, Any]] = {}
self.job_processes: Dict[str, subprocess.Popen] = {}
self.job_threads: Dict[str, threading.Thread] = {}
# Resource monitoring
self.current_cpu_usage = 0
self.current_memory_usage = 0.0
self.logger = get_logger(__name__)
# Start resource monitor
self._start_resource_monitor()
def submit_job(self, command: str, job_config: Dict[str, Any]) -> str:
"""
Submit job for local execution
Args:
command: Command to execute
job_config: Job configuration
Returns:
Job ID string
"""
job_id = str(uuid.uuid4())
# Parse job configuration
config = self._parse_job_config(job_config)
# Check resource availability
if not self._check_resource_availability(config.resources):
raise RuntimeError("Insufficient resources for job execution")
# Check concurrent job limit
running_jobs = sum(1 for job in self.active_jobs.values()
if job['status'] == JobStatus.RUNNING)
if running_jobs >= self.max_concurrent_jobs:
# Queue the job
self.active_jobs[job_id] = {
'status': JobStatus.PENDING,
'command': command,
'config': config,
'submitted_at': time.time(),
'started_at': None,
'finished_at': None,
'output_file': self.job_output_dir / f"{job_id}.log",
'error_file': self.job_output_dir / f"{job_id}.err"
}
self.logger.info(f"Job {job_id} queued (concurrent limit reached)")
else:
# Execute immediately
self._execute_job(job_id, command, config)
return job_id
    def _parse_job_config(self, job_config: Dict[str, Any]) -> JobConfig:
        """Parse a job configuration dictionary into JobConfig/ResourceConfig"""
# Extract resource configuration
resources_dict = job_config.get('resources', {})
resources = ResourceConfig(
cpu_cores=resources_dict.get('cpu_cores', 1),
memory_gb=resources_dict.get('memory_gb', 1.0),
gpu_count=resources_dict.get('gpu_count', 0),
disk_space_gb=resources_dict.get('disk_space_gb', 1.0),
walltime_minutes=resources_dict.get('walltime_minutes'),
queue=resources_dict.get('queue'),
node_type=resources_dict.get('node_type')
)
# Create job configuration
config = JobConfig(
name=job_config.get('name', 'celline_job'),
command=job_config.get('command', ''),
working_directory=job_config.get('working_directory', '.'),
environment=job_config.get('environment', {}),
resources=resources,
dependencies=job_config.get('dependencies', []),
timeout=job_config.get('timeout'),
retry_count=job_config.get('retry_count', 0),
priority=job_config.get('priority', 0),
metadata=job_config.get('metadata', {})
)
return config
def _check_resource_availability(self, resources: 'ResourceConfig') -> bool:
"""Check if required resources are available"""
import psutil
# Check CPU availability
available_cpu = psutil.cpu_count()
if resources.cpu_cores > available_cpu:
self.logger.warning(f"Requested {resources.cpu_cores} CPUs, only {available_cpu} available")
return False
# Check memory availability
available_memory = psutil.virtual_memory().available / (1024**3) # GB
if resources.memory_gb > available_memory:
self.logger.warning(f"Requested {resources.memory_gb}GB memory, only {available_memory:.1f}GB available")
return False
# Check disk space
available_disk = psutil.disk_usage('.').free / (1024**3) # GB
if resources.disk_space_gb > available_disk:
self.logger.warning(f"Requested {resources.disk_space_gb}GB disk, only {available_disk:.1f}GB available")
return False
return True
def _execute_job(self, job_id: str, command: str, config: JobConfig):
"""Execute job in background thread"""
# Initialize job record
self.active_jobs[job_id] = {
'status': JobStatus.RUNNING,
'command': command,
'config': config,
'submitted_at': time.time(),
'started_at': time.time(),
'finished_at': None,
'output_file': self.job_output_dir / f"{job_id}.log",
'error_file': self.job_output_dir / f"{job_id}.err",
'pid': None,
'exit_code': None
}
# Start execution thread
thread = threading.Thread(
target=self._run_job_process,
args=(job_id, command, config),
daemon=True
)
self.job_threads[job_id] = thread
thread.start()
self.logger.info(f"Job {job_id} started execution")
def _run_job_process(self, job_id: str, command: str, config: JobConfig):
"""Run job process with monitoring"""
job_info = self.active_jobs[job_id]
try:
# Prepare environment
env = os.environ.copy()
env.update(config.environment)
# Open output files
with open(job_info['output_file'], 'w') as stdout_file, \
open(job_info['error_file'], 'w') as stderr_file:
# Start process
process = subprocess.Popen(
command,
shell=True,
cwd=config.working_directory,
env=env,
stdout=stdout_file,
stderr=stderr_file,
preexec_fn=os.setsid # Create new process group
)
# Store process reference
self.job_processes[job_id] = process
job_info['pid'] = process.pid
# Wait for completion with timeout
try:
if config.timeout:
exit_code = process.wait(timeout=config.timeout)
else:
exit_code = process.wait()
job_info['exit_code'] = exit_code
if exit_code == 0:
job_info['status'] = JobStatus.COMPLETED
self.logger.info(f"Job {job_id} completed successfully")
else:
job_info['status'] = JobStatus.FAILED
self.logger.error(f"Job {job_id} failed with exit code {exit_code}")
except subprocess.TimeoutExpired:
# Kill process and its children
try:
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
process.wait(timeout=5)
except:
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
job_info['status'] = JobStatus.FAILED
job_info['exit_code'] = -1
self.logger.error(f"Job {job_id} terminated due to timeout")
except Exception as e:
job_info['status'] = JobStatus.FAILED
job_info['exit_code'] = -1
self.logger.exception(f"Job {job_id} failed with exception: {e}")
finally:
# Cleanup
job_info['finished_at'] = time.time()
if job_id in self.job_processes:
del self.job_processes[job_id]
# Try to start queued jobs
self._start_queued_jobs()
def _start_queued_jobs(self):
"""Start pending jobs if resources are available"""
running_jobs = sum(1 for job in self.active_jobs.values()
if job['status'] == JobStatus.RUNNING)
if running_jobs >= self.max_concurrent_jobs:
return
# Find pending jobs sorted by priority and submission time
pending_jobs = [
(job_id, job_info) for job_id, job_info in self.active_jobs.items()
if job_info['status'] == JobStatus.PENDING
]
pending_jobs.sort(key=lambda x: (-x[1]['config'].priority, x[1]['submitted_at']))
# Start jobs up to the limit
jobs_to_start = min(len(pending_jobs),
self.max_concurrent_jobs - running_jobs)
for i in range(jobs_to_start):
job_id, job_info = pending_jobs[i]
if self._check_resource_availability(job_info['config'].resources):
self._execute_job(job_id, job_info['command'], job_info['config'])
def get_job_status(self, job_id: str) -> JobStatus:
"""Get current job status"""
if job_id not in self.active_jobs:
raise ValueError(f"Job {job_id} not found")
return self.active_jobs[job_id]['status']
def get_job_output(self, job_id: str) -> Optional[str]:
"""Get job output logs"""
if job_id not in self.active_jobs:
return None
job_info = self.active_jobs[job_id]
try:
# Combine stdout and stderr
output_lines = []
if job_info['output_file'].exists():
with open(job_info['output_file'], 'r') as f:
output_lines.extend(f.readlines())
if job_info['error_file'].exists():
with open(job_info['error_file'], 'r') as f:
error_lines = f.readlines()
if error_lines:
output_lines.append("\n--- STDERR ---\n")
output_lines.extend(error_lines)
return ''.join(output_lines)
except Exception as e:
self.logger.error(f"Error reading output for job {job_id}: {e}")
return None
def cancel_job(self, job_id: str) -> bool:
"""Cancel running job"""
if job_id not in self.active_jobs:
return False
job_info = self.active_jobs[job_id]
if job_info['status'] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
return True
if job_info['status'] == JobStatus.PENDING:
job_info['status'] = JobStatus.CANCELLED
job_info['finished_at'] = time.time()
return True
# Cancel running job
if job_id in self.job_processes:
process = self.job_processes[job_id]
try:
# Try graceful termination first
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
# Wait briefly for graceful shutdown
try:
process.wait(timeout=5)
except subprocess.TimeoutExpired:
# Force kill if necessary
os.killpg(os.getpgid(process.pid), signal.SIGKILL)
job_info['status'] = JobStatus.CANCELLED
job_info['finished_at'] = time.time()
self.logger.info(f"Job {job_id} cancelled successfully")
return True
except Exception as e:
self.logger.error(f"Error cancelling job {job_id}: {e}")
return False
return False
def is_available(self) -> bool:
"""Check if local backend is available"""
return True # Local backend is always available
def get_resource_usage(self, job_id: str) -> Dict[str, Any]:
"""Get resource usage for job"""
if job_id not in self.active_jobs:
return {}
job_info = self.active_jobs[job_id]
if 'pid' not in job_info or job_info['pid'] is None:
return {}
try:
import psutil
process = psutil.Process(job_info['pid'])
return {
'cpu_percent': process.cpu_percent(),
'memory_mb': process.memory_info().rss / (1024**2),
'memory_percent': process.memory_percent(),
'num_threads': process.num_threads(),
'status': process.status(),
'create_time': process.create_time()
}
except (psutil.NoSuchProcess, psutil.AccessDenied):
return {}
def cleanup_job(self, job_id: str) -> bool:
"""Cleanup job artifacts"""
if job_id not in self.active_jobs:
return False
job_info = self.active_jobs[job_id]
try:
# Remove output files
if job_info['output_file'].exists():
job_info['output_file'].unlink()
if job_info['error_file'].exists():
job_info['error_file'].unlink()
# Remove job from tracking
del self.active_jobs[job_id]
# Cleanup thread reference
if job_id in self.job_threads:
del self.job_threads[job_id]
return True
except Exception as e:
self.logger.error(f"Error cleaning up job {job_id}: {e}")
return False
def _start_resource_monitor(self):
"""Start resource monitoring thread"""
def monitor():
import psutil
while True:
try:
self.current_cpu_usage = psutil.cpu_percent(interval=1)
self.current_memory_usage = psutil.virtual_memory().percent
time.sleep(10) # Update every 10 seconds
except Exception as e:
self.logger.error(f"Resource monitoring error: {e}")
time.sleep(30)
monitor_thread = threading.Thread(target=monitor, daemon=True)
monitor_thread.start()
def get_system_status(self) -> Dict[str, Any]:
"""Get overall system status"""
import psutil
running_jobs = sum(1 for job in self.active_jobs.values()
if job['status'] == JobStatus.RUNNING)
pending_jobs = sum(1 for job in self.active_jobs.values()
if job['status'] == JobStatus.PENDING)
return {
'backend_type': 'local',
'running_jobs': running_jobs,
'pending_jobs': pending_jobs,
'max_concurrent_jobs': self.max_concurrent_jobs,
'cpu_usage_percent': self.current_cpu_usage,
'memory_usage_percent': self.current_memory_usage,
'available_cpu_cores': psutil.cpu_count(),
'available_memory_gb': psutil.virtual_memory().available / (1024**3),
'available_disk_gb': psutil.disk_usage('.').free / (1024**3)
}
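A minimal usage sketch for the local backend; the command, directories, and polling interval are illustrative placeholders.
# Illustrative usage of LocalExecutionBackend (command and paths are placeholders)
import time

from celline.middleware.backends.base import JobStatus
from celline.middleware.backends.local_backend import LocalExecutionBackend

backend = LocalExecutionBackend(max_concurrent_jobs=2, job_output_dir="./logs/jobs")

job_id = backend.submit_job(
    "echo 'hello from celline'",
    {
        "name": "demo_job",
        "working_directory": ".",
        "resources": {"cpu_cores": 1, "memory_gb": 0.5},
        "timeout": 60,
    },
)

# Poll until the job leaves the PENDING/RUNNING states
while backend.get_job_status(job_id) in (JobStatus.PENDING, JobStatus.RUNNING):
    time.sleep(2)

print(backend.get_job_status(job_id))
print(backend.get_job_output(job_id))
print(backend.get_system_status())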
PBS/Torque Backend
Traditional HPC Backend
The PBS, SLURM, and AWS Batch backends below all call a _parse_job_config helper like the one shown for the local backend; in practice this helper would live on a shared base class or mixin rather than being duplicated in every backend.
# celline/middleware/backends/pbs_backend.py
import os
import re
import subprocess
import time
from typing import Dict, Any, Optional
from celline.middleware.backends.base import ExecutionBackend, JobStatus
from celline.log.logger import get_logger
class PBSExecutionBackend(ExecutionBackend):
"""
PBS/Torque execution backend for HPC clusters
This backend submits jobs to PBS/Torque queue management system
and provides comprehensive job monitoring and resource management.
"""
def __init__(self, default_queue: str = "default",
script_template_path: Optional[str] = None):
"""
Initialize PBS execution backend
Args:
default_queue: Default queue for job submission
script_template_path: Path to custom PBS script template
"""
self.default_queue = default_queue
self.script_template_path = script_template_path
self.logger = get_logger(__name__)
# Job tracking
self.submitted_jobs: Dict[str, Dict[str, Any]] = {}
# Validate PBS availability
if not self.is_available():
raise RuntimeError("PBS/Torque system not available")
def submit_job(self, command: str, job_config: Dict[str, Any]) -> str:
"""
Submit job to PBS queue
Args:
command: Command to execute
job_config: Job configuration
Returns:
PBS job ID
"""
# Generate PBS script
script_content = self._generate_pbs_script(command, job_config)
# Write script to temporary file
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.pbs', delete=False) as f:
f.write(script_content)
script_path = f.name
try:
# Submit job
result = subprocess.run(
['qsub', script_path],
capture_output=True,
text=True,
check=True
)
# Extract job ID from qsub output
job_id = result.stdout.strip()
# Store job information
self.submitted_jobs[job_id] = {
'command': command,
'config': job_config,
'script_path': script_path,
'submitted_at': time.time(),
'status': JobStatus.PENDING
}
self.logger.info(f"Job {job_id} submitted to PBS queue")
return job_id
except subprocess.CalledProcessError as e:
self.logger.error(f"Failed to submit PBS job: {e.stderr}")
raise RuntimeError(f"PBS job submission failed: {e.stderr}")
finally:
# Cleanup script file
            try:
                os.unlink(script_path)
            except OSError:
                pass
def _generate_pbs_script(self, command: str, job_config: Dict[str, Any]) -> str:
"""Generate PBS script content"""
config = self._parse_job_config(job_config)
# PBS script template
script_lines = [
"#!/bin/bash",
"",
f"#PBS -N {config.name}",
f"#PBS -q {config.resources.queue or self.default_queue}",
f"#PBS -l nodes=1:ppn={config.resources.cpu_cores}",
f"#PBS -l mem={int(config.resources.memory_gb)}gb",
]
# Add walltime if specified
if config.resources.walltime_minutes:
hours = config.resources.walltime_minutes // 60
minutes = config.resources.walltime_minutes % 60
script_lines.append(f"#PBS -l walltime={hours:02d}:{minutes:02d}:00")
# Add GPU request if needed
if config.resources.gpu_count > 0:
script_lines.append(f"#PBS -l gpus={config.resources.gpu_count}")
        # Add output/error files (use the literal job name so the status and
        # output helpers below can find these files)
        script_lines.extend([
            f"#PBS -o {config.name}.out",
            f"#PBS -e {config.name}.err",
            "#PBS -V",  # Export environment variables
            ""
        ])
# Add working directory change
script_lines.extend([
f"cd {config.working_directory}",
""
])
# Add environment variables
for key, value in config.environment.items():
script_lines.append(f"export {key}='{value}'")
if config.environment:
script_lines.append("")
# Add module loads if specified
modules = job_config.get('modules', [])
for module in modules:
script_lines.append(f"module load {module}")
if modules:
script_lines.append("")
# Add the actual command
script_lines.extend([
"# Execute command",
command,
"",
"# Check exit status",
"exit_code=$?",
"echo \"Job completed with exit code: $exit_code\"",
"exit $exit_code"
])
return "\n".join(script_lines)
def get_job_status(self, job_id: str) -> JobStatus:
"""Get PBS job status"""
try:
result = subprocess.run(
['qstat', '-f', job_id],
capture_output=True,
text=True,
check=True
)
# Parse qstat output
status_match = re.search(r'job_state = (\w)', result.stdout)
if status_match:
pbs_status = status_match.group(1)
return self._convert_pbs_status(pbs_status)
return JobStatus.PENDING
except subprocess.CalledProcessError:
# Job might be completed and removed from queue
# Check if we have it in our tracking
if job_id in self.submitted_jobs:
# Try to get completion status from output files
return self._check_completion_status(job_id)
return JobStatus.FAILED
def _convert_pbs_status(self, pbs_status: str) -> JobStatus:
"""Convert PBS status to JobStatus"""
status_map = {
'Q': JobStatus.PENDING, # Queued
'R': JobStatus.RUNNING, # Running
'C': JobStatus.COMPLETED, # Completed
'E': JobStatus.COMPLETED, # Exiting
'H': JobStatus.PENDING, # Held
'S': JobStatus.PENDING, # Suspended
'T': JobStatus.RUNNING, # Transitioning
'W': JobStatus.PENDING, # Waiting
'X': JobStatus.FAILED # Finished (failed)
}
return status_map.get(pbs_status, JobStatus.FAILED)
def _check_completion_status(self, job_id: str) -> JobStatus:
"""Check job completion status from output files"""
if job_id not in self.submitted_jobs:
return JobStatus.FAILED
job_info = self.submitted_jobs[job_id]
job_name = job_info['config'].get('name', 'celline_job')
# Check error file for PBS errors
error_file = f"{job_name}.err"
if os.path.exists(error_file):
try:
with open(error_file, 'r') as f:
error_content = f.read()
if error_content.strip():
return JobStatus.FAILED
except:
pass
# Check output file for completion
output_file = f"{job_name}.out"
if os.path.exists(output_file):
try:
with open(output_file, 'r') as f:
output_content = f.read()
# Look for our exit code marker
if "Job completed with exit code: 0" in output_content:
return JobStatus.COMPLETED
elif "Job completed with exit code:" in output_content:
return JobStatus.FAILED
except:
pass
# Default to completed if no error indicators
return JobStatus.COMPLETED
def get_job_output(self, job_id: str) -> Optional[str]:
"""Get PBS job output"""
if job_id not in self.submitted_jobs:
return None
job_info = self.submitted_jobs[job_id]
job_name = job_info['config'].get('name', 'celline_job')
output_lines = []
# Read output file
output_file = f"{job_name}.out"
if os.path.exists(output_file):
try:
with open(output_file, 'r') as f:
output_lines.extend(f.readlines())
except Exception as e:
output_lines.append(f"Error reading output file: {e}\n")
# Read error file
error_file = f"{job_name}.err"
if os.path.exists(error_file):
try:
with open(error_file, 'r') as f:
error_lines = f.readlines()
if error_lines:
output_lines.append("\n--- STDERR ---\n")
output_lines.extend(error_lines)
except Exception as e:
output_lines.append(f"Error reading error file: {e}\n")
return ''.join(output_lines) if output_lines else None
def cancel_job(self, job_id: str) -> bool:
"""Cancel PBS job"""
try:
subprocess.run(
['qdel', job_id],
capture_output=True,
text=True,
check=True
)
# Update job status
if job_id in self.submitted_jobs:
self.submitted_jobs[job_id]['status'] = JobStatus.CANCELLED
self.logger.info(f"PBS job {job_id} cancelled")
return True
except subprocess.CalledProcessError as e:
self.logger.error(f"Failed to cancel PBS job {job_id}: {e.stderr}")
return False
def is_available(self) -> bool:
"""Check if PBS system is available"""
try:
subprocess.run(['qstat', '--version'],
capture_output=True, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def get_resource_usage(self, job_id: str) -> Dict[str, Any]:
"""Get PBS job resource usage"""
try:
result = subprocess.run(
['qstat', '-f', job_id],
capture_output=True,
text=True,
check=True
)
# Parse resource usage from qstat output
usage_info = {}
# Extract resource information
for line in result.stdout.split('\n'):
line = line.strip()
if 'resources_used.cput' in line:
cput_match = re.search(r'resources_used\.cput = (.+)', line)
if cput_match:
usage_info['cpu_time'] = cput_match.group(1)
elif 'resources_used.mem' in line:
mem_match = re.search(r'resources_used\.mem = (.+)', line)
if mem_match:
usage_info['memory_used'] = mem_match.group(1)
elif 'resources_used.walltime' in line:
wall_match = re.search(r'resources_used\.walltime = (.+)', line)
if wall_match:
usage_info['walltime_used'] = wall_match.group(1)
return usage_info
except subprocess.CalledProcessError:
return {}
def get_queue_status(self) -> Dict[str, Any]:
"""Get PBS queue status"""
try:
result = subprocess.run(
['qstat', '-Q'],
capture_output=True,
text=True,
check=True
)
queue_info = {}
lines = result.stdout.strip().split('\n')[2:] # Skip header
for line in lines:
parts = line.split()
if len(parts) >= 6:
queue_name = parts[0]
max_jobs = parts[1]
queued = parts[2]
running = parts[3]
queue_info[queue_name] = {
'max_jobs': max_jobs,
'queued_jobs': queued,
'running_jobs': running,
'status': parts[5]
}
return queue_info
except subprocess.CalledProcessError:
return {}
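A sketch of submitting a job through the PBS backend, assuming qsub/qstat are on the PATH; the queue, paths, and module name are site-specific placeholders.
# Illustrative PBS usage (queue, paths, and module names are placeholders)
from celline.middleware.backends.pbs_backend import PBSExecutionBackend

backend = PBSExecutionBackend(default_queue="batch")

job_id = backend.submit_job(
    "cellranger count --id=sample1",
    {
        "name": "count_sample1",
        "working_directory": "/scratch/project",
        "environment": {"OMP_NUM_THREADS": "16"},
        "resources": {"cpu_cores": 16, "memory_gb": 64.0, "walltime_minutes": 480},
        "modules": ["cellranger/7.1.0"],  # site-specific module name
    },
)

print(backend.get_job_status(job_id))
print(backend.get_queue_status())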
SLURM Backend
Modern HPC Backend
# celline/middleware/backends/slurm_backend.py
import os
import re
import subprocess
import time
from typing import Dict, Any, Optional
from celline.middleware.backends.base import ExecutionBackend, JobStatus
from celline.log.logger import get_logger
class SLURMExecutionBackend(ExecutionBackend):
"""
SLURM execution backend for modern HPC clusters
This backend provides comprehensive integration with SLURM workload manager
including advanced resource management and job array support.
"""
def __init__(self, default_partition: str = "compute",
account: Optional[str] = None):
"""
Initialize SLURM execution backend
Args:
default_partition: Default partition for job submission
account: SLURM account for billing
"""
self.default_partition = default_partition
self.account = account
self.logger = get_logger(__name__)
# Job tracking
self.submitted_jobs: Dict[str, Dict[str, Any]] = {}
# Validate SLURM availability
if not self.is_available():
raise RuntimeError("SLURM system not available")
def submit_job(self, command: str, job_config: Dict[str, Any]) -> str:
"""
Submit job to SLURM
Args:
command: Command to execute
job_config: Job configuration
Returns:
SLURM job ID
"""
# Build sbatch command
sbatch_args = self._build_sbatch_args(job_config)
# Create batch script content
script_content = self._generate_slurm_script(command, job_config)
try:
# Submit job using sbatch
process = subprocess.Popen(
['sbatch'] + sbatch_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate(input=script_content)
if process.returncode != 0:
raise RuntimeError(f"sbatch failed: {stderr}")
# Extract job ID from sbatch output
# Format: "Submitted batch job 12345"
import re
job_match = re.search(r'Submitted batch job (\d+)', stdout)
if not job_match:
raise RuntimeError(f"Could not parse job ID from: {stdout}")
job_id = job_match.group(1)
# Store job information
self.submitted_jobs[job_id] = {
'command': command,
'config': job_config,
'submitted_at': time.time(),
'status': JobStatus.PENDING
}
self.logger.info(f"Job {job_id} submitted to SLURM")
return job_id
except Exception as e:
self.logger.error(f"Failed to submit SLURM job: {e}")
raise RuntimeError(f"SLURM job submission failed: {e}")
def _build_sbatch_args(self, job_config: Dict[str, Any]) -> list:
"""Build sbatch command arguments"""
config = self._parse_job_config(job_config)
args = []
# Job name
args.extend(['--job-name', config.name])
# Partition
partition = config.resources.queue or self.default_partition
args.extend(['--partition', partition])
# Account
if self.account:
args.extend(['--account', self.account])
# Resource requirements
args.extend(['--nodes', '1'])
args.extend(['--ntasks-per-node', '1'])
args.extend(['--cpus-per-task', str(config.resources.cpu_cores)])
args.extend(['--mem', f'{int(config.resources.memory_gb * 1024)}M'])
# Walltime
if config.resources.walltime_minutes:
args.extend(['--time', f'{config.resources.walltime_minutes}'])
# GPU requirements
if config.resources.gpu_count > 0:
args.extend(['--gres', f'gpu:{config.resources.gpu_count}'])
# Output files
args.extend(['--output', f'{config.name}_%j.out'])
args.extend(['--error', f'{config.name}_%j.err'])
# Additional SLURM options
slurm_options = job_config.get('slurm_options', {})
for key, value in slurm_options.items():
if value is True:
args.append(f'--{key}')
elif value is not False:
args.extend([f'--{key}', str(value)])
return args
def _generate_slurm_script(self, command: str, job_config: Dict[str, Any]) -> str:
"""Generate SLURM batch script content"""
config = self._parse_job_config(job_config)
script_lines = [
"#!/bin/bash",
"",
"# SLURM job script generated by Celline",
""
]
# Add working directory change
script_lines.extend([
f"cd {config.working_directory}",
""
])
# Add environment variables
for key, value in config.environment.items():
script_lines.append(f"export {key}='{value}'")
if config.environment:
script_lines.append("")
# Add module loads
modules = job_config.get('modules', [])
for module in modules:
script_lines.append(f"module load {module}")
if modules:
script_lines.append("")
# Add conda environment activation if specified
conda_env = job_config.get('conda_env')
if conda_env:
script_lines.extend([
f"source activate {conda_env}",
""
])
# Add the actual command
script_lines.extend([
"# Execute command",
f"echo \"Starting job at $(date)\"",
f"echo \"Running on node: $SLURMD_NODENAME\"",
f"echo \"Working directory: $(pwd)\"",
"",
command,
"",
"# Check exit status",
"exit_code=$?",
"echo \"Job completed at $(date) with exit code: $exit_code\"",
"exit $exit_code"
])
return "\n".join(script_lines)
def get_job_status(self, job_id: str) -> JobStatus:
"""Get SLURM job status using scontrol"""
try:
result = subprocess.run(
['scontrol', 'show', 'job', job_id],
capture_output=True,
text=True,
check=True
)
# Parse scontrol output for JobState
import re
state_match = re.search(r'JobState=(\w+)', result.stdout)
if state_match:
slurm_state = state_match.group(1)
return self._convert_slurm_status(slurm_state)
return JobStatus.PENDING
except subprocess.CalledProcessError:
# Job might be completed and removed from scontrol
# Try sacct for historical data
return self._get_historical_status(job_id)
def _convert_slurm_status(self, slurm_state: str) -> JobStatus:
"""Convert SLURM status to JobStatus"""
status_map = {
'PENDING': JobStatus.PENDING,
'CONFIGURING': JobStatus.PENDING,
'RUNNING': JobStatus.RUNNING,
'SUSPENDED': JobStatus.RUNNING,
'COMPLETING': JobStatus.RUNNING,
'COMPLETED': JobStatus.COMPLETED,
'CANCELLED': JobStatus.CANCELLED,
'FAILED': JobStatus.FAILED,
'TIMEOUT': JobStatus.FAILED,
'PREEMPTED': JobStatus.FAILED,
'NODE_FAIL': JobStatus.FAILED,
'BOOT_FAIL': JobStatus.FAILED,
'DEADLINE': JobStatus.FAILED,
'OUT_OF_MEMORY': JobStatus.FAILED
}
return status_map.get(slurm_state, JobStatus.FAILED)
def _get_historical_status(self, job_id: str) -> JobStatus:
"""Get job status from SLURM accounting using sacct"""
try:
result = subprocess.run(
['sacct', '-j', job_id, '--format=State', '--noheader', '--parsable2'],
capture_output=True,
text=True,
check=True
)
if result.stdout.strip():
slurm_state = result.stdout.strip().split('\n')[0]
return self._convert_slurm_status(slurm_state)
return JobStatus.FAILED
except subprocess.CalledProcessError:
return JobStatus.FAILED
def get_job_output(self, job_id: str) -> Optional[str]:
"""Get SLURM job output"""
if job_id not in self.submitted_jobs:
return None
job_info = self.submitted_jobs[job_id]
job_name = job_info['config'].get('name', 'celline_job')
output_lines = []
# SLURM output files include job ID
output_file = f"{job_name}_{job_id}.out"
error_file = f"{job_name}_{job_id}.err"
# Read output file
import os
if os.path.exists(output_file):
try:
with open(output_file, 'r') as f:
output_lines.extend(f.readlines())
except Exception as e:
output_lines.append(f"Error reading output file: {e}\n")
# Read error file
if os.path.exists(error_file):
try:
with open(error_file, 'r') as f:
error_lines = f.readlines()
if error_lines:
output_lines.append("\n--- STDERR ---\n")
output_lines.extend(error_lines)
except Exception as e:
output_lines.append(f"Error reading error file: {e}\n")
return ''.join(output_lines) if output_lines else None
def cancel_job(self, job_id: str) -> bool:
"""Cancel SLURM job"""
try:
subprocess.run(
['scancel', job_id],
capture_output=True,
text=True,
check=True
)
# Update job status
if job_id in self.submitted_jobs:
self.submitted_jobs[job_id]['status'] = JobStatus.CANCELLED
self.logger.info(f"SLURM job {job_id} cancelled")
return True
except subprocess.CalledProcessError as e:
self.logger.error(f"Failed to cancel SLURM job {job_id}: {e.stderr}")
return False
def is_available(self) -> bool:
"""Check if SLURM system is available"""
try:
subprocess.run(['sinfo', '--version'],
capture_output=True, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False
def get_resource_usage(self, job_id: str) -> Dict[str, Any]:
"""Get SLURM job resource usage using sacct"""
try:
result = subprocess.run([
'sacct', '-j', job_id,
'--format=JobID,CPUTime,MaxRSS,MaxVMSize,Elapsed,State,ExitCode',
'--units=M', '--parsable2', '--noheader'
], capture_output=True, text=True, check=True)
if not result.stdout.strip():
return {}
# Parse sacct output
lines = result.stdout.strip().split('\n')
main_job_line = lines[0] # First line is main job
fields = main_job_line.split('|')
if len(fields) >= 7:
return {
'job_id': fields[0],
'cpu_time': fields[1],
'max_rss_mb': fields[2],
'max_vmsize_mb': fields[3],
'elapsed_time': fields[4],
'state': fields[5],
'exit_code': fields[6]
}
return {}
except subprocess.CalledProcessError:
return {}
def get_partition_info(self) -> Dict[str, Any]:
"""Get SLURM partition information"""
try:
result = subprocess.run([
'sinfo', '--format=%P,%A,%l,%N', '--noheader'
], capture_output=True, text=True, check=True)
partition_info = {}
for line in result.stdout.strip().split('\n'):
parts = line.split(',')
if len(parts) >= 4:
partition = parts[0].rstrip('*') # Remove default indicator
avail_status = parts[1]
time_limit = parts[2]
nodes = parts[3]
partition_info[partition] = {
'availability': avail_status,
'time_limit': time_limit,
'nodes': nodes
}
return partition_info
except subprocess.CalledProcessError:
return {}
    def submit_job_array(self, commands: list, job_config: Dict[str, Any]) -> str:
        """Submit SLURM job array"""
        array_size = len(commands)
        if array_size == 0:
            raise ValueError("No commands provided for job array")
        # Add the array range to the sbatch options
        array_config = job_config.copy()
        array_config['slurm_options'] = dict(array_config.get('slurm_options', {}))
        array_config['slurm_options']['array'] = f'0-{array_size - 1}'
        # Generate the array script and submit it directly via sbatch
        script_content = self._generate_array_script(commands, array_config)
        sbatch_args = self._build_sbatch_args(array_config)
        result = subprocess.run(['sbatch'] + sbatch_args, input=script_content,
                                capture_output=True, text=True)
        if result.returncode != 0:
            raise RuntimeError(f"sbatch failed: {result.stderr}")
        job_match = re.search(r'Submitted batch job (\d+)', result.stdout)
        if not job_match:
            raise RuntimeError(f"Could not parse job ID from: {result.stdout}")
        job_id = job_match.group(1)
        self.submitted_jobs[job_id] = {
            'command': f'job array ({array_size} tasks)',
            'config': array_config,
            'submitted_at': time.time(),
            'status': JobStatus.PENDING
        }
        self.logger.info(f"Job array {job_id} submitted to SLURM")
        return job_id
def _generate_array_script(self, commands: list, job_config: Dict[str, Any]) -> str:
"""Generate SLURM job array script"""
config = self._parse_job_config(job_config)
script_lines = [
"#!/bin/bash",
"",
"# SLURM job array script generated by Celline",
""
]
# Add working directory and environment setup
script_lines.extend([
f"cd {config.working_directory}",
""
])
# Add environment variables
for key, value in config.environment.items():
script_lines.append(f"export {key}='{value}'")
if config.environment:
script_lines.append("")
# Create command array
script_lines.append("# Command array")
script_lines.append("commands=(")
for cmd in commands:
# Escape commands properly
escaped_cmd = cmd.replace('"', '\\"')
script_lines.append(f' "{escaped_cmd}"')
script_lines.append(")")
script_lines.append("")
# Execute command based on array index
script_lines.extend([
"# Execute command for this array task",
"echo \"Array task $SLURM_ARRAY_TASK_ID starting at $(date)\"",
'eval "${commands[$SLURM_ARRAY_TASK_ID]}"',
"exit_code=$?",
"echo \"Array task $SLURM_ARRAY_TASK_ID completed at $(date) with exit code: $exit_code\"",
"exit $exit_code"
])
return "\n".join(script_lines)
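A sketch of the SLURM backend, including a job array; the partition, account, conda environment, and commands are placeholders.
# Illustrative SLURM usage (partition, account, and commands are placeholders)
from celline.middleware.backends.slurm_backend import SLURMExecutionBackend

backend = SLURMExecutionBackend(default_partition="compute", account="proj0001")

# Single job
job_id = backend.submit_job(
    "python preprocess.py --sample sample1",
    {
        "name": "preprocess_sample1",
        "working_directory": "/scratch/project",
        "resources": {"cpu_cores": 8, "memory_gb": 32.0, "walltime_minutes": 120},
        "conda_env": "celline",
    },
)

# Job array: one task per sample
commands = [f"python preprocess.py --sample sample{i}" for i in range(1, 4)]
array_id = backend.submit_job_array(commands, {
    "name": "preprocess_array",
    "working_directory": "/scratch/project",
    "resources": {"cpu_cores": 4, "memory_gb": 16.0, "walltime_minutes": 60},
})

print(backend.get_resource_usage(job_id))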
Cloud Backend (AWS Batch)
Cloud-Native Backend
# celline/middleware/backends/aws_batch_backend.py
import boto3
import time
from typing import Dict, Any, Optional
from celline.middleware.backends.base import ExecutionBackend, JobStatus
from celline.log.logger import get_logger
class AWSBatchExecutionBackend(ExecutionBackend):
"""
AWS Batch execution backend for cloud computing
This backend provides integration with AWS Batch for scalable
containerized job execution in the cloud.
"""
def __init__(self, job_queue: str, job_definition: str,
region: str = 'us-east-1'):
"""
Initialize AWS Batch backend
Args:
job_queue: AWS Batch job queue name
job_definition: AWS Batch job definition name
region: AWS region
"""
self.job_queue = job_queue
self.job_definition = job_definition
self.region = region
# Initialize AWS Batch client
self.batch_client = boto3.client('batch', region_name=region)
self.logs_client = boto3.client('logs', region_name=region)
self.logger = get_logger(__name__)
# Job tracking
self.submitted_jobs: Dict[str, Dict[str, Any]] = {}
# Validate AWS Batch availability
if not self.is_available():
raise RuntimeError("AWS Batch not available")
def submit_job(self, command: str, job_config: Dict[str, Any]) -> str:
"""
Submit job to AWS Batch
Args:
command: Command to execute
job_config: Job configuration
Returns:
AWS Batch job ID
"""
config = self._parse_job_config(job_config)
# Build job parameters
job_params = {
'jobName': config.name,
'jobQueue': self.job_queue,
'jobDefinition': self.job_definition,
'parameters': {
'command': command,
'workingDirectory': config.working_directory
}
}
# Add environment variables
if config.environment:
env_list = [
{'name': key, 'value': value}
for key, value in config.environment.items()
]
job_params['containerOverrides'] = {
'environment': env_list
}
        # Add resource requirements (ResourceConfig always defines these fields)
        container_overrides = job_params.get('containerOverrides', {})
        if config.resources.cpu_cores > 1:
            container_overrides['vcpus'] = config.resources.cpu_cores
        if config.resources.memory_gb > 1:
            container_overrides['memory'] = int(config.resources.memory_gb * 1024)
        if container_overrides:
            job_params['containerOverrides'] = container_overrides
# Add job dependencies
if config.dependencies:
job_params['dependsOn'] = [
{'jobId': dep_id} for dep_id in config.dependencies
]
# Add timeout
if config.timeout:
job_params['timeout'] = {'attemptDurationSeconds': config.timeout}
# Add retry configuration
if config.retry_count > 0:
job_params['retryStrategy'] = {'attempts': config.retry_count + 1}
try:
# Submit job
response = self.batch_client.submit_job(**job_params)
job_id = response['jobId']
# Store job information
self.submitted_jobs[job_id] = {
'command': command,
'config': job_config,
'submitted_at': time.time(),
'status': JobStatus.PENDING,
'log_group': None,
'log_stream': None
}
self.logger.info(f"Job {job_id} submitted to AWS Batch")
return job_id
except Exception as e:
self.logger.error(f"Failed to submit AWS Batch job: {e}")
raise RuntimeError(f"AWS Batch job submission failed: {e}")
def get_job_status(self, job_id: str) -> JobStatus:
"""Get AWS Batch job status"""
try:
response = self.batch_client.describe_jobs(jobs=[job_id])
if not response['jobs']:
return JobStatus.FAILED
job = response['jobs'][0]
batch_status = job['status']
            # Update log information if available
            if job_id in self.submitted_jobs:
                job_info = self.submitted_jobs[job_id]
                container = job.get('container', {})
                if container.get('logStreamName'):
                    job_info['log_group'] = '/aws/batch/job'
                    job_info['log_stream'] = container['logStreamName']
return self._convert_batch_status(batch_status)
except Exception as e:
self.logger.error(f"Error getting job status for {job_id}: {e}")
return JobStatus.FAILED
def _convert_batch_status(self, batch_status: str) -> JobStatus:
"""Convert AWS Batch status to JobStatus"""
status_map = {
'SUBMITTED': JobStatus.PENDING,
'PENDING': JobStatus.PENDING,
'RUNNABLE': JobStatus.PENDING,
'STARTING': JobStatus.PENDING,
'RUNNING': JobStatus.RUNNING,
'SUCCEEDED': JobStatus.COMPLETED,
'FAILED': JobStatus.FAILED
}
return status_map.get(batch_status, JobStatus.FAILED)
def get_job_output(self, job_id: str) -> Optional[str]:
"""Get AWS Batch job output from CloudWatch Logs"""
if job_id not in self.submitted_jobs:
return None
job_info = self.submitted_jobs[job_id]
# Check if log information is available
if not job_info.get('log_group') or not job_info.get('log_stream'):
            # Try to get log information from the job description
            try:
                response = self.batch_client.describe_jobs(jobs=[job_id])
                if response['jobs']:
                    container = response['jobs'][0].get('container', {})
                    if container.get('logStreamName'):
                        job_info['log_group'] = '/aws/batch/job'
                        job_info['log_stream'] = container['logStreamName']
            except Exception:
                pass
if not job_info.get('log_group') or not job_info.get('log_stream'):
return None
try:
# Get logs from CloudWatch
response = self.logs_client.get_log_events(
logGroupName=job_info['log_group'],
logStreamName=job_info['log_stream']
)
log_lines = []
for event in response['events']:
timestamp = time.strftime('%Y-%m-%d %H:%M:%S',
time.localtime(event['timestamp'] / 1000))
log_lines.append(f"[{timestamp}] {event['message']}")
return '\n'.join(log_lines)
except Exception as e:
self.logger.error(f"Error retrieving logs for job {job_id}: {e}")
return None
def cancel_job(self, job_id: str) -> bool:
"""Cancel AWS Batch job"""
try:
            # terminate_job also stops jobs that are already STARTING or RUNNING,
            # whereas cancel_job only removes jobs still waiting in the queue
            self.batch_client.terminate_job(
                jobId=job_id,
                reason='Cancelled by user'
            )
# Update job status
if job_id in self.submitted_jobs:
self.submitted_jobs[job_id]['status'] = JobStatus.CANCELLED
self.logger.info(f"AWS Batch job {job_id} cancelled")
return True
except Exception as e:
self.logger.error(f"Failed to cancel AWS Batch job {job_id}: {e}")
return False
def is_available(self) -> bool:
"""Check if AWS Batch is available"""
try:
# Test by listing job queues
self.batch_client.describe_job_queues(maxResults=1)
return True
except Exception:
return False
def get_resource_usage(self, job_id: str) -> Dict[str, Any]:
"""Get AWS Batch job resource usage"""
try:
response = self.batch_client.describe_jobs(jobs=[job_id])
if not response['jobs']:
return {}
job = response['jobs'][0]
# Extract resource information
usage_info = {
'status': job['status'],
'created_at': job['createdAt'],
'started_at': job.get('startedAt'),
'stopped_at': job.get('stoppedAt')
}
# Add container information if available
if 'container' in job:
container = job['container']
usage_info.update({
'vcpus': container.get('vcpus'),
'memory': container.get('memory'),
'job_role_arn': container.get('jobRoleArn'),
'exit_code': container.get('exitCode'),
'reason': container.get('reason')
})
return usage_info
except Exception as e:
self.logger.error(f"Error getting resource usage for job {job_id}: {e}")
return {}
def get_queue_status(self) -> Dict[str, Any]:
"""Get AWS Batch queue status"""
try:
response = self.batch_client.describe_job_queues(
jobQueues=[self.job_queue]
)
if not response['jobQueues']:
return {}
queue = response['jobQueues'][0]
return {
'queue_name': queue['jobQueueName'],
'state': queue['state'],
'status': queue['status'],
'priority': queue['priority'],
'compute_environments': queue['computeEnvironmentOrder']
}
except Exception as e:
self.logger.error(f"Error getting queue status: {e}")
return {}
def create_job_definition(self, definition_config: Dict[str, Any]) -> str:
"""Create AWS Batch job definition"""
try:
response = self.batch_client.register_job_definition(**definition_config)
job_def_arn = response['jobDefinitionArn']
self.logger.info(f"Created job definition: {job_def_arn}")
return job_def_arn
except Exception as e:
self.logger.error(f"Failed to create job definition: {e}")
raise RuntimeError(f"Job definition creation failed: {e}")
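A sketch of the AWS Batch backend, assuming an existing job queue and job definition and that boto3 credentials are already configured; the queue, job definition, region, and command are placeholders.
# Illustrative AWS Batch usage (queue, job definition, and region are placeholders)
from celline.middleware.backends.aws_batch_backend import AWSBatchExecutionBackend

backend = AWSBatchExecutionBackend(
    job_queue="celline-queue",
    job_definition="celline-job-def",
    region="us-east-1",
)

job_id = backend.submit_job(
    "python preprocess.py --sample sample1",
    {
        "name": "preprocess-sample1",
        "working_directory": "/data",
        "environment": {"SAMPLE_ID": "sample1"},
        "resources": {"cpu_cores": 4, "memory_gb": 16.0},
        "timeout": 3600,
        "retry_count": 1,
    },
)

print(backend.get_job_status(job_id))
print(backend.get_queue_status())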
Backend Registration and Factory
Backend Registry
# celline/middleware/backends/registry.py
from typing import Dict, Type, Optional
from celline.middleware.backends.base import ExecutionBackend
from celline.log.logger import get_logger
class BackendRegistry:
"""Registry for execution backends"""
def __init__(self):
self.backends: Dict[str, Type[ExecutionBackend]] = {}
self.logger = get_logger(__name__)
def register_backend(self, name: str, backend_class: Type[ExecutionBackend]):
"""Register execution backend"""
self.backends[name] = backend_class
self.logger.info(f"Registered execution backend: {name}")
def get_backend(self, name: str) -> Optional[Type[ExecutionBackend]]:
"""Get backend class by name"""
return self.backends.get(name)
def list_backends(self) -> list:
"""List available backend names"""
return list(self.backends.keys())
def create_backend(self, name: str, **kwargs) -> ExecutionBackend:
"""Create backend instance"""
backend_class = self.get_backend(name)
if not backend_class:
raise ValueError(f"Unknown backend: {name}")
return backend_class(**kwargs)
# Global registry instance
_backend_registry = BackendRegistry()
def get_backend_registry() -> BackendRegistry:
"""Get global backend registry"""
return _backend_registry
def register_default_backends():
"""Register default execution backends"""
from .local_backend import LocalExecutionBackend
from .pbs_backend import PBSExecutionBackend
from .slurm_backend import SLURMExecutionBackend
from .aws_batch_backend import AWSBatchExecutionBackend
registry = get_backend_registry()
registry.register_backend("local", LocalExecutionBackend)
registry.register_backend("pbs", PBSExecutionBackend)
registry.register_backend("slurm", SLURMExecutionBackend)
registry.register_backend("aws_batch", AWSBatchExecutionBackend)
# Auto-register default backends
register_default_backends()
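Registering a custom backend uses the same registry. In the sketch below, SSHExecutionBackend, its module path, and its constructor arguments are hypothetical.
# Illustrative registration of a custom backend (SSHExecutionBackend is hypothetical)
from celline.middleware.backends.registry import get_backend_registry
from mypackage.ssh_backend import SSHExecutionBackend  # hypothetical module

registry = get_backend_registry()
registry.register_backend("ssh", SSHExecutionBackend)

print(registry.list_backends())
backend = registry.create_backend("ssh", host="hpc.example.org")  # hypothetical kwargs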
Backend Factory
# celline/middleware/backends/factory.py
from typing import Dict, Any, Optional
from celline.middleware.backends.registry import get_backend_registry
from celline.middleware.backends.base import ExecutionBackend
from celline.config import Config
from celline.log.logger import get_logger
class BackendFactory:
"""Factory for creating execution backends"""
def __init__(self):
self.logger = get_logger(__name__)
self._cached_backends: Dict[str, ExecutionBackend] = {}
def create_backend(self, backend_config: Dict[str, Any]) -> ExecutionBackend:
"""
Create execution backend from configuration
Args:
backend_config: Backend configuration dictionary
Returns:
ExecutionBackend instance
"""
backend_type = backend_config.get('type')
if not backend_type:
raise ValueError("Backend type not specified")
# Check cache first
cache_key = self._get_cache_key(backend_config)
if cache_key in self._cached_backends:
return self._cached_backends[cache_key]
# Get backend class
registry = get_backend_registry()
backend_class = registry.get_backend(backend_type)
if not backend_class:
raise ValueError(f"Unknown backend type: {backend_type}")
# Extract backend-specific configuration
backend_params = backend_config.get('params', {})
try:
# Create backend instance
backend = backend_class(**backend_params)
# Validate backend availability
if not backend.is_available():
raise RuntimeError(f"Backend {backend_type} is not available")
# Cache the backend
self._cached_backends[cache_key] = backend
self.logger.info(f"Created {backend_type} execution backend")
return backend
except Exception as e:
self.logger.error(f"Failed to create {backend_type} backend: {e}")
raise
def _get_cache_key(self, backend_config: Dict[str, Any]) -> str:
"""Generate cache key for backend configuration"""
import hashlib
import json
# Create deterministic hash of configuration
config_str = json.dumps(backend_config, sort_keys=True)
return hashlib.md5(config_str.encode()).hexdigest()
def get_default_backend(self) -> ExecutionBackend:
"""Get default backend based on system configuration"""
# Check configuration for default backend
execution_config = getattr(Config, 'EXECUTION', {})
backend_config = execution_config.get('backend', {})
if not backend_config:
# Auto-detect available backend
backend_config = self._auto_detect_backend()
return self.create_backend(backend_config)
def _auto_detect_backend(self) -> Dict[str, Any]:
"""Auto-detect best available backend"""
registry = get_backend_registry()
# Priority order for backend selection
backend_priorities = [
('slurm', {}),
('pbs', {}),
('local', {'max_concurrent_jobs': 2})
]
for backend_type, default_params in backend_priorities:
backend_class = registry.get_backend(backend_type)
if backend_class:
try:
# Test backend availability
test_backend = backend_class(**default_params)
if test_backend.is_available():
self.logger.info(f"Auto-detected {backend_type} backend")
return {
'type': backend_type,
'params': default_params
}
except Exception:
continue
# Fallback to local backend
self.logger.warning("No suitable backend found, using local backend")
return {
'type': 'local',
'params': {'max_concurrent_jobs': 1}
}
def list_available_backends(self) -> Dict[str, bool]:
"""List all backends and their availability"""
registry = get_backend_registry()
availability = {}
for backend_name in registry.list_backends():
backend_class = registry.get_backend(backend_name)
try:
# Create minimal instance to test availability
backend = backend_class()
availability[backend_name] = backend.is_available()
except Exception:
availability[backend_name] = False
return availability
# Global factory instance
_backend_factory = BackendFactory()
def get_backend_factory() -> BackendFactory:
"""Get global backend factory"""
return _backend_factory
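A sketch of resolving a backend through the factory; the configuration values are placeholders.
# Illustrative factory usage (configuration values are placeholders)
from celline.middleware.backends.factory import get_backend_factory

factory = get_backend_factory()

# Explicit configuration (a 'slurm' or 'pbs' type works the same way)
backend = factory.create_backend({
    "type": "local",
    "params": {"max_concurrent_jobs": 2},
})

# Or let the factory pick SLURM, PBS, or local execution automatically
default_backend = factory.get_default_backend()
print(factory.list_available_backends())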
This guide covers the core patterns for building custom execution backends for Celline, from local execution to HPC and cloud integration, including resource management, job monitoring, and error handling.