"""Result Manager - monitors HPC jobs and collects outputs into DataProducts.
Polls ExecutedReductionSteps in SUBMITTED/RUNNING state, checks HPC job status,
discovers outputs by convention (directory structure), and creates DataProduct records.
"""
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy.orm import Session
from ccat_ops_db import models
from ccat_ops_db.models import RunStatus, DataProductType
from ..database import DatabaseConnection
from ..hpc import KubernetesBackend, SLURMBackend, LocalBackend, HPCJobStatus
from ..config.config import ccat_workflow_manager_settings
from ..logging_utils import get_structured_logger
logger = get_structured_logger(__name__)
# Convention-based output type mapping from subdirectory names
OUTPUT_DIR_TYPE_MAP = {
"science": DataProductType.SCIENCE,
"qa": DataProductType.QA_METRIC,
"plots": DataProductType.PLOT,
"statistics": DataProductType.STATISTICS,
"logs": DataProductType.LOG,
}
def _get_hpc_backend():
    """Instantiate the HPC backend selected by ``HPC_BACKEND`` in settings.

    Returns:
        A ``KubernetesBackend``, ``SLURMBackend``, or ``LocalBackend`` instance.

    Raises:
        ValueError: if the configured backend name is not recognised.
    """
    backend_type = ccat_workflow_manager_settings.HPC_BACKEND
    factories = {
        "kubernetes": KubernetesBackend,
        "slurm": SLURMBackend,
        "local": LocalBackend,
    }
    factory = factories.get(backend_type)
    if factory is None:
        raise ValueError(f"Unknown HPC backend: {backend_type}")
    return factory()
def process_running_steps():
    """Main result collection loop - called by the CLI poll loop.

    Queries all ExecutedReductionSteps in SUBMITTED/RUNNING state and checks
    each one's HPC job status via :func:`_check_run`.

    Each run is committed individually: previously a single trailing commit
    covered the whole batch, so an exception raised while checking one run
    (e.g. a failed flush) left the session in a pending-rollback state and
    both poisoned the remaining runs and discarded earlier runs' updates.
    """
    session, _ = DatabaseConnection.get_connection()
    try:
        active_runs = (
            session.query(models.ExecutedReductionStep)
            .filter(
                models.ExecutedReductionStep.status.in_([
                    RunStatus.SUBMITTED,
                    RunStatus.RUNNING,
                ])
            )
            .all()
        )
        hpc_backend = _get_hpc_backend()
        for run in active_runs:
            # Capture the id before any rollback can expire the instance.
            run_id = run.id
            try:
                _check_run(session, run, hpc_backend)
                # Persist this run's transition independently of the others.
                session.commit()
            except Exception as e:
                # Roll back so a failed flush cannot leave the session
                # unusable (PendingRollbackError) for subsequent runs.
                session.rollback()
                logger.error(
                    "result_check_failed",
                    executed_step_id=run_id,
                    error=str(e),
                )
    except Exception:
        session.rollback()
        raise
    finally:
        # NOTE(review): .remove() implies this is a scoped_session — confirm
        # against DatabaseConnection.get_connection().
        session.remove()
