# Source code for ccat_workflow_manager.managers.result_manager

"""Result Manager - monitors HPC jobs and collects outputs into DataProducts.

Polls ExecutedReductionSteps in SUBMITTED/RUNNING state, checks HPC job status,
discovers outputs by convention (directory structure), and creates DataProduct records.
"""

import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path

from sqlalchemy.orm import Session
from ccat_ops_db import models
from ccat_ops_db.models import RunStatus, DataProductType

from ..database import DatabaseConnection
from ..hpc import KubernetesBackend, SLURMBackend, LocalBackend, HPCJobStatus
from ..config.config import ccat_workflow_manager_settings
from ..logging_utils import get_structured_logger

logger = get_structured_logger(__name__)

# Maps an output subdirectory name (under the final step's output/ directory)
# to the DataProductType assigned to files discovered inside it.
# Files in unrecognised subdirectories fall back to SCIENCE
# (see _collect_final_outputs).
OUTPUT_DIR_TYPE_MAP = {
    "science": DataProductType.SCIENCE,
    "qa": DataProductType.QA_METRIC,
    "plots": DataProductType.PLOT,
    "statistics": DataProductType.STATISTICS,
    "logs": DataProductType.LOG,
}


def _get_hpc_backend():
    """Instantiate the HPC backend selected by configuration.

    Reads ``ccat_workflow_manager_settings.HPC_BACKEND`` and returns a
    fresh backend instance.

    Raises:
        ValueError: if the configured backend name is not recognised.
    """
    # Dispatch table: backend name -> backend class (instantiated below).
    factories = {
        "kubernetes": KubernetesBackend,
        "slurm": SLURMBackend,
        "local": LocalBackend,
    }
    backend_type = ccat_workflow_manager_settings.HPC_BACKEND
    factory = factories.get(backend_type)
    if factory is None:
        raise ValueError(f"Unknown HPC backend: {backend_type}")
    return factory()


def process_running_steps():
    """Main result collection loop - called by the CLI poll loop.

    Fetches every ExecutedReductionStep in SUBMITTED or RUNNING state and
    checks each one against the HPC backend. Per-run failures are logged
    and skipped so one bad run cannot stall the loop; the session is
    committed once at the end, rolled back on unexpected errors, and
    always released.
    """
    session, _ = DatabaseConnection.get_connection()
    try:
        # All runs whose HPC jobs may still be in flight.
        active = (
            session.query(models.ExecutedReductionStep)
            .filter(
                models.ExecutedReductionStep.status.in_([
                    RunStatus.SUBMITTED,
                    RunStatus.RUNNING,
                ])
            )
            .all()
        )
        backend = _get_hpc_backend()
        for run in active:
            try:
                _check_run(session, run, backend)
            except Exception as exc:
                # Best-effort per run: log and move on to the next one.
                logger.error(
                    "result_check_failed",
                    executed_step_id=run.id,
                    error=str(exc),
                )
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.remove()
def _check_run(session: Session, run: models.ExecutedReductionStep, hpc_backend):
    """Check a single run's HPC job status and handle state transitions.

    SUBMITTED -> RUNNING when the job starts; on COMPLETED the outputs are
    collected and job metrics stored; FAILED is delegated to the retry
    logic; CANCELLED is recorded as terminal.
    """
    if not run.hpc_job_id:
        # Nothing to poll without a job id; leave the run untouched.
        logger.warning("no_hpc_job_id", executed_step_id=run.id)
        return

    job = hpc_backend.get_status(run.hpc_job_id)
    state = job.status

    if state == HPCJobStatus.RUNNING:
        # Record the SUBMITTED -> RUNNING transition exactly once.
        if run.status != RunStatus.RUNNING:
            run.status = RunStatus.RUNNING
            logger.info(
                "step_running",
                executed_step_id=run.id,
                hpc_job_id=run.hpc_job_id,
            )
        return

    if state == HPCJobStatus.COMPLETED:
        # Persist the intermediate state before the (possibly slow)
        # collection phase so observers can see progress.
        run.status = RunStatus.COLLECTING_RESULTS
        session.commit()
        try:
            _collect_results(session, run, hpc_backend, job)
            run.status = RunStatus.COMPLETED
            run.end_time = datetime.now(timezone.utc)
            # Store whichever job metrics the backend reported.
            if job.wall_time_seconds is not None:
                run.processing_time_s = job.wall_time_seconds
            if job.peak_memory_gb is not None:
                run.peak_memory_gb = job.peak_memory_gb
            if job.cpu_hours is not None:
                run.cpu_hours = job.cpu_hours
            logger.info(
                "step_completed",
                executed_step_id=run.id,
            )
        except Exception as exc:
            # Collection failure is terminal for this run (no retry here).
            logger.error(
                "result_collection_failed",
                executed_step_id=run.id,
                error=str(exc),
            )
            run.status = RunStatus.FAILED
            run.failure_error_message = f"Result collection failed: {exc}"
            run.end_time = datetime.now(timezone.utc)
        return

    if state == HPCJobStatus.FAILED:
        _handle_failure(session, run, hpc_backend, job)
        return

    if state == HPCJobStatus.CANCELLED:
        run.status = RunStatus.CANCELLED
        run.end_time = datetime.now(timezone.utc)
        logger.info("step_cancelled", executed_step_id=run.id)


