"""Workflow Manager - processes PENDING ExecutedReductionSteps through staging and HPC submission.
Picks up PENDING runs, stages input data, builds execution commands,
writes manifests, and submits to HPC backend.
"""
import json
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy.orm import Session
from ccat_ops_db import models
from ccat_ops_db.models import RunStatus, Status
from ..database import DatabaseConnection
from ..execution.command_builder import CommandBuilder
from ..hpc import KubernetesBackend, SLURMBackend, LocalBackend
from ..config.config import ccat_workflow_manager_settings
from ..logging_utils import get_structured_logger
from ..exceptions import StagingError
# Module-level structured logger bound to this module's dotted name.
logger = get_structured_logger(__name__)
# Shared builder instance used for both manifests and execution command lines.
command_builder = CommandBuilder()
def _get_hpc_backend():
    """Instantiate the HPC backend selected by configuration.

    Returns:
        A backend instance matching ``HPC_BACKEND`` in the settings.

    Raises:
        ValueError: if the configured backend name is not recognized.
    """
    backend_classes = {
        "kubernetes": KubernetesBackend,
        "slurm": SLURMBackend,
        "local": LocalBackend,
    }
    selected = ccat_workflow_manager_settings.HPC_BACKEND
    backend_cls = backend_classes.get(selected)
    if backend_cls is None:
        raise ValueError(f"Unknown HPC backend: {selected}")
    return backend_cls()
def process_pending_runs():
    """Main workflow processing loop - called by the CLI poll loop.

    Fetches up to ``MAX_CONCURRENT_RUNS`` PENDING ExecutedReductionSteps,
    highest-priority pipelines first, and pushes each through staging and
    HPC submission.  A failure in one run marks only that run FAILED and
    processing continues with the remaining runs; errors outside the
    per-run handler roll back the session and propagate to the caller.
    """
    session, _ = DatabaseConnection.get_connection()
    try:
        pending_runs = (
            session.query(models.ExecutedReductionStep)
            .filter(
                models.ExecutedReductionStep.status == RunStatus.PENDING,
            )
            .join(models.ReductionStep)
            .join(models.Pipeline)
            .order_by(models.Pipeline.priority.desc())
            .limit(ccat_workflow_manager_settings.MAX_CONCURRENT_RUNS)
            .all()
        )
        for run in pending_runs:
            try:
                _process_run(session, run)
            except Exception as e:
                logger.error(
                    "workflow_processing_failed",
                    executed_step_id=run.id,
                    error=str(e),
                )
                # Discard any partial state left by the failed run first:
                # if _process_run raised mid-flush the session is dirty or
                # invalid, and the failure-recording commit below would
                # itself fail without this rollback.
                session.rollback()
                run.status = RunStatus.FAILED
                run.failure_error_message = str(e)
                session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        # NOTE(review): .remove() exists on scoped_session, not on a plain
        # Session -- assumes get_connection() returns a scoped session;
        # confirm against DatabaseConnection.
        session.remove()
def _process_run(session: Session, run: models.ExecutedReductionStep):
    """Process a single PENDING ExecutedReductionStep through to HPC submission.

    Stages input data, creates the working directory tree, writes the
    execution manifest, builds the command line, and submits the job to the
    configured HPC backend.  Status transitions (STAGING_DATA, SUBMITTED)
    are committed as they happen so other database observers see progress.

    Args:
        session: Active SQLAlchemy session; the caller owns its lifecycle
            and handles rollback on failure.
        run: The PENDING step execution to advance.
    """
    step = run.reduction_step
    pipeline = step.pipeline
    config = step.reduction_step_config
    version = run.reduction_software_version
    software = step.reduction_software
    # Determine if this is the final step in the pipeline
    # (highest step_order among the pipeline's steps).
    max_order = max(s.step_order for s in pipeline.reduction_steps)
    is_final_step = step.step_order == max_order
    # Step 1: Stage data
    run.status = RunStatus.STAGING_DATA
    session.commit()
    # _stage_data returns None when the run has no input packages.
    staging_job = _stage_data(session, run, pipeline)
    if staging_job:
        run.staging_job_id = staging_job.id
    # Step 2: Build manifest and command
    input_files = _get_input_file_paths(session, run)
    input_products = _get_input_product_paths(session, run)
    # Create directory structure
    pipeline_base = ccat_workflow_manager_settings.PIPELINE_BASE_DIR
    base_dir = Path(f"{pipeline_base}/{pipeline.id}/{run.sub_group_key}")
    for subdir in ["workspace", "output", "config", "logs"]:
        (base_dir / subdir).mkdir(parents=True, exist_ok=True)
    # Build and write manifest
    manifest = command_builder.build_manifest(
        run=run,
        step=step,
        input_files=input_files,
        input_products=input_products,
        config=config,
    )
    manifest_path = base_dir / "config" / "manifest.json"
    manifest_path.write_text(json.dumps(manifest, indent=2))
    # Build execution command
    execution_command = command_builder.build_command(
        step=step,
        run=run,
        version=version,
        software=software,
        config=config,
        input_files=input_files,
        input_products=input_products,
        is_final_step=is_final_step,
    )
    run.execution_command = execution_command
    # Step 3: Submit to HPC
    hpc_backend = _get_hpc_backend()
    image_ref = _build_image_ref(software, version)
    # NOTE(review): reaches into a private CommandBuilder method; consider
    # promoting _get_sif_path to the public API.
    sif_path = command_builder._get_sif_path(software, version)
    resource_reqs = config.resource_requirements or {}
    env_vars = config.environment_variables or {}
    job_name = f"wf-{step.id}-{run.id}"
    # NOTE(review): staging dir prefix is hard-coded here, unlike the other
    # paths which come from settings -- confirm this matches the stager.
    staging_dir = f"/data/staging/wf-{step.id}-{run.id}"
    job_id = hpc_backend.submit(
        execution_command=execution_command,
        image_ref=image_ref,
        sif_path=sif_path,
        input_dir=staging_dir,
        output_dir=str(base_dir / "output"),
        workspace_dir=str(base_dir / "workspace"),
        manifest_path=str(manifest_path),
        resource_requirements=resource_reqs,
        environment_variables=env_vars,
        job_name=job_name,
    )
    run.hpc_job_id = job_id
    run.status = RunStatus.SUBMITTED
    run.start_time = datetime.now(timezone.utc)
    session.commit()
    logger.info(
        "step_submitted",
        executed_step_id=run.id,
        hpc_job_id=job_id,
        image=image_ref,
        is_final=is_final_step,
    )
