"""Local HPC backend - direct Apptainer execution via subprocess.
For local development, standalone servers, or sites without SLURM/K8s.
Tracks jobs in Redis with synthetic UUID job IDs.
"""
import os
import signal
import subprocess
import uuid
from datetime import datetime, timezone

from .base import HPCBackend, HPCJobInfo, HPCJobStatus
from ..exceptions import HPCSubmissionError
from ..logging_utils import get_structured_logger
from ..utils import get_redis_connection
logger = get_structured_logger(__name__)
REDIS_JOB_PREFIX = "wf:local_job:"
class LocalBackend(HPCBackend):
    """Run jobs locally via direct ``apptainer exec`` (no scheduler).

    Jobs are spawned with :class:`subprocess.Popen` and tracked in Redis
    under synthetic 8-character UUID job IDs (``wf:local_job:<id>``).
    Liveness is probed with ``os.kill(pid, 0)``; there is no wait/reap here,
    so a finished process is reconciled to COMPLETED on the next status poll.

    NOTE(review): status strings are read back from Redis as ``str`` — this
    assumes the connection uses ``decode_responses=True``; verify against
    ``get_redis_connection``.
    """

    def submit(
        self,
        execution_command: str,
        image_ref: str,
        sif_path: str,
        input_dir: str,
        output_dir: str,
        workspace_dir: str,
        manifest_path: str,
        resource_requirements: dict,
        environment_variables: dict,
        job_name: str,
    ) -> str:
        """Launch ``execution_command`` as a local background process.

        Returns the synthetic job ID. Stdout/stderr are redirected to
        ``<output_dir>/<job_name>.out`` / ``.err``. Raises
        :class:`HPCSubmissionError` if the process cannot be spawned.

        ``image_ref``, ``sif_path``, ``input_dir``, ``workspace_dir``,
        ``manifest_path`` and ``resource_requirements`` are accepted for
        interface parity with other backends; the local backend assumes
        they are already baked into ``execution_command``.
        """
        job_id = str(uuid.uuid4())[:8]
        redis = get_redis_connection()
        key = f"{REDIS_JOB_PREFIX}{job_id}"
        # Record metadata up-front so get_status() works even if spawn fails.
        redis.hset(key, mapping={
            "status": HPCJobStatus.RUNNING.value,
            "command": execution_command,
            "job_name": job_name,
            "start_time": datetime.now(timezone.utc).isoformat(),
        })
        redis.expire(key, 86400 * 7)  # keep job records for 7 days
        try:
            out_file = open(f"{output_dir}/{job_name}.out", "w")
            err_file = open(f"{output_dir}/{job_name}.err", "w")
            try:
                proc = subprocess.Popen(
                    execution_command,
                    # The command is a full shell command line built upstream,
                    # so shell=True is deliberate (not untrusted user input —
                    # TODO confirm callers never pass through raw user strings).
                    shell=True,
                    stdout=out_file,
                    stderr=err_file,
                    # Merge over the parent environment: replacing it outright
                    # would drop PATH/HOME and the shell could not even
                    # resolve the apptainer binary.
                    env={**os.environ, **environment_variables},
                )
            finally:
                # The child owns duplicated descriptors; close the parent's
                # copies so each submit() does not leak two fds.
                out_file.close()
                err_file.close()
            redis.hset(key, "pid", str(proc.pid))
            logger.info("local_job_submitted", job_id=job_id, pid=proc.pid)
            return job_id
        except Exception as e:
            redis.hset(key, mapping={
                "status": HPCJobStatus.FAILED.value,
                "error": str(e),
            })
            raise HPCSubmissionError(f"Local execution failed: {e}") from e

    def get_status(self, job_id: str) -> HPCJobInfo:
        """Return current job info, reconciling Redis with process liveness.

        If Redis says RUNNING but the PID no longer exists, the record is
        flipped to COMPLETED. Exit codes are not captured (Popen is not
        waited on), so a crashed process also reports COMPLETED — known
        limitation of this backend.
        """
        redis = get_redis_connection()
        key = f"{REDIS_JOB_PREFIX}{job_id}"
        data = redis.hgetall(key)
        if not data:
            # Expired or never-existing job ID.
            return HPCJobInfo(job_id=job_id, status=HPCJobStatus.UNKNOWN)
        pid = data.get("pid")
        stored_status = data.get("status", "unknown")
        if pid and stored_status == HPCJobStatus.RUNNING.value:
            try:
                os.kill(int(pid), 0)  # signal 0: existence probe, no signal sent
            except ProcessLookupError:
                # Process is gone — mark finished with a single consistent
                # timestamp (stored and returned values must match).
                end_time = datetime.now(timezone.utc).isoformat()
                redis.hset(key, mapping={
                    "status": HPCJobStatus.COMPLETED.value,
                    "end_time": end_time,
                })
                return HPCJobInfo(
                    job_id=job_id,
                    status=HPCJobStatus.COMPLETED,
                    start_time=data.get("start_time"),
                    end_time=end_time,
                )
            except PermissionError:
                # EPERM means the process EXISTS but belongs to another
                # user — it is still running, not finished.
                pass
            return HPCJobInfo(
                job_id=job_id,
                status=HPCJobStatus.RUNNING,
                start_time=data.get("start_time"),
            )
        status_map = {
            "running": HPCJobStatus.RUNNING,
            "completed": HPCJobStatus.COMPLETED,
            "failed": HPCJobStatus.FAILED,
            "cancelled": HPCJobStatus.CANCELLED,
        }
        return HPCJobInfo(
            job_id=job_id,
            status=status_map.get(stored_status, HPCJobStatus.UNKNOWN),
            start_time=data.get("start_time"),
            end_time=data.get("end_time"),
        )

    def get_logs(self, job_id: str) -> str:
        """Return a pointer message; actual logs live in the output files."""
        redis = get_redis_connection()
        data = redis.hgetall(f"{REDIS_JOB_PREFIX}{job_id}")
        job_name = data.get("job_name", job_id)
        return f"Local job {job_name} (see output files)"

    def cancel(self, job_id: str) -> bool:
        """SIGTERM the job's process. Returns True if the signal was sent.

        Returns False when the job has no recorded PID or the process is
        already gone / not signalable.
        """
        redis = get_redis_connection()
        key = f"{REDIS_JOB_PREFIX}{job_id}"
        data = redis.hgetall(key)
        pid = data.get("pid")
        if not pid:
            return False
        try:
            # Keep the try body minimal: only the kill can legitimately
            # raise OSError (ESRCH if already exited, EPERM if not ours).
            os.kill(int(pid), signal.SIGTERM)
        except OSError:
            return False
        redis.hset(key, mapping={
            "status": HPCJobStatus.CANCELLED.value,
            "end_time": datetime.now(timezone.utc).isoformat(),
        })
        logger.info("local_job_cancelled", job_id=job_id, pid=pid)
        return True