HPC Backends#

Abstract base class and backend implementations for HPC job management.

Base#

Abstract HPC backend interface.

class ccat_workflow_manager.hpc.base.HPCJobStatus(value)[source]#

Bases: str, Enum

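Enumeration of the states an HPC job can be reported in. Members are also plain strings (the class derives from str).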

PENDING = 'pending'#
RUNNING = 'running'#
COMPLETED = 'completed'#
FAILED = 'failed'#
CANCELLED = 'cancelled'#
UNKNOWN = 'unknown'#
class ccat_workflow_manager.hpc.base.HPCJobInfo(job_id: str, status: ccat_workflow_manager.hpc.base.HPCJobStatus, exit_code: int | None = None, start_time: str | None = None, end_time: str | None = None, node: str | None = None, wall_time_seconds: float | None = None, cpu_hours: float | None = None, peak_memory_gb: float | None = None)[source]#

Bases: object

job_id: str#
status: HPCJobStatus#
exit_code: int | None = None#
start_time: str | None = None#
end_time: str | None = None#
node: str | None = None#
wall_time_seconds: float | None = None#
cpu_hours: float | None = None#
peak_memory_gb: float | None = None#
__init__(job_id: str, status: HPCJobStatus, exit_code: int | None = None, start_time: str | None = None, end_time: str | None = None, node: str | None = None, wall_time_seconds: float | None = None, cpu_hours: float | None = None, peak_memory_gb: float | None = None) -> None#
class ccat_workflow_manager.hpc.base.HPCBackend[source]#

Bases: ABC

Abstract interface for HPC job submission and monitoring.

abstract submit(execution_command: str, image_ref: str, sif_path: str, input_dir: str, output_dir: str, workspace_dir: str, manifest_path: str, resource_requirements: dict, environment_variables: dict, job_name: str) -> str[source]#

Submit a job to the HPC backend. Returns the job ID.

abstract get_status(job_id: str) -> HPCJobInfo[source]#

Get the status of a submitted job.

abstract get_logs(job_id: str) -> str[source]#

Get the logs of a submitted job.

abstract cancel(job_id: str) -> bool[source]#

Cancel a running job. Returns True if successful.

Kubernetes#

Kubernetes HPC backend - submits jobs as K8s Jobs.

class ccat_workflow_manager.hpc.kubernetes.KubernetesBackend[source]#

Bases: HPCBackend

Submit and monitor jobs on a Kubernetes cluster.

__init__()[source]#
property client#
property core_client#
submit(execution_command: str, image_ref: str, sif_path: str, input_dir: str, output_dir: str, workspace_dir: str, manifest_path: str, resource_requirements: dict, environment_variables: dict, job_name: str) -> str[source]#

Submit a job to the HPC backend. Returns the job ID.

get_status(job_id: str) -> HPCJobInfo[source]#

Get the status of a submitted job.

get_logs(job_id: str) -> str[source]#

Get the logs of a submitted job.

cancel(job_id: str) -> bool[source]#

Cancel a running job. Returns True if successful.

SLURM#

SLURM HPC backend - submits jobs via sbatch.

class ccat_workflow_manager.hpc.slurm.SLURMBackend[source]#

Bases: HPCBackend

Submit and monitor jobs on a SLURM cluster.

__init__()[source]#
submit(execution_command: str, image_ref: str, sif_path: str, input_dir: str, output_dir: str, workspace_dir: str, manifest_path: str, resource_requirements: dict, environment_variables: dict, job_name: str) -> str[source]#

Submit a job to the HPC backend. Returns the job ID.

get_status(job_id: str) -> HPCJobInfo[source]#

Get the status of a submitted job.

get_logs(job_id: str) -> str[source]#

Get the logs of a submitted job.

cancel(job_id: str) -> bool[source]#

Cancel a running job. Returns True if successful.

Local#

Local HPC backend - direct Apptainer execution via subprocess.

This backend is intended for local development, standalone servers, or sites without SLURM/K8s. It tracks jobs in Redis, using synthetic UUIDs as job IDs.

class ccat_workflow_manager.hpc.local.LocalBackend[source]#

Bases: HPCBackend

Run jobs locally via direct apptainer exec.

submit(execution_command: str, image_ref: str, sif_path: str, input_dir: str, output_dir: str, workspace_dir: str, manifest_path: str, resource_requirements: dict, environment_variables: dict, job_name: str) -> str[source]#

Submit a job to the HPC backend. Returns the job ID.

get_status(job_id: str) -> HPCJobInfo[source]#

Get the status of a submitted job.

get_logs(job_id: str) -> str[source]#

Get the logs of a submitted job.

cancel(job_id: str) -> bool[source]#

Cancel a running job. Returns True if successful.