def _collect_results(
    session: Session,
    run: models.ExecutedReductionStep,
    hpc_backend,
    job_info,
):
    """Scan output directory and create DataProduct records.

    Uses convention-based output discovery:
    - Final step: scan output/{subdir}/ — type from subdirectory name
    - Intermediate step: scan workspace/{step_name}/ — all INTERMEDIATE
    """
    step = run.reduction_step
    pipeline = step.pipeline
    pipeline_base = ccat_workflow_manager_settings.PIPELINE_BASE_DIR
    base_dir = Path(f"{pipeline_base}/{pipeline.id}/{run.sub_group_key}")

    # The final step is the one with the highest step_order in the pipeline.
    highest_order = max(s.step_order for s in pipeline.reduction_steps)
    is_final_step = step.step_order == highest_order

    # Store job logs; log collection is best-effort and never fatal.
    try:
        logs = hpc_backend.get_logs(run.hpc_job_id)
        _create_step_log(session, run, logs)
    except Exception as exc:
        logger.warning("log_collection_failed", error=str(exc))

    if is_final_step:
        _collect_final_outputs(session, run, base_dir / "output")
    else:
        _collect_workspace_outputs(session, run, base_dir / "workspace" / step.name)

    logger.info(
        "results_collected",
        executed_step_id=run.id,
        is_final=is_final_step,
    )


def _collect_final_outputs(
    session: Session, run: models.ExecutedReductionStep, output_dir: Path
):
    """Collect outputs from the final step's output directory."""
    if not output_dir.exists():
        logger.warning(
            "output_dir_missing",
            executed_step_id=run.id,
            output_dir=str(output_dir),
        )
        return

    for candidate in output_dir.rglob("*"):
        # Skip directories and metadata sidecar files.
        if not candidate.is_file() or candidate.name.endswith(".metadata.json"):
            continue

        # Type comes from the first-level subdirectory; files sitting
        # directly in output/ (or in unknown subdirs) default to SCIENCE.
        parts = candidate.relative_to(output_dir).parts
        product_type = (
            OUTPUT_DIR_TYPE_MAP.get(parts[0], DataProductType.SCIENCE)
            if len(parts) > 1
            else DataProductType.SCIENCE
        )

        sidecar_meta = _load_sidecar_metadata(candidate)
        _create_data_product(session, run, candidate, output_dir, product_type, sidecar_meta)


