# Source code for ccat_workflow_manager.managers.workflow_manager

"""Workflow Manager - processes PENDING ExecutedReductionSteps through staging and HPC submission.

Picks up PENDING runs, stages input data, builds execution commands,
writes manifests, and submits to HPC backend.
"""

import json
from datetime import datetime, timezone
from pathlib import Path

from sqlalchemy.orm import Session
from ccat_ops_db import models
from ccat_ops_db.models import RunStatus, Status

from ..database import DatabaseConnection
from ..execution.command_builder import CommandBuilder
from ..hpc import KubernetesBackend, SLURMBackend, LocalBackend
from ..config.config import ccat_workflow_manager_settings
from ..logging_utils import get_structured_logger
from ..exceptions import StagingError

logger = get_structured_logger(__name__)

command_builder = CommandBuilder()


def _get_hpc_backend():
    """Return a new instance of the HPC backend selected in settings.

    The backend type is read from
    ``ccat_workflow_manager_settings.HPC_BACKEND``.

    Raises:
        ValueError: if the configured value names an unsupported backend.
    """
    backend_type = ccat_workflow_manager_settings.HPC_BACKEND
    factory = {
        "kubernetes": KubernetesBackend,
        "slurm": SLURMBackend,
        "local": LocalBackend,
    }.get(backend_type)
    if factory is None:
        raise ValueError(f"Unknown HPC backend: {backend_type}")
    return factory()


def process_pending_runs():
    """Main workflow processing loop - called by the CLI poll loop.

    Fetches up to ``MAX_CONCURRENT_RUNS`` PENDING ExecutedReductionSteps,
    ordered by descending pipeline priority, and drives each through
    staging and HPC submission via ``_process_run``. A failure in one run
    marks that run FAILED and continues with the remaining runs; a failure
    outside the per-run loop rolls back and re-raises.
    """
    session, _ = DatabaseConnection.get_connection()
    try:
        pending_runs = (
            session.query(models.ExecutedReductionStep)
            .filter(
                models.ExecutedReductionStep.status == RunStatus.PENDING,
            )
            .join(models.ReductionStep)
            .join(models.Pipeline)
            .order_by(models.Pipeline.priority.desc())
            .limit(ccat_workflow_manager_settings.MAX_CONCURRENT_RUNS)
            .all()
        )
        for run in pending_runs:
            try:
                _process_run(session, run)
            except Exception as e:
                logger.error(
                    "workflow_processing_failed",
                    executed_step_id=run.id,
                    error=str(e),
                )
                # Discard any partial, uncommitted work from the failed run
                # first; if the exception came from the database layer the
                # session is in a failed transaction and the FAILED-status
                # commit below would otherwise also fail.
                session.rollback()
                run.status = RunStatus.FAILED
                run.failure_error_message = str(e)
                session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        # NOTE(review): .remove() implies this is a scoped_session
        # registry rather than a plain Session — confirm what
        # DatabaseConnection.get_connection() returns.
        session.remove()
def _process_run(session: Session, run: models.ExecutedReductionStep):
    """Process a single PENDING ExecutedReductionStep through to HPC submission.

    Stages input data, prepares the working directory tree, writes the
    manifest, builds the execution command, and submits the job to the
    configured HPC backend. Commits status transitions
    (STAGING_DATA -> SUBMITTED) as it goes.

    Args:
        session: Active database session.
        run: The PENDING run to process.
    """
    step = run.reduction_step
    pipeline = step.pipeline
    config = step.reduction_step_config
    version = run.reduction_software_version
    software = step.reduction_software

    # Determine if this is the final step in the pipeline
    max_order = max(s.step_order for s in pipeline.reduction_steps)
    is_final_step = step.step_order == max_order

    # Step 1: Stage data
    run.status = RunStatus.STAGING_DATA
    session.commit()
    staging_job = _stage_data(session, run, pipeline)
    if staging_job:
        run.staging_job_id = staging_job.id

    # Step 2: Build manifest and command
    input_files = _get_input_file_paths(session, run)
    input_products = _get_input_product_paths(session, run)

    # Create the per-run directory structure under the pipeline base dir.
    pipeline_base = ccat_workflow_manager_settings.PIPELINE_BASE_DIR
    base_dir = Path(f"{pipeline_base}/{pipeline.id}/{run.sub_group_key}")
    for subdir in ["workspace", "output", "config", "logs"]:
        (base_dir / subdir).mkdir(parents=True, exist_ok=True)

    # Build and write manifest
    manifest = command_builder.build_manifest(
        run=run,
        step=step,
        input_files=input_files,
        input_products=input_products,
        config=config,
    )
    manifest_path = base_dir / "config" / "manifest.json"
    manifest_path.write_text(json.dumps(manifest, indent=2))

    # Build execution command
    execution_command = command_builder.build_command(
        step=step,
        run=run,
        version=version,
        software=software,
        config=config,
        input_files=input_files,
        input_products=input_products,
        is_final_step=is_final_step,
    )
    run.execution_command = execution_command

    # Step 3: Submit to HPC
    hpc_backend = _get_hpc_backend()
    image_ref = _build_image_ref(software, version)
    # NOTE(review): reaches into CommandBuilder's private API; consider a
    # public accessor on CommandBuilder instead.
    sif_path = command_builder._get_sif_path(software, version)
    resource_reqs = config.resource_requirements or {}
    env_vars = config.environment_variables or {}
    job_name = f"wf-{step.id}-{run.id}"
    # NOTE(review): staging destination path is hard-coded here — presumably
    # it must match wherever the staging service materializes the data;
    # verify against the staging implementation.
    staging_dir = f"/data/staging/wf-{step.id}-{run.id}"
    job_id = hpc_backend.submit(
        execution_command=execution_command,
        image_ref=image_ref,
        sif_path=sif_path,
        input_dir=staging_dir,
        output_dir=str(base_dir / "output"),
        workspace_dir=str(base_dir / "workspace"),
        manifest_path=str(manifest_path),
        resource_requirements=resource_reqs,
        environment_variables=env_vars,
        job_name=job_name,
    )
    run.hpc_job_id = job_id
    run.status = RunStatus.SUBMITTED
    run.start_time = datetime.now(timezone.utc)
    session.commit()
    logger.info(
        "step_submitted",
        executed_step_id=run.id,
        hpc_job_id=job_id,
        image=image_ref,
        is_final=is_final_step,
    )