def _check_run(session: Session, run: models.ExecutedReductionStep, hpc_backend) -> None:
    """Check a single run's HPC job status and handle state transitions.

    Transitions driven by the backend-reported job status:
      RUNNING   -> RunStatus.RUNNING (only logged on first observation)
      COMPLETED -> COLLECTING_RESULTS, then COMPLETED (or FAILED if
                   result collection itself raises)
      FAILED    -> delegated to _handle_failure (retry or permanent fail)
      CANCELLED -> RunStatus.CANCELLED

    Any other status (e.g. a queued/pending job) is left untouched and will
    be re-checked on the next poll. Mutations are flushed/committed by the
    caller except for the explicit commit noted below.
    """
    if not run.hpc_job_id:
        # Nothing to poll — the step was never (or not yet) submitted.
        logger.warning("no_hpc_job_id", executed_step_id=run.id)
        return
    job_info = hpc_backend.get_status(run.hpc_job_id)
    if job_info.status == HPCJobStatus.RUNNING:
        # Only transition (and log) once; subsequent polls are no-ops.
        if run.status != RunStatus.RUNNING:
            run.status = RunStatus.RUNNING
            logger.info(
                "step_running",
                executed_step_id=run.id,
                hpc_job_id=run.hpc_job_id,
            )
    elif job_info.status == HPCJobStatus.COMPLETED:
        run.status = RunStatus.COLLECTING_RESULTS
        # Commit immediately so the COLLECTING_RESULTS state is visible to
        # other observers while the (potentially slow) collection runs.
        session.commit()
        try:
            _collect_results(session, run, hpc_backend, job_info)
            run.status = RunStatus.COMPLETED
            run.end_time = datetime.now(timezone.utc)
            # Store job metrics (only those the backend actually reported).
            if job_info.wall_time_seconds is not None:
                run.processing_time_s = job_info.wall_time_seconds
            if job_info.peak_memory_gb is not None:
                run.peak_memory_gb = job_info.peak_memory_gb
            if job_info.cpu_hours is not None:
                run.cpu_hours = job_info.cpu_hours
            logger.info(
                "step_completed",
                executed_step_id=run.id,
            )
        except Exception as e:
            # A completed job whose outputs cannot be collected is treated
            # as a failure (no retry path here, unlike HPC-level failures).
            logger.error(
                "result_collection_failed",
                executed_step_id=run.id,
                error=str(e),
            )
            run.status = RunStatus.FAILED
            run.failure_error_message = f"Result collection failed: {e}"
            run.end_time = datetime.now(timezone.utc)
    elif job_info.status == HPCJobStatus.FAILED:
        _handle_failure(session, run, hpc_backend, job_info)
    elif job_info.status == HPCJobStatus.CANCELLED:
        run.status = RunStatus.CANCELLED
        run.end_time = datetime.now(timezone.utc)
        logger.info("step_cancelled", executed_step_id=run.id)
def _collect_results(
    session: Session,
    run: models.ExecutedReductionStep,
    hpc_backend,
    job_info,
):
    """Scan the run's output location and register DataProduct records.

    Convention-based discovery:
      * final pipeline step  -> ``output/{subdir}/`` — product type derived
        from the subdirectory name
      * intermediate step    -> ``workspace/{step_name}/`` — everything is
        tagged INTERMEDIATE
    """
    step = run.reduction_step
    pipeline = step.pipeline
    pipeline_base = ccat_workflow_manager_settings.PIPELINE_BASE_DIR
    base_dir = Path(f"{pipeline_base}/{pipeline.id}/{run.sub_group_key}")
    # The final step is the one with the highest step_order in the pipeline.
    last_order = max(s.step_order for s in pipeline.reduction_steps)
    is_final_step = step.step_order == last_order
    # Best-effort: persist the job logs, but never fail collection over it.
    try:
        _create_step_log(session, run, hpc_backend.get_logs(run.hpc_job_id))
    except Exception as e:
        logger.warning("log_collection_failed", error=str(e))
    if is_final_step:
        _collect_final_outputs(session, run, base_dir / "output")
    else:
        _collect_workspace_outputs(session, run, base_dir / "workspace" / step.name)
    logger.info(
        "results_collected",
        executed_step_id=run.id,
        is_final=is_final_step,
    )
def _collect_final_outputs(
    session: Session, run: models.ExecutedReductionStep, output_dir: Path
):
    """Register every file under *output_dir* as a DataProduct.

    The product type comes from the first-level subdirectory name via
    OUTPUT_DIR_TYPE_MAP; files directly in *output_dir* (or in an unmapped
    subdirectory) default to SCIENCE. ``*.metadata.json`` sidecars are
    skipped — they are consumed as metadata, not registered as products.
    """
    if not output_dir.exists():
        logger.warning(
            "output_dir_missing",
            executed_step_id=run.id,
            output_dir=str(output_dir),
        )
        return
    for candidate in output_dir.rglob("*"):
        if not candidate.is_file() or candidate.name.endswith(".metadata.json"):
            continue
        parts = candidate.relative_to(output_dir).parts
        if len(parts) > 1:
            product_type = OUTPUT_DIR_TYPE_MAP.get(parts[0], DataProductType.SCIENCE)
        else:
            # File sits directly in output/ with no classifying subdirectory.
            product_type = DataProductType.SCIENCE
        _create_data_product(
            session,
            run,
            candidate,
            output_dir,
            product_type,
            _load_sidecar_metadata(candidate),
        )
