# Pipeline Hierarchy

```{eval-rst}
.. verified:: 2026-03-07
   :reviewer: Christof Buchbender
```

The Workflow Manager uses a hierarchical model to describe science reduction workflows. Understanding this hierarchy is essential for creating and managing pipelines.

## Model Hierarchy

```
Pipeline (DAG container — "what, where, when")
├── DataGrouping (shared — "what data", filter_rules)
├── processing_location_id (FK → DataLocation — where to run)
├── trigger_type (CONTINUOUS, CRON, MANUAL)
├── enabled, priority
│
├── ReductionStep 1 (calibrate)
│   ├── group_by ([executed_obs_unit] — per scan)
│   ├── cooldown_seconds (60 — fire quickly)
│   ├── ReductionSoftware (chai-calibration)
│   ├── ReductionStepConfig ({calibration_mode: "hot_cold"})
│   └── version_policy ("compatible")
│
├── ReductionStep 2 (baseline, depends_on: Step 1)
│   ├── group_by ([executed_obs_unit] — per scan)
│   ├── cooldown_seconds (60)
│   └── ReductionSoftware (chai-baseline)
│
└── ReductionStep 3 (grid+map, depends_on: Step 2)
    ├── group_by ([] — all data combined)
    ├── cooldown_seconds (86400 — daily)
    └── ReductionSoftware (chai-gridding)
```

## Pipeline

The top-level object. A Pipeline is a DAG of ReductionSteps that share a common DataGrouping and processing location.

**Key attributes:**

- `data_grouping_id` --- what data to process (FK → DataGrouping)
- `processing_location_id` --- where to run (FK → DataLocation of type PROCESSING)
- `trigger_type` --- when to trigger: CONTINUOUS, CRON, or MANUAL
- `trigger_config` --- JSON for trigger-specific settings (e.g., cron expression)
- `enabled` --- whether the trigger-manager evaluates this pipeline
- `priority` --- scheduling priority (higher = more important)
- `created_by_user_id` --- authority tracking

**Database model**: {py:class}`ccat_ops_db.models.Pipeline`

## ReductionStep

One execution unit within a Pipeline. Each step runs a piece of reduction software against a subset of the data, determined by the `group_by` dimensions.
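To make the `group_by` mechanics concrete, here is a minimal sketch of how a list of dimension names partitions data records into sub-groups. This is not the actual trigger-manager code; the record fields are invented, and the key format is assumed from the `sub_group_key` example shown later on this page:

```python
def sub_group(records, group_by):
    """Partition records into sub-groups keyed by the group_by dimensions."""
    groups = {}
    for rec in records:
        # Build a key like "source=NGC253|line=CO43"; an empty
        # group_by yields a single group ("") holding all records.
        key = "|".join(f"{dim}={rec[dim]}" for dim in group_by)
        groups.setdefault(key, []).append(rec)
    return groups

records = [
    {"executed_obs_unit": 1, "source": "NGC253"},
    {"executed_obs_unit": 2, "source": "NGC253"},
]

# Per-scan grouping, as in Steps 1 and 2 of the hierarchy above:
print(sorted(sub_group(records, ["executed_obs_unit"])))
# → ['executed_obs_unit=1', 'executed_obs_unit=2']

# All data combined, as in Step 3:
print(sorted(sub_group(records, [])))
# → ['']
```

With `group_by = ["executed_obs_unit"]` each scan fires its own run, while `group_by = []` collapses everything into one run over the full data set.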
**Key attributes:**

- `pipeline_id` --- belongs to a Pipeline
- `step_order` --- ordering within the DAG
- `reduction_software_id` --- which container to run
- `reduction_step_config_id` --- configuration parameters
- `group_by` --- JSON list of dimensions for sub-grouping
- `cooldown_seconds` --- minimum wait after latest input change before firing
- `schedule` --- optional per-step cron expression
- `max_concurrent_runs` --- parallelism control
- `pinned_version_id` --- optional: pin to a specific software version
- `version_policy` --- `"compatible"` (use latest) or `"exact"` (use pinned)
- `max_retries` --- how many times to retry on failure

**Database model**: {py:class}`ccat_ops_db.models.ReductionStep`

### Step Dependencies

Steps within a Pipeline form a DAG via the `reduction_step_dependency` association table. Each step can declare upstream dependencies. The trigger-manager enforces that downstream steps only fire after upstream steps complete for the same sub-group scope.

## ReductionSoftware

A registered container image in GHCR. Represents a piece of reduction software as a black box --- could be a single tool or an entire reduction pipeline internally.

**Key attributes:**

- `name` --- human-readable name (e.g., "chai-calibration")
- `ghcr_url` --- GitHub Container Registry URL
- `github_repo_url` --- link to source code
- `description` --- what the software does

**Database model**: {py:class}`ccat_ops_db.models.ReductionSoftware`

Versions are tracked separately in **ReductionSoftwareVersion**, which records the image tag, digest, and `is_latest` flag.

## ReductionStepConfig

Configuration for a specific step: parameters, environment variables, command template, and resource requirements. No polymorphism --- all instrument-specific parameters go in the `config_parameters` JSON column.
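As an illustration, a config for the calibrate step from the hierarchy above might look like the following. Only the field names and `calibration_mode: "hot_cold"` come from this page; every other value, including the command template and its placeholder names, is hypothetical:

```python
# Hypothetical ReductionStepConfig contents; values are illustrative,
# not taken from a real deployment.
calibrate_config = {
    "name": "chai-calibration-default",
    "config_parameters": {"calibration_mode": "hot_cold"},
    "environment_variables": {"OMP_NUM_THREADS": "4"},
    "command_template": "run-calibration --mode {calibration_mode} {input_dir} {output_dir}",
    "resource_requirements": {"cpu": 4, "memory_gb": 16, "time_hours": 2, "gpu": 0},
}

# A command template can be filled in from config_parameters plus
# run-specific paths (the placeholder names here are assumptions):
command = calibrate_config["command_template"].format(
    input_dir="/data/in",
    output_dir="/data/out",
    **calibrate_config["config_parameters"],
)
print(command)  # → run-calibration --mode hot_cold /data/in /data/out
```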
**Key attributes:**

- `name` --- descriptive name
- `config_parameters` --- JSON dict of parameters passed to the container
- `environment_variables` --- JSON dict of extra env vars
- `command_template` --- the command to run inside the container
- `resource_requirements` --- JSON dict (cpu, memory_gb, time_hours, gpu)

**Database model**: {py:class}`ccat_ops_db.models.ReductionStepConfig`

Example resource requirements:

```
{"cpu": 4, "memory_gb": 16, "time_hours": 2, "gpu": 0}
```

These are translated to platform-specific flags by the HPC backend:

- SLURM: `--cpus-per-task=4 --mem=16G --time=02:00:00`
- Kubernetes: resource requests/limits
- Local: informational only

## ExecutedReductionStep

A concrete run of a ReductionStep for a specific sub-group. Created by the trigger-manager, progressed through statuses by the workflow-manager and result-manager.

**Key attributes:**

- `reduction_step_id` --- which step this is a run of
- `reduction_software_version_id` --- exact version used
- `status` --- current lifecycle state (see {doc}`execution_flow`)
- `sub_group_key` --- identifies the data subset (e.g., `source=NGC253|line=CO43`)
- `hpc_job_id` --- backend-specific job identifier
- `execution_command` --- full Apptainer command (stored for reproducibility)
- `processing_time_s`, `peak_memory_gb`, `cpu_hours` --- execution metrics

**Database model**: {py:class}`ccat_ops_db.models.ExecutedReductionStep`

## DataProduct

An output file produced by a run. Classified by type based on the output directory convention (see {doc}`container_contract`).
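For illustration, a `checksum` suitable for integrity verification could be computed like this. This is only a sketch: the hash algorithm actually used is not specified on this page, so SHA-256 here is an assumption:

```python
import hashlib

def file_checksum(path, algorithm="sha256", chunk_size=1 << 20):
    """Hash a file in chunks so large data products need not fit in memory."""
    digest = hashlib.new(algorithm)
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Verifying a file then means recomputing its digest and comparing it with the stored `checksum` value.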
**Types**: SCIENCE, INTERMEDIATE, QA_METRIC, PLOT, STATISTICS, LOG

**Key attributes:**

- `executed_reduction_step_id` --- which run produced this
- `data_product_type` --- classification
- `file_path` --- path to the file on disk
- `file_size` --- in bytes
- `checksum` --- integrity verification
- `metadata` --- JSON from optional sidecar files

Provenance lineage is tracked via the `data_product_lineage` association table, linking each DataProduct back to the RawDataPackages that contributed to it.

**Database model**: {py:class}`ccat_ops_db.models.DataProduct`

## Version Management

Two modes for software version binding:

**Compatible** (default)
: Use the latest version of the ReductionSoftware. Intermediates from older versions are still valid. Non-breaking version bumps don't trigger re-processing.

**Exact**
: Use exactly the pinned version. Only intermediates from this exact version count as valid. Changing the pinned version invalidates old intermediates, causing the gap evaluation to trigger full re-processing.

## Related Documentation

- {doc}`execution_flow` - How runs progress through statuses
- {doc}`data_grouping` - How DataGroupings resolve into sub-groups
- {doc}`container_contract` - How containers interact with the system