Source code for ccat_workflow_manager.execution.command_builder
"""Command builder for direct Apptainer execution.
Replaces CWL generation. Builds the full `apptainer exec` command string
and the input manifest for container consumption.
"""
import shlex
from typing import List

from ..config.config import ccat_workflow_manager_settings
from ..logging_utils import get_structured_logger
logger = get_structured_logger(__name__)
[docs]
class CommandBuilder:
    """Builds Apptainer execution commands and input manifests."""

    def build_command(
        self,
        step,
        run,
        version,
        software,
        config,
        input_files: List[str],
        input_products: List[dict],
        is_final_step: bool,
    ) -> str:
        """Build the full apptainer exec command string.

        Bind specifications, ``KEY=value`` environment assignments and the
        SIF path are each passed through ``shlex.quote`` so that whitespace
        or shell metacharacters in them cannot split or alter the command.
        Values made only of shell-safe characters are emitted unchanged.
        The command template is inserted verbatim because it may carry its
        own arguments.

        ``config.environment_variables`` is never modified; the
        ``WORKFLOW_OUTPUT_DIR`` entry is added to a private copy.

        Args:
            step: ReductionStep model instance
            run: ExecutedReductionStep model instance
            version: ReductionSoftwareVersion model instance
            software: ReductionSoftware model instance
            config: ReductionStepConfig model instance
            input_files: List of staged input file paths (not used when
                building the command string)
            input_products: List of upstream intermediate product dicts
                (not used when building the command string)
            is_final_step: Whether this is the final step in the pipeline

        Returns:
            The ``apptainer exec ...`` command as a single string.
        """
        pipeline = step.pipeline
        pipeline_base = ccat_workflow_manager_settings.PIPELINE_BASE_DIR
        base_dir = f"{pipeline_base}/{pipeline.id}/{run.sub_group_key}"

        sif_path = self._get_sif_path(software, version)

        # The final step writes to the shared output mount; every other
        # step writes to its own sub-directory of the workspace mount.
        if is_final_step:
            workflow_output_dir = "/workflow/output"
        else:
            workflow_output_dir = f"/workflow/workspace/{step.name}"

        # Host path -> container path bind mounts.
        staging_dir = f"/data/staging/wf-{step.id}-{run.id}"
        binds = [
            f"{staging_dir}:/workflow/input:ro",
            f"{base_dir}/workspace:/workflow/workspace",
            f"{base_dir}/output:/workflow/output",
            f"{base_dir}/config:/workflow/config:ro",
            f"{base_dir}/logs:/workflow/logs",
        ]
        bind_args = " ".join(f"--bind {shlex.quote(b)}" for b in binds)

        env_args = self._format_env_args(
            config.environment_variables, workflow_output_dir
        )

        # Fall back to the default entry point when no template is set.
        command = config.command_template or "/pipeline/run.sh"

        full_command = (
            f"apptainer exec {bind_args} {env_args} "
            f"{shlex.quote(sif_path)} {command}"
        )
        logger.info(
            "command_built",
            run_id=run.id,
            step_name=step.name,
            is_final=is_final_step,
        )
        return full_command

    def build_manifest(
        self,
        run,
        step,
        input_files: List[str],
        input_products: List[dict],
        config,
    ) -> dict:
        """Build the manifest.json structure for the container.

        The manifest is written to /workflow/config/manifest.json and
        provides the container with all context about its execution.

        Each input file is listed under ``/workflow/input/`` using only the
        last ``/``-separated component of its staged path, with the role
        ``"science"``. The ``config`` and ``environment`` entries are
        shallow copies, so editing the manifest does not change the
        dictionaries held by ``config``.

        Args:
            run: Object providing ``id`` and ``sub_group_key``.
            step: Object providing ``name``.
            input_files: Staged input file paths; ``None`` is treated as
                an empty list.
            input_products: Upstream product dicts, passed through as-is.
            config: Object providing ``config_parameters`` and
                ``environment_variables`` (either may be ``None``).

        Returns:
            The manifest as a plain dictionary.
        """
        file_entries = [
            {
                "path": f"/workflow/input/{path.rsplit('/', 1)[-1]}",
                "role": "science",
            }
            for path in (input_files or [])
        ]
        return {
            "run_id": run.id,
            "step_name": step.name,
            "sub_group_key": run.sub_group_key,
            "input_files": file_entries,
            "input_products": input_products,
            "config": dict(config.config_parameters or {}),
            "environment": dict(config.environment_variables or {}),
        }

    @staticmethod
    def _format_env_args(environment_variables, workflow_output_dir: str) -> str:
        """Render ``--env KEY=value`` arguments without mutating the input.

        Args:
            environment_variables: Mapping of configured variables, or
                ``None``/empty when there are none.
            workflow_output_dir: Value for ``WORKFLOW_OUTPUT_DIR``; it
                overrides any configured entry of the same name.

        Returns:
            Space-separated ``--env`` arguments, each assignment shell-quoted.
        """
        # Copy first: the mapping belongs to the caller's config object.
        env_vars = dict(environment_variables or {})
        env_vars["WORKFLOW_OUTPUT_DIR"] = workflow_output_dir
        return " ".join(
            "--env " + shlex.quote(f"{key}={value}")
            for key, value in env_vars.items()
        )

    def _get_sif_path(self, software, version) -> str:
        """Build the SIF file path from software and version.

        The image URL has every ``/``, ``:`` and ``@`` replaced by ``_`` so
        it can be used as a file name; a missing URL is treated as an empty
        string. The result is
        ``<APPTAINER_CACHE_DIR>/<safe_name>_<version_tag>.sif``.
        """
        cache_dir = ccat_workflow_manager_settings.APPTAINER_CACHE_DIR
        image_url = software.ghcr_image_url or ""
        # Single pass instead of three chained replace() calls.
        safe_name = image_url.translate(str.maketrans("/:@", "___"))
        tag = version.version_tag
        return f"{cache_dir}/{safe_name}_{tag}.sif"