def _stage_data(
    session: Session,
    run: models.ExecutedReductionStep,
    pipeline: models.Pipeline,
) -> models.StagingJob:
    """Create a StagingJob for the run's input data packages.

    Returns None when the run has no input packages. The origin location
    is taken from a PRESENT physical copy of the *first* input package;
    the destination is the pipeline's processing location.

    Raises:
        StagingError: if no PRESENT physical copy exists for the first
            input package.
    """
    if not run.input_packages:
        return None

    # Use the pipeline's processing location as staging destination
    dest_location_id = pipeline.processing_location_id

    # Find source location from existing physical copies.
    # NOTE(review): only the first package's copies are checked — assumes
    # all input packages share a source location; confirm.
    first_package = run.input_packages[0]
    source_copy = (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(
            models.RawDataPackagePhysicalCopy.raw_data_package_id
            == first_package.id,
            models.RawDataPackagePhysicalCopy.status
            == models.PhysicalCopyStatus.PRESENT,
        )
        .first()
    )
    if not source_copy:
        raise StagingError(
            f"No physical copy found for package {first_package.id}",
            operation_id=run.id,
        )

    staging_job = models.StagingJob(
        origin_data_location_id=source_copy.data_location_id,
        destination_data_location_id=dest_location_id,
        status=Status.PENDING,
    )
    session.add(staging_job)
    session.flush()
    staging_job.raw_data_packages = list(run.input_packages)
    session.commit()

    logger.info(
        "staging_job_created",
        staging_job_id=staging_job.id,
        executed_step_id=run.id,
        package_count=len(run.input_packages),
    )
    return staging_job


def _get_input_file_paths(
    session: Session, run: models.ExecutedReductionStep
) -> list:
    """Get file paths for input packages from their physical copies.

    Collects the ``full_path`` of every physical copy (any status) of
    every input package; copies without a populated ``full_path`` are
    skipped.
    """
    paths = []
    for package in run.input_packages:
        copies = (
            session.query(models.RawDataPackagePhysicalCopy)
            .filter(
                models.RawDataPackagePhysicalCopy.raw_data_package_id
                == package.id,
            )
            .all()
        )
        for copy in copies:
            if hasattr(copy, "full_path") and copy.full_path:
                paths.append(copy.full_path)
    return paths


def _get_input_product_paths(
    session: Session, run: models.ExecutedReductionStep
) -> list:
    """Get paths for upstream intermediate products.

    Returns a list of ``{"path": ..., "data_product_id": ...}`` dicts,
    one per input product. Unlike ``_get_input_file_paths``, only the
    first usable physical copy of each product is taken (note the
    ``break``).
    """
    products = []
    for product in run.input_products:
        # Find the physical copy path
        copies = (
            session.query(models.DataProductPhysicalCopy)
            .filter(
                models.DataProductPhysicalCopy.data_product_id == product.id,
            )
            .all()
        )
        for copy in copies:
            if hasattr(copy, "full_path") and copy.full_path:
                products.append({
                    "path": copy.full_path,
                    "data_product_id": product.id,
                })
                break
    return products


def _build_image_ref(
    software: models.ReductionSoftware,
    version: models.ReductionSoftwareVersion,
) -> str:
    """Build the container image reference.

    Prefers the immutable digest form (``url@digest``) when the version
    records one; otherwise falls back to the mutable tag form
    (``url:tag``).
    """
    if version.image_digest:
        return f"{software.ghcr_image_url}@{version.image_digest}"
    return f"{software.ghcr_image_url}:{version.version_tag}"