def _collect_workspace_outputs(
    session: Session, run: models.ExecutedReductionStep, workspace_dir: Path
):
    """Register an intermediate step's workspace files as DataProducts.

    All discovered files are tagged INTERMEDIATE; ``*.metadata.json``
    sidecars are consumed as metadata rather than registered.
    """
    if not workspace_dir.exists():
        logger.warning(
            "workspace_dir_missing",
            executed_step_id=run.id,
            workspace_dir=str(workspace_dir),
        )
        return
    for candidate in workspace_dir.rglob("*"):
        if not candidate.is_file() or candidate.name.endswith(".metadata.json"):
            continue
        _create_data_product(
            session,
            run,
            candidate,
            workspace_dir,
            DataProductType.INTERMEDIATE,
            _load_sidecar_metadata(candidate),
        )
def _load_sidecar_metadata(file_path: Path) -> dict:
"""Load .metadata.json sidecar file if it exists."""
sidecar = file_path.parent / f"{file_path.name}.metadata.json"
if sidecar.exists():
try:
return json.loads(sidecar.read_text())
except (json.JSONDecodeError, OSError):
pass
return {}
def _create_data_product(
    session: Session,
    run: models.ExecutedReductionStep,
    file_path: Path,
    base_dir: Path,
    product_type: DataProductType,
    metadata: dict,
):
    """Create and flush a DataProduct record for a discovered file.

    The relative path is taken against *base_dir*; an empty *metadata*
    dict is stored as NULL. Raw-data provenance is linked after the flush
    so the new row has an id.
    """
    step = run.reduction_step
    product = models.DataProduct(
        name=file_path.name,
        relative_path=str(file_path.relative_to(base_dir)),
        size=file_path.stat().st_size,
        checksum=_compute_checksum(file_path),
        file_type=file_path.suffix.lstrip("."),
        product_type=product_type,
        executed_reduction_step_id=run.id,
        data_grouping_id=step.pipeline.data_grouping_id,
        metadata_=metadata or None,
    )
    session.add(product)
    session.flush()
    # Link provenance back to the raw input packages of this run.
    product.raw_data_lineage = list(run.input_packages)
def _handle_failure(
    session: Session,
    run: models.ExecutedReductionStep,
    hpc_backend,
    job_info,
):
    """Handle a failed HPC job - retry or mark as permanently failed.

    If retry_count is below the step's max_retries, the run is reset to
    PENDING (hpc_job_id cleared so it can be resubmitted); otherwise it is
    marked FAILED with an end_time. Job logs are persisted best-effort.
    """
    # Keep the fetch and the DB write in separate try blocks: previously a
    # failure while *storing* the log silently replaced the successfully
    # fetched log text with the fallback message.
    try:
        logs = hpc_backend.get_logs(run.hpc_job_id)
    except Exception:
        logs = "Failed to retrieve logs"
    else:
        try:
            _create_step_log(session, run, logs)
        except Exception as e:
            # Same best-effort event name as _collect_results.
            logger.warning("log_collection_failed", error=str(e))
    step = run.reduction_step
    max_retries = step.max_retries
    if run.retry_count < max_retries:
        run.retry_count += 1
        run.status = RunStatus.PENDING
        # Clearing the job id allows the step to be resubmitted.
        run.hpc_job_id = None
        # Log text is truncated to keep the error column bounded.
        run.failure_error_message = f"Retry {run.retry_count}/{max_retries}: {logs[:500]}"
        logger.info(
            "step_retry",
            executed_step_id=run.id,
            retry_count=run.retry_count,
        )
    else:
        run.status = RunStatus.FAILED
        run.end_time = datetime.now(timezone.utc)
        run.failure_error_message = f"Max retries exceeded. Last error: {logs[:500]}"
        logger.error(
            "step_permanently_failed",
            executed_step_id=run.id,
        )
def _create_step_log(
    session: Session, run: models.ExecutedReductionStep, log_text: str
):
    """Add an ExecutedReductionStepLog row for *run* (caller commits)."""
    session.add(
        models.ExecutedReductionStepLog(
            executed_reduction_step_id=run.id,
            log=log_text,
            timestamp=datetime.now(timezone.utc),
        )
    )
def _compute_checksum(file_path: Path) -> str:
"""Compute SHA256 checksum of a file."""
sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
return sha256.hexdigest()