def _collect_workspace_outputs(
    session: Session, run: models.ExecutedReductionStep, workspace_dir: Path
):
    """Collect outputs from an intermediate step's workspace directory."""
    if not workspace_dir.exists():
        logger.warning(
            "workspace_dir_missing",
            executed_step_id=run.id,
            workspace_dir=str(workspace_dir),
        )
        return

    for candidate in workspace_dir.rglob("*"):
        # Skip directories and metadata sidecar files.
        if not candidate.is_file() or candidate.name.endswith(".metadata.json"):
            continue
        sidecar_meta = _load_sidecar_metadata(candidate)
        # Every intermediate-step output is typed INTERMEDIATE.
        _create_data_product(
            session,
            run,
            candidate,
            workspace_dir,
            DataProductType.INTERMEDIATE,
            sidecar_meta,
        )


def _load_sidecar_metadata(file_path: Path) -> dict:
    """Load .metadata.json sidecar file if it exists.

    Returns an empty dict when the sidecar is missing or unreadable
    (best-effort: malformed JSON / IO errors are swallowed).
    """
    sidecar = file_path.parent / f"{file_path.name}.metadata.json"
    if not sidecar.exists():
        return {}
    try:
        return json.loads(sidecar.read_text())
    except (json.JSONDecodeError, OSError):
        return {}


def _create_data_product(
    session: Session,
    run: models.ExecutedReductionStep,
    file_path: Path,
    base_dir: Path,
    product_type: DataProductType,
    metadata: dict,
):
    """Create a DataProduct record for a discovered file."""
    step = run.reduction_step
    product = models.DataProduct(
        name=file_path.name,
        relative_path=str(file_path.relative_to(base_dir)),
        size=file_path.stat().st_size,
        checksum=_compute_checksum(file_path),
        file_type=file_path.suffix.lstrip("."),
        product_type=product_type,
        executed_reduction_step_id=run.id,
        data_grouping_id=step.pipeline.data_grouping_id,
        metadata_=metadata if metadata else None,
    )
    session.add(product)
    # Flush so the product has an identity before linking lineage.
    session.flush()

    # Link provenance to input raw data
    product.raw_data_lineage = list(run.input_packages)


def _handle_failure(
    session: Session,
    run: models.ExecutedReductionStep,
    hpc_backend,
    job_info,
):
    """Handle a failed HPC job - retry or mark as permanently failed."""
    try:
        logs = hpc_backend.get_logs(run.hpc_job_id)
        _create_step_log(session, run, logs)
    except Exception:
        # Fall back to a placeholder so the error message below still works.
        logs = "Failed to retrieve logs"

    step = run.reduction_step
    max_retries = step.max_retries
    if run.retry_count < max_retries:
        # Requeue: clear the job id so the submitter picks it up again.
        run.retry_count += 1
        run.status = RunStatus.PENDING
        run.hpc_job_id = None
        run.failure_error_message = f"Retry {run.retry_count}/{max_retries}: {logs[:500]}"
        logger.info(
            "step_retry",
            executed_step_id=run.id,
            retry_count=run.retry_count,
        )
    else:
        run.status = RunStatus.FAILED
        run.end_time = datetime.now(timezone.utc)
        run.failure_error_message = f"Max retries exceeded. Last error: {logs[:500]}"
        logger.error(
            "step_permanently_failed",
            executed_step_id=run.id,
        )


def _create_step_log(
    session: Session, run: models.ExecutedReductionStep, log_text: str
):
    """Create an ExecutedReductionStepLog entry."""
    session.add(
        models.ExecutedReductionStepLog(
            executed_reduction_step_id=run.id,
            log=log_text,
            timestamp=datetime.now(timezone.utc),
        )
    )


def _compute_checksum(file_path: Path) -> str:
    """Compute SHA256 checksum of a file (streamed in 8 KiB chunks)."""
    digest = hashlib.sha256()
    with open(file_path, "rb") as fh:
        while chunk := fh.read(8192):
            digest.update(chunk)
    return digest.hexdigest()