def _stage_data(
    session: Session,
    run: models.ExecutedReductionStep,
    pipeline: models.Pipeline,
) -> "models.StagingJob | None":
    """Create a StagingJob moving the run's input packages to the pipeline's processing location.

    Args:
        session: Active SQLAlchemy session.
        run: The step execution whose input packages should be staged.
        pipeline: The owning pipeline; its processing location is the
            staging destination.

    Returns:
        The committed StagingJob, or None when the run has no input
        packages (nothing to stage).

    Raises:
        StagingError: if no PRESENT physical copy exists for the first
            input package, so no source location can be determined.
    """
    if not run.input_packages:
        return None
    # Use the pipeline's processing location as staging destination
    dest_location_id = pipeline.processing_location_id
    # Source location is derived from the first package only; assumes all
    # input packages reside at the same data location -- TODO confirm.
    first_package = run.input_packages[0]
    source_copy = (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(
            models.RawDataPackagePhysicalCopy.raw_data_package_id == first_package.id,
            models.RawDataPackagePhysicalCopy.status
            == models.PhysicalCopyStatus.PRESENT,
        )
        .first()
    )
    if not source_copy:
        raise StagingError(
            f"No physical copy found for package {first_package.id}",
            operation_id=run.id,
        )
    staging_job = models.StagingJob(
        origin_data_location_id=source_copy.data_location_id,
        destination_data_location_id=dest_location_id,
        status=Status.PENDING,
    )
    session.add(staging_job)
    # Flush first so the job has a primary key before linking packages.
    session.flush()
    staging_job.raw_data_packages = list(run.input_packages)
    session.commit()
    logger.info(
        "staging_job_created",
        staging_job_id=staging_job.id,
        executed_step_id=run.id,
        package_count=len(run.input_packages),
    )
    return staging_job
def _get_input_file_paths(
    session: Session, run: models.ExecutedReductionStep
) -> list:
    """Collect the full path of every physical copy of the run's input packages."""
    collected = []
    for pkg in run.input_packages:
        pkg_copies = (
            session.query(models.RawDataPackagePhysicalCopy)
            .filter(
                models.RawDataPackagePhysicalCopy.raw_data_package_id == pkg.id,
            )
            .all()
        )
        # Keep only copies that actually expose a non-empty full_path.
        collected.extend(
            c.full_path for c in pkg_copies if getattr(c, "full_path", None)
        )
    return collected
def _get_input_product_paths(
    session: Session, run: models.ExecutedReductionStep
) -> list:
    """Resolve one on-disk path per upstream intermediate data product.

    Returns a list of ``{"path": ..., "data_product_id": ...}`` dicts,
    one entry per product that has a usable physical copy.
    """
    resolved = []
    for product in run.input_products:
        product_copies = (
            session.query(models.DataProductPhysicalCopy)
            .filter(
                models.DataProductPhysicalCopy.data_product_id == product.id,
            )
            .all()
        )
        # Only the first copy with a non-empty full_path is recorded.
        path = next(
            (c.full_path for c in product_copies if getattr(c, "full_path", None)),
            None,
        )
        if path is not None:
            resolved.append({
                "path": path,
                "data_product_id": product.id,
            })
    return resolved
def _build_image_ref(
    software: models.ReductionSoftware,
    version: models.ReductionSoftwareVersion,
) -> str:
    """Build the container image reference, pinning by digest when one exists.

    A digest reference (``url@sha256:...``) is immutable and preferred;
    otherwise fall back to the mutable version tag (``url:tag``).
    """
    base_url = software.ghcr_image_url
    digest = version.image_digest
    if digest:
        return f"{base_url}@{digest}"
    return f"{base_url}:{version.version_tag}"