# Source code for ccat_workflow_manager.hpc.base
"""Abstract HPC backend interface."""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class HPCJobStatus(str, Enum):
    """Job states reported through the HPC backend interface.

    Mixing in ``str`` makes every member compare equal to its lowercase
    string value, e.g. ``HPCJobStatus.PENDING == "pending"``, and lets a
    member be looked up from that string: ``HPCJobStatus("pending")``.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    # NOTE(review): presumably the fallback when a backend-specific state
    # cannot be mapped to one of the members above -- confirm against the
    # concrete backends.
    UNKNOWN = "unknown"
@dataclass
class HPCJobInfo:
    """Status record for one job; the return type of ``HPCBackend.get_status``.

    Only ``job_id`` and ``status`` are required. Every other field is
    optional and defaults to ``None``.
    """

    job_id: str
    status: HPCJobStatus
    exit_code: Optional[int] = None
    # Timestamps are carried as plain strings; the format is not fixed by
    # this class -- TODO confirm what each concrete backend emits.
    start_time: Optional[str] = None
    end_time: Optional[str] = None
    # NOTE(review): presumably the compute node the job ran on -- verify.
    node: Optional[str] = None
    # Resource-accounting figures; units as given by the field-name suffixes.
    wall_time_seconds: Optional[float] = None
    cpu_hours: Optional[float] = None
    peak_memory_gb: Optional[float] = None
class HPCBackend(ABC):
    """Abstract interface for HPC job submission and monitoring.

    All four methods are abstract, so a concrete backend must override
    each of them before it can be instantiated.
    """

    @abstractmethod
    def submit(
        self,
        execution_command: str,
        image_ref: str,
        sif_path: str,
        input_dir: str,
        output_dir: str,
        workspace_dir: str,
        manifest_path: str,
        resource_requirements: dict,
        environment_variables: dict,
        job_name: str,
    ) -> str:
        """Submit a job to the HPC backend. Returns the job ID.

        This base class does not interpret any argument; the meaning of
        each one is defined by the concrete backend.
        NOTE(review): the descriptions below are inferred from the
        parameter names only -- confirm against an implementation.

        Args:
            execution_command: Command the job should execute.
            image_ref: Reference to the container image (presumably).
            sif_path: Path to the image file on disk (presumably a
                ``.sif`` file -- verify).
            input_dir: Directory holding the job's inputs.
            output_dir: Directory receiving the job's outputs.
            workspace_dir: Working/scratch directory for the job.
            manifest_path: Path to a manifest file; contents not defined
                here.
            resource_requirements: Mapping of requested resources; the
                expected keys are backend-specific -- TODO document.
            environment_variables: Mapping of environment variables for
                the job (presumably name -> value).
            job_name: Name to give the job.

        Returns:
            The job ID as a string; the other methods of this interface
            take it as their ``job_id`` argument.
        """
        raise NotImplementedError

    @abstractmethod
    def get_status(self, job_id: str) -> HPCJobInfo:
        """Get the status of a submitted job.

        Args:
            job_id: Job ID as returned by ``submit``.

        Returns:
            An ``HPCJobInfo`` record for the job.
        """
        raise NotImplementedError

    @abstractmethod
    def get_logs(self, job_id: str) -> str:
        """Get the logs of a submitted job.

        Args:
            job_id: Job ID as returned by ``submit``.

        Returns:
            The job's logs as a single string.
        """
        raise NotImplementedError

    @abstractmethod
    def cancel(self, job_id: str) -> bool:
        """Cancel a running job. Returns True if successful.

        Args:
            job_id: Job ID as returned by ``submit``.
        """
        raise NotImplementedError