# Source code for ccat_workflow_manager.hpc.local

"""Local HPC backend - direct Apptainer execution via subprocess.

For local development, standalone servers, or sites without SLURM/K8s.
Tracks jobs in Redis with synthetic UUID job IDs.
"""

import os
import signal
import subprocess
import uuid
from datetime import datetime, timezone

from .base import HPCBackend, HPCJobInfo, HPCJobStatus
from ..utils import get_redis_connection
from ..logging_utils import get_structured_logger
from ..exceptions import HPCSubmissionError

logger = get_structured_logger(__name__)

REDIS_JOB_PREFIX = "wf:local_job:"


class LocalBackend(HPCBackend):
    """Run jobs locally via direct ``apptainer exec`` subprocesses.

    Intended for local development, standalone servers, or sites without
    SLURM/K8s. Job metadata is tracked in Redis under synthetic short
    UUID job IDs with a 7-day TTL.

    NOTE(review): all Redis hash reads assume the client is configured
    with ``decode_responses=True`` (str keys/values) — confirm in
    ``get_redis_connection``.
    """

    def submit(
        self,
        execution_command: str,
        image_ref: str,
        sif_path: str,
        input_dir: str,
        output_dir: str,
        workspace_dir: str,
        manifest_path: str,
        resource_requirements: dict,
        environment_variables: dict,
        job_name: str,
    ) -> str:
        """Launch *execution_command* as a detached local subprocess.

        Stdout/stderr are redirected to ``{output_dir}/{job_name}.out|.err``.
        Returns a synthetic 8-character job ID.

        Raises:
            HPCSubmissionError: if the process cannot be started.
        """
        job_id = str(uuid.uuid4())[:8]
        redis = get_redis_connection()
        key = f"{REDIS_JOB_PREFIX}{job_id}"

        # Record the job as RUNNING before launching so status queries
        # issued immediately after submit() resolve.
        redis.hset(key, mapping={
            "status": HPCJobStatus.RUNNING.value,
            "command": execution_command,
            "job_name": job_name,
            "start_time": datetime.now(timezone.utc).isoformat(),
        })
        redis.expire(key, 86400 * 7)  # keep metadata for 7 days

        try:
            # Open the log files in a context manager: Popen duplicates the
            # descriptors into the child, so the parent can (and must) close
            # its own handles — the original leaked both file objects.
            with open(f"{output_dir}/{job_name}.out", "w") as out_f, \
                 open(f"{output_dir}/{job_name}.err", "w") as err_f:
                proc = subprocess.Popen(
                    execution_command,
                    shell=True,  # command is a full shell line built upstream
                    stdout=out_f,
                    stderr=err_f,
                    # Inherit the parent environment so PATH (and hence
                    # `apptainer`) is resolvable; job-specific variables
                    # override. The original passed only
                    # environment_variables, leaving the child without PATH.
                    env={**os.environ, **environment_variables},
                )
            redis.hset(key, "pid", str(proc.pid))
            logger.info("local_job_submitted", job_id=job_id, pid=proc.pid)
            return job_id
        except Exception as e:
            redis.hset(key, mapping={
                "status": HPCJobStatus.FAILED.value,
                "error": str(e),
            })
            raise HPCSubmissionError(f"Local execution failed: {e}") from e

    @staticmethod
    def _pid_alive(pid: int) -> bool:
        """Return True if *pid* is a live (non-zombie) process.

        Reap the child first with a non-blocking waitpid: the Popen handle
        is discarded at submit time, so a finished child lingers as a
        zombie and a bare ``os.kill(pid, 0)`` would keep succeeding —
        the original therefore reported RUNNING forever.
        """
        try:
            reaped_pid, _ = os.waitpid(pid, os.WNOHANG)
            if reaped_pid == pid:
                return False  # child exited and has now been reaped
        except ChildProcessError:
            # Not our child (e.g. after a service restart) — fall back to
            # the signal-0 liveness probe below.
            pass
        try:
            os.kill(pid, 0)
            return True
        except OSError:
            return False

    def get_status(self, job_id: str) -> HPCJobInfo:
        """Return the current status for *job_id*.

        For jobs recorded as RUNNING, reconciles the stored status against
        the live process and persists COMPLETED when it has exited.
        Unknown job IDs yield ``HPCJobStatus.UNKNOWN``.
        """
        redis = get_redis_connection()
        key = f"{REDIS_JOB_PREFIX}{job_id}"
        data = redis.hgetall(key)
        if not data:
            return HPCJobInfo(job_id=job_id, status=HPCJobStatus.UNKNOWN)

        pid = data.get("pid")
        stored_status = data.get("status", "unknown")

        if pid and stored_status == HPCJobStatus.RUNNING.value:
            if self._pid_alive(int(pid)):
                return HPCJobInfo(
                    job_id=job_id,
                    status=HPCJobStatus.RUNNING,
                    start_time=data.get("start_time"),
                )
            # Process finished. The exit code is not recoverable here
            # (the Popen handle was discarded), so mark COMPLETED
            # unconditionally. TODO(review): persist the exit status at
            # reap time in _pid_alive to distinguish FAILED runs.
            end_time = datetime.now(timezone.utc).isoformat()
            redis.hset(key, mapping={
                "status": HPCJobStatus.COMPLETED.value,
                "end_time": end_time,
            })
            return HPCJobInfo(
                job_id=job_id,
                status=HPCJobStatus.COMPLETED,
                start_time=data.get("start_time"),
                end_time=end_time,
            )

        status_map = {
            "running": HPCJobStatus.RUNNING,
            "completed": HPCJobStatus.COMPLETED,
            "failed": HPCJobStatus.FAILED,
            "cancelled": HPCJobStatus.CANCELLED,
        }
        return HPCJobInfo(
            job_id=job_id,
            status=status_map.get(stored_status, HPCJobStatus.UNKNOWN),
            start_time=data.get("start_time"),
            end_time=data.get("end_time"),
        )

    def get_logs(self, job_id: str) -> str:
        """Return a pointer string for the job's log location.

        Actual output lives in the ``.out``/``.err`` files written at
        submit time; only the job name is surfaced here.
        """
        redis = get_redis_connection()
        data = redis.hgetall(f"{REDIS_JOB_PREFIX}{job_id}")
        job_name = data.get("job_name", job_id)
        return f"Local job {job_name} (see output files)"

    def cancel(self, job_id: str) -> bool:
        """Send SIGTERM to the job's process.

        Returns True if the signal was delivered and the job was marked
        CANCELLED; False if the job has no recorded pid or the process
        no longer exists.
        """
        redis = get_redis_connection()
        key = f"{REDIS_JOB_PREFIX}{job_id}"
        data = redis.hgetall(key)
        pid = data.get("pid")
        if not pid:
            return False
        try:
            os.kill(int(pid), signal.SIGTERM)
        except OSError:
            return False
        redis.hset(key, mapping={
            "status": HPCJobStatus.CANCELLED.value,
            "end_time": datetime.now(timezone.utc).isoformat(),
        })
        logger.info("local_job_cancelled", job_id=job_id, pid=pid)
        return True