# Source code for ccat_workflow_manager.hpc.base

"""Abstract HPC backend interface."""

from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class HPCJobStatus(str, Enum):
    """Set of states a job on an HPC backend can be reported in.

    The enum mixes in ``str``, so every member compares equal to its
    lowercase string value (``HPCJobStatus.RUNNING == "running"``) and can
    be looked up from that value (``HPCJobStatus("running")``).
    """

    # Job has been handed to the backend / is executing.
    # (presumed meaning from the names — confirm against a concrete backend)
    PENDING = "pending"
    RUNNING = "running"

    # Job is no longer running (presumably terminal states — TODO confirm).
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

    # Catch-all for a state that maps to none of the above.
    UNKNOWN = "unknown"
@dataclass
class HPCJobInfo:
    """Record describing one HPC job: its ID, its status and optional details.

    ``job_id`` and ``status`` must always be supplied. Every remaining field
    is optional and defaults to ``None`` when the value is not provided.
    """

    # --- required -----------------------------------------------------------
    job_id: str  # identifier of the job this record describes
    status: HPCJobStatus  # state of the job

    # --- optional (all default to None) -------------------------------------
    exit_code: Optional[int] = None
    # Timestamps are carried as strings; the format is not fixed in this
    # module — TODO confirm with the concrete backends.
    start_time: Optional[str] = None
    end_time: Optional[str] = None
    node: Optional[str] = None  # presumably the compute node name — verify
    # Resource-usage figures; units are taken from the field names.
    wall_time_seconds: Optional[float] = None
    cpu_hours: Optional[float] = None
    peak_memory_gb: Optional[float] = None
class HPCBackend(ABC):
    """Contract for submitting jobs to an HPC system and tracking them.

    All four operations are abstract: a subclass can only be instantiated
    once it overrides every one of them. The base implementations do nothing
    except raise ``NotImplementedError``.
    """

    @abstractmethod
    def submit(
        self,
        execution_command: str,
        image_ref: str,
        sif_path: str,
        input_dir: str,
        output_dir: str,
        workspace_dir: str,
        manifest_path: str,
        resource_requirements: dict,
        environment_variables: dict,
        job_name: str,
    ) -> str:
        """Hand a job to the HPC backend and return the resulting job ID.

        How each value is used is decided by the concrete backend; the
        descriptions below are inferred from the parameter names and should
        be confirmed against an implementation.

        Args:
            execution_command: Command to run for the job.
            image_ref: Reference to the container image.
            sif_path: Path to the image file (presumably a SIF container
                image — verify).
            input_dir: Directory holding the job's inputs.
            output_dir: Directory for the job's outputs.
            workspace_dir: Working directory for the job.
            manifest_path: Path to the job's manifest.
            resource_requirements: Mapping of requested resources; keys are
                not defined in this module.
            environment_variables: Mapping of environment variables for the
                job.
            job_name: Name to give the job.

        Returns:
            The ID of the submitted job.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_status(self, job_id: str) -> HPCJobInfo:
        """Look up the current status of a previously submitted job.

        Args:
            job_id: ID of the job to query.

        Returns:
            An ``HPCJobInfo`` describing the job.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_logs(self, job_id: str) -> str:
        """Fetch the logs of a previously submitted job.

        Args:
            job_id: ID of the job whose logs are wanted.

        Returns:
            The job's logs as a single string.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    @abstractmethod
    def cancel(self, job_id: str) -> bool:
        """Cancel a running job.

        Args:
            job_id: ID of the job to cancel.

        Returns:
            ``True`` if the cancellation succeeded.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()