# Pipeline

This page describes the end-to-end data flow through the calibration engine, from opening the L0 store to writing L1 output.

## Pipeline Overview

```{eval-rst}
.. mermaid::

   graph TD
       S1[1. Open Store<br/>raw.rs] -->|ScanData| SG[2. Gain Correction<br/>gain.rs]
       SG --> S2[3. Resolve Loads<br/>resolve.rs]
       S2 -->|CalibrationLoadFull| S4[4. Prepare<br/>prepare.rs]
       S4 -->|PreparedData| S3[5. Atmosphere<br/>atmosphere.rs]
       S3 -->|tr_s, tr_i, PWV| S5[6. Calibrate<br/>calibrate.rs]
       S5 -->|CalibrationResult| S5b[7. DBS Merge<br/>dbs.rs]
       S5b --> S6[8. Write L1<br/>calibrated.rs]

       style S1 fill:#e3f2fd
       style S5 fill:#e8f5e9
       style S6 fill:#fff3e0
```

Note: The prepare step runs **before** atmosphere determination so that per-subscan OFF indices are available for per-pair atmosphere fitting.

## Stage 1: Open Store and List Scans

**File:** `cal-io/src/raw.rs`

The pipeline opens the L0 Zarr store once (shared handle via `Arc`) and discovers all scan groups. Calibration-only scans (no source data) are filtered out -- they serve as calibration references for adjacent scans.

```text
open_store(path) → ReadableWritableListableStorage
list_scans(path) → Vec
scan_has_source(path, n) → bool
```

## Stage 2: Prefetch I/O

**File:** `cal-io/src/pipeline/mod.rs`

A dedicated prefetch thread loads the next scan while the main thread processes the current one, overlapping I/O with compute:

```{eval-rst}
.. mermaid::

   sequenceDiagram
       participant P as Prefetch Thread
       participant M as Main Thread
       participant D as Disk
       P->>D: load scan N
       D-->>P: ScanData N
       P->>M: send(ScanData N)
       P->>D: load scan N+1
       M->>M: calibrate scan N
       D-->>P: ScanData N+1
       M->>M: write L1 scan N
       P->>M: send(ScanData N+1)
       M->>M: calibrate scan N+1
```

Memory is bounded to two scans by using `sync_channel(1)`.

## Stage 2b: Gain Interpolation (optional)

**File:** `cal-core/src/gain.rs`

When `--gain-interpolate` is 2 or 3, raw counts are corrected for receiver gain drift between two bracketing HOT/COLD calibration scans. The prefetch thread loads the next calibration snapshot alongside the current one. This stage modifies `ScanData` in place before resolve.

## Stage 3: Resolve Calibration Loads

**File:** `cal-io/src/resolve.rs`

Raw L0 data is resolved into physics-ready structures:

1. **Frequency grid** -- compute per-channel signal and image frequencies:

   $$
   \begin{aligned}
   \nu_{sig}(c) &= \nu_0^{sig} + (c - c_{ref}) \cdot \Delta\nu \\
   \nu_{img}(c) &= \nu_0^{img} - (c - c_{ref}) \cdot \Delta\nu
   \end{aligned}
   $$

2. **HOT/COLD extraction** -- identify calibration subscans by `sobsmode` (using `CalibrationSnapshot::find_hot_cold()`), average over dumps (NaN-aware for padded data)
3. **CalibrationLoadFull** -- compute gamma, Trec, bad channels for all pixels `[C, R, A]` in one pass (see {doc}`physics/calibration-loads`). Constructors take `&CalibrationParams`.

## Stage 4: Data Preparation

**File:** `cal-core/src/modes/prepare.rs`

A single `prepare()` function auto-detects the mode and dispatches to mode-specific preparation:

- **TotalPower:** each ON matched to bracketing OFFs (linear interpolation by MJD)
- **OTF TotalPower:** each OTF-ON matched to bracketing OFFs (NaN-aware dump average)
- **OTF Chopped:** beam-switched ON/OFF pairing (per-dump subtraction)

Output is `PreparedData` containing reference spectra `[C, R, A, S_on]`, ON subscan indices (zero-copy -- ON data stays in `ScanData`), and `off_index_per_on` for per-subscan atmosphere fitting.

This step runs **before** atmosphere determination so that the OFF pairing information is available for per-pair atmosphere fitting.

See {doc}`physics/observation-modes` for mode details.

## Stage 5: Atmosphere Determination

**File:** `cal-core/src/atmosphere.rs`

Atmospheric transmission is determined from the data itself:

1. Extract ATM coefficients `b(v)`, `c(v)` for both sidebands
2. Compute observed sky signal $S_{H,obs}$ from OFF and HOT counts
3. Fit PWV using grid search + Newton refinement with frequency-adaptive sigma weighting (see {doc}`physics/atmosphere`)
4. Compute per-channel transmission and zenith opacity

Three atmosphere strategies are available:

- `determine_atmosphere_full` -- single atmosphere for entire scan
- `determine_atmosphere_per_pixel` -- independent fit per pixel (when `--pwv-per-pixel` is set)
- `determine_atmosphere_per_pair` -- per ON-OFF pair atmosphere

## Stage 6: Calibrate

**File:** `cal-core/src/calibrate.rs`

`calibrate_full()` is the **single calibration function**.
It performs vectorized calibration across all channels, dumps, pixels, and subscans:

$$
T_A^*(\nu, d, r, a, s) =
\frac{\big(C_{ON}(\nu,d,r,a,s) - C_{REF}(\nu,r,a,s)\big) \cdot \gamma(\nu,r,a)}
     {\big(C_{hot}(\nu,r,a) - C_{cold}(\nu,r,a)\big) \cdot t_r^{sig}(\nu)}
$$

A precomputed factor `gamma / (H - C) / tr_s` is broadcast across the dump and subscan axes for cache-friendly inner loops. T_sys is output in the DSB convention.

See {doc}`physics/calibration-equation` for the full derivation.

## Stage 7: DBS Merge (optional)

**File:** `cal-io/src/pipeline/dbs.rs`

For OTF-chopped mode with `--dbs-coupling` enabled, beam A and beam B calibration results are merged: `output_A = 0.5 * (cal_A + cal_B)`, and B positions are removed.

## Stage 8: Write L1

**File:** `cal-io/src/calibrated.rs`

Output arrays are written to a Zarr v3 store with zstd compression:

- `spectra[C, D, R, A, S]` -- calibrated $T_A^*$ (f64, K)
- `t_sys[C, R, A, S]` -- system temperature (f64, K)
- `flags[C, D, R, A, S]` -- u16 bitmask
- `tau_signal[C]`, `tau_image[C]` -- zenith opacity (f64, Np)
- Plus metadata, QA metrics, OTF coordinates, and provenance (see {doc}`data-formats`)

Provenance metadata (L2-to-L0 traceability) is written to scan-level attributes when a `ProvenanceContext` is provided.

## Optional Diagnostics

Two diagnostics hook into the per-scan flow (see {doc}`diagnostics`):

- **SKYDIFF** (`--skydiff `) records each OFF's calibrated sky-hot spectrum after Stage 3 and diffs it against the cross-scan history. It forces serial scan processing to keep the history ordered.
- **SKYCHOPDIFF** (`--skychopdiff`) runs after Stage 5 because its calibration factor divides by the per-channel transmission of each group's tagged ON, which only exists once the atmosphere is determined.
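As an illustration of the Stage 6 broadcast, the following is a minimal Rust sketch of how a per-channel factor `gamma / (H - C) / tr_s` could be computed once and then reused for every dump. The function names `calibration_factor` and `apply_factor`, and the plain slice layout for a single pixel and subscan, are assumptions made for this example; they are not the signatures used in `calibrate.rs`.

```rust
/// Hypothetical helper (not the real `calibrate.rs` API): per-channel factor
/// gamma / (hot - cold) / tr_sig for a single pixel.
fn calibration_factor(gamma: &[f64], hot: &[f64], cold: &[f64], tr_sig: &[f64]) -> Vec<f64> {
    gamma
        .iter()
        .zip(hot)
        .zip(cold)
        .zip(tr_sig)
        .map(|(((g, h), c), t)| g / (h - c) / t)
        .collect()
}

/// Hypothetical helper: T_A* = (ON - REF) * factor for every dump.
/// `on_dumps[d][c]` holds the ON counts for dump `d`, channel `c`; the
/// reference spectrum and the factor are shared by all dumps of the subscan.
fn apply_factor(on_dumps: &[Vec<f64>], reference: &[f64], factor: &[f64]) -> Vec<Vec<f64>> {
    on_dumps
        .iter()
        .map(|dump| {
            dump.iter()
                .zip(reference)
                .zip(factor)
                .map(|((on, r), f)| (on - r) * f)
                .collect()
        })
        .collect()
}
```

Because the factor does not depend on the dump index, the inner loop reduces to one subtraction and one multiplication per sample. In this sketch a channel where HOT equals COLD produces a non-finite factor; how the real pipeline handles such channels (for example through the bad-channel information from Stage 3) is not shown here.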
## Error Handling

Scan-level failures are captured without aborting the pipeline:

- Failed scans are collected in `PipelineResult::failures`
- A `failures.json` file is written to the output directory
- Exit codes: 0 = all OK, 1 = total failure, 2 = partial

## Timing

Each scan logs a millisecond breakdown:

```text
scan timing breakdown scan=39088 load_ms=753 resolve_ms=244 atm_ms=762 prepare_ms=2399 calibrate_ms=36876 write_ms=1175
```
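The breakdown is a flat list of `key=value` tokens, so it is easy to post-process when looking for the dominant stage. The sketch below is one possible way to do that in Rust; `parse_timing` and `slowest_stage` are hypothetical helpers written for this page, not part of the pipeline, and they assume that every stage duration is a token ending in `_ms` with an integer value.

```rust
/// Hypothetical helper: collect every `<name>_ms=<integer>` token from one
/// timing log line, in the order it appears. Other tokens are ignored.
fn parse_timing(line: &str) -> Vec<(String, u64)> {
    line.split_whitespace()
        .filter_map(|tok| tok.split_once('='))
        .filter(|(key, _)| key.ends_with("_ms"))
        .filter_map(|(key, value)| value.parse::<u64>().ok().map(|ms| (key.to_string(), ms)))
        .collect()
}

/// Hypothetical helper: the stage with the largest duration, if any.
fn slowest_stage(stages: &[(String, u64)]) -> Option<&(String, u64)> {
    stages.iter().max_by_key(|(_, ms)| *ms)
}
```

Applied to the example line above, the six stage durations sum to 42209 ms, of which `calibrate_ms` accounts for 36876 ms.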