Source code for ccat_workflow_manager.execution.command_builder

"""Command builder for direct Apptainer execution.

Replaces CWL generation. Builds the full `apptainer exec` command string
and the input manifest for container consumption.
"""

import shlex
from typing import List

from ..config.config import ccat_workflow_manager_settings
from ..logging_utils import get_structured_logger

logger = get_structured_logger(__name__)


class CommandBuilder:
    """Builds Apptainer execution commands and input manifests."""

    def build_command(
        self,
        step,
        run,
        version,
        software,
        config,
        input_files: List[str],
        input_products: List[dict],
        is_final_step: bool,
    ) -> str:
        """Build the full apptainer exec command string.

        Bind specs, environment assignments and the SIF path are
        shell-quoted so that values containing whitespace or shell
        metacharacters cannot split or alter the command. Values made only
        of shell-safe characters are emitted unchanged.

        Args:
            step: ReductionStep model instance
            run: ExecutedReductionStep model instance
            version: ReductionSoftwareVersion model instance
            software: ReductionSoftware model instance
            config: ReductionStepConfig model instance
            input_files: List of staged input file paths (not used when
                building the command string itself)
            input_products: List of upstream intermediate product dicts
                (not used when building the command string itself)
            is_final_step: Whether this is the final step in the pipeline

        Returns:
            The complete ``apptainer exec ...`` command as a single string.
        """
        pipeline = step.pipeline
        pipeline_base = ccat_workflow_manager_settings.PIPELINE_BASE_DIR
        base_dir = f"{pipeline_base}/{pipeline.id}/{run.sub_group_key}"

        # Determine SIF path
        sif_path = self._get_sif_path(software, version)

        # Determine output dir inside the container: the final step writes
        # to the shared output mount, every other step to its own workspace
        # sub-directory.
        if is_final_step:
            workflow_output_dir = "/workflow/output"
        else:
            workflow_output_dir = f"/workflow/workspace/{step.name}"

        # Build bind mounts (host path : container path [: options])
        staging_dir = f"/data/staging/wf-{step.id}-{run.id}"
        binds = [
            f"{staging_dir}:/workflow/input:ro",
            f"{base_dir}/workspace:/workflow/workspace",
            f"{base_dir}/output:/workflow/output",
            f"{base_dir}/config:/workflow/config:ro",
            f"{base_dir}/logs:/workflow/logs",
        ]
        bind_args = " ".join(f"--bind {shlex.quote(b)}" for b in binds)

        # Build environment variables without touching the config object
        env_args = self._format_env_args(
            config.environment_variables, workflow_output_dir
        )

        # The command template is inserted verbatim: it may legitimately
        # contain several whitespace-separated arguments.
        command = config.command_template or "/pipeline/run.sh"

        full_command = (
            f"apptainer exec {bind_args} {env_args} "
            f"{shlex.quote(sif_path)} {command}"
        )

        logger.info(
            "command_built",
            run_id=run.id,
            step_name=step.name,
            is_final=is_final_step,
        )

        return full_command

    def build_manifest(
        self,
        run,
        step,
        input_files: List[str],
        input_products: List[dict],
        config,
    ) -> dict:
        """Build the manifest.json structure for the container.

        The manifest is written to /workflow/config/manifest.json and
        provides the container with all context about its execution.

        Args:
            run: ExecutedReductionStep model instance
            step: ReductionStep model instance
            input_files: List of staged input file paths; only the final
                path component is kept and re-rooted under /workflow/input
            input_products: List of upstream intermediate product dicts,
                passed through unchanged
            config: ReductionStepConfig model instance

        Returns:
            A dict with the keys ``run_id``, ``step_name``,
            ``sub_group_key``, ``input_files``, ``input_products``,
            ``config`` and ``environment``.
        """
        config_params = config.config_parameters or {}
        env_vars = config.environment_variables or {}

        manifest = {
            "run_id": run.id,
            "step_name": step.name,
            "sub_group_key": run.sub_group_key,
            "input_files": [
                # Every input file is tagged with the "science" role.
                {
                    "path": f"/workflow/input/{f.rsplit('/', 1)[-1]}",
                    "role": "science",
                }
                for f in input_files
            ],
            "input_products": input_products,
            "config": config_params,
            "environment": env_vars,
        }

        return manifest

    @staticmethod
    def _format_env_args(environment_variables, workflow_output_dir) -> str:
        """Render ``--env KEY=VALUE`` arguments for apptainer.

        Works on a copy of ``environment_variables`` so the caller's
        mapping is never modified. ``WORKFLOW_OUTPUT_DIR`` is always set to
        ``workflow_output_dir``, overriding any configured value.
        """
        env_vars = dict(environment_variables or {})
        env_vars["WORKFLOW_OUTPUT_DIR"] = workflow_output_dir
        return " ".join(
            f"--env {shlex.quote(f'{k}={v}')}" for k, v in env_vars.items()
        )

    def _get_sif_path(self, software, version) -> str:
        """Build the SIF file path from software and version.

        The image URL is flattened into a file-name-safe token by replacing
        ``/``, ``:`` and ``@`` with underscores; a missing URL yields an
        empty token.
        """
        cache_dir = ccat_workflow_manager_settings.APPTAINER_CACHE_DIR
        image_url = software.ghcr_image_url or ""
        safe_name = (
            image_url.replace("/", "_").replace(":", "_").replace("@", "_")
        )
        tag = version.version_tag
        return f"{cache_dir}/{safe_name}_{tag}.sif"