# Pipeline
This page describes the end-to-end data flow through the calibration engine,
from opening the L0 store to writing L1 output.
## Pipeline Overview
```{eval-rst}
.. mermaid::

   graph TD
     S1[1. Open Store<br/>raw.rs] -->|ScanData| SG[2. Gain Correction<br/>gain.rs]
     SG --> S2[3. Resolve Loads<br/>resolve.rs]
     S2 -->|CalibrationLoadFull| S4[4. Prepare<br/>prepare.rs]
     S4 -->|PreparedData| S3[5. Atmosphere<br/>atmosphere.rs]
     S3 -->|tr_s, tr_i, PWV| S5[6. Calibrate<br/>calibrate.rs]
     S5 -->|CalibrationResult| S5b[7. DBS Merge<br/>dbs.rs]
     S5b --> S6[8. Write L1<br/>calibrated.rs]
     style S1 fill:#e3f2fd
     style S5 fill:#e8f5e9
     style S6 fill:#fff3e0
```
Note: The prepare step runs **before** atmosphere determination so that
per-subscan OFF indices are available for per-pair atmosphere fitting.
## Stage 1: Open Store and List Scans
**File:** `cal-io/src/raw.rs`
The pipeline opens the L0 Zarr store once (shared handle via `Arc`) and
discovers all scan groups. Calibration-only scans (no source data) are
filtered out — they serve as calibration references for adjacent scans.
```text
open_store(path) → ReadableWritableListableStorage
list_scans(path) → Vec
scan_has_source(path, n) → bool
```
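The filtering step can be sketched as follows. This is a minimal illustration, not the engine's code: `scans_to_calibrate` is a hypothetical helper, the scan-number type `u32` is an assumption, and the `has_source` closure stands in for `scan_has_source(path, n)`.

```rust
/// Keep only the scans that contain source data.
/// Calibration-only scans are dropped from the work list; they are still
/// available as references for the scans next to them.
fn scans_to_calibrate<F>(all_scans: &[u32], has_source: F) -> Vec<u32>
where
    F: Fn(u32) -> bool,
{
    all_scans
        .iter()
        .copied()
        .filter(|&n| has_source(n))
        .collect()
}
```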
## Stage 2: Prefetch I/O
**File:** `cal-io/src/pipeline/mod.rs`
A dedicated prefetch thread loads the next scan while the main thread
processes the current one, overlapping I/O with compute:
```{eval-rst}
.. mermaid::

   sequenceDiagram
     participant P as Prefetch Thread
     participant M as Main Thread
     participant D as Disk
     P->>D: load scan N
     D-->>P: ScanData N
     P->>M: send(ScanData N)
     P->>D: load scan N+1
     M->>M: calibrate scan N
     D-->>P: ScanData N+1
     M->>M: write L1 scan N
     P->>M: send(ScanData N+1)
     M->>M: calibrate scan N+1
```
Memory is bounded to two scans by using `sync_channel(1)`.
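A minimal sketch of this producer/consumer pattern using only the standard library is shown below. The `ScanData` struct, `load_scan`, and `run_with_prefetch` are simplified stand-ins invented for the example; only `sync_channel(1)` is taken from the description above.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical stand-in for the real `ScanData`.
struct ScanData {
    scan: u32,
    counts: Vec<f64>,
}

/// Placeholder for the disk read of one scan.
fn load_scan(scan: u32) -> ScanData {
    ScanData { scan, counts: vec![scan as f64; 4] }
}

/// Process scans in order while a prefetch thread loads ahead.
/// Returns the scan numbers in the order they were processed.
fn run_with_prefetch(scans: Vec<u32>) -> Vec<u32> {
    // Capacity 1: `send` blocks once one loaded scan is already queued,
    // so the loader cannot run arbitrarily far ahead of the consumer.
    let (tx, rx) = sync_channel::<ScanData>(1);

    let loader = thread::spawn(move || {
        for scan in scans {
            if tx.send(load_scan(scan)).is_err() {
                break; // receiver was dropped; stop loading
            }
        }
        // `tx` is dropped here, which ends the receive loop below.
    });

    let mut processed = Vec::new();
    for data in rx {
        // Calibration and L1 writing would happen here.
        let _sum: f64 = data.counts.iter().sum();
        processed.push(data.scan);
    }
    loader.join().expect("prefetch thread panicked");
    processed
}
```

The bounded channel provides back-pressure: a slow consumer stalls the loader instead of letting loaded scans pile up.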
## Stage 2b: Gain Interpolation (optional)
**File:** `cal-core/src/gain.rs`
When `--gain-interpolate` is 2 or 3, raw counts are corrected for
receiver gain drift between two bracketing HOT/COLD calibration scans.
The prefetch thread loads the next calibration snapshot alongside the
current one. This stage modifies `ScanData` in place before resolve.
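The text does not say what interpolation modes 2 and 3 compute. As an illustration only, the sketch below assumes linear interpolation of a scalar gain in time between the two bracketing calibrations, followed by an in-place rescaling of the counts. Both function names and the scalar-gain model are assumptions.

```rust
/// Linearly interpolate the gain at time `t` between two calibrations
/// taken at `t0` (gain `g0`) and `t1` (gain `g1`).
/// Times outside the bracket are clamped to the nearest calibration.
fn interpolate_gain(t: f64, t0: f64, g0: f64, t1: f64, g1: f64) -> f64 {
    if t1 == t0 {
        return g0; // degenerate bracket: nothing to interpolate
    }
    let w = ((t - t0) / (t1 - t0)).clamp(0.0, 1.0);
    g0 + w * (g1 - g0)
}

/// Rescale raw counts in place so they refer to the gain `gain_ref`
/// instead of the drifted gain `gain_now`.
fn correct_counts_in_place(counts: &mut [f64], gain_now: f64, gain_ref: f64) {
    let scale = gain_ref / gain_now;
    for c in counts.iter_mut() {
        *c *= scale;
    }
}
```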
## Stage 3: Resolve Calibration Loads
**File:** `cal-io/src/resolve.rs`
Raw L0 data is resolved into physics-ready structures:
1. **Frequency grid** -- compute per-channel signal and image frequencies:
$$
\begin{aligned}
\nu_{sig}(c) &= \nu_0^{sig} + (c - c_{ref}) \cdot \Delta\nu \\
\nu_{img}(c) &= \nu_0^{img} - (c - c_{ref}) \cdot \Delta\nu
\end{aligned}
$$
2. **HOT/COLD extraction** -- identify calibration subscans by `sobsmode`
(using `CalibrationSnapshot::find_hot_cold()`), average over dumps
(NaN-aware for padded data)
3. **CalibrationLoadFull** -- compute gamma, Trec, bad channels for all
pixels `[C, R, A]` in one pass (see {doc}`physics/calibration-loads`).
Constructors take `&CalibrationParams`.
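Steps 1 and 2 can be sketched with two small helpers. The function names and the flat `f64` slices are assumptions made for the example; the real code works on multi-dimensional arrays.

```rust
/// Signal- and image-sideband frequency for every channel `c`:
/// the signal sideband increases with channel number, the image sideband decreases.
fn frequency_grid(
    n_chan: usize,
    c_ref: f64,
    nu0_sig: f64,
    nu0_img: f64,
    dnu: f64,
) -> (Vec<f64>, Vec<f64>) {
    let sig: Vec<f64> = (0..n_chan)
        .map(|c| nu0_sig + (c as f64 - c_ref) * dnu)
        .collect();
    let img: Vec<f64> = (0..n_chan)
        .map(|c| nu0_img - (c as f64 - c_ref) * dnu)
        .collect();
    (sig, img)
}

/// Average over dumps while skipping NaN padding.
/// Returns NaN when every dump is NaN.
fn nan_mean(dumps: &[f64]) -> f64 {
    let (sum, n) = dumps
        .iter()
        .filter(|v| !v.is_nan())
        .fold((0.0_f64, 0usize), |(s, n), v| (s + v, n + 1));
    if n == 0 { f64::NAN } else { sum / n as f64 }
}
```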
## Stage 4: Data Preparation
**File:** `cal-core/src/modes/prepare.rs`
A single `prepare()` function auto-detects the mode and dispatches
to mode-specific preparation:
- **TotalPower:** each ON matched to bracketing OFFs (linear interpolation by MJD)
- **OTF TotalPower:** each OTF-ON matched to bracketing OFFs (NaN-aware dump average)
- **OTF Chopped:** beam-switched ON/OFF pairing (per-dump subtraction)
Output is `PreparedData` containing reference spectra `[C, R, A, S_on]`,
ON subscan indices (zero-copy -- ON data stays in `ScanData`), and
`off_index_per_on` for per-subscan atmosphere fitting.
This step runs **before** atmosphere determination so that the OFF
pairing information is available for per-pair atmosphere fitting.
See {doc}`physics/observation-modes` for mode details.
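For the TotalPower case, the linear interpolation by MJD can be sketched as below. `interpolate_reference` is a hypothetical helper operating on one pixel's spectrum; it is not the signature used in `prepare.rs`.

```rust
/// Reference spectrum for an ON observed at `mjd_on`, linearly interpolated
/// between the OFF before it (`mjd_a`, `off_a`) and the OFF after it
/// (`mjd_b`, `off_b`).
fn interpolate_reference(
    mjd_on: f64,
    mjd_a: f64,
    off_a: &[f64],
    mjd_b: f64,
    off_b: &[f64],
) -> Vec<f64> {
    assert_eq!(off_a.len(), off_b.len(), "OFF spectra must share a channel axis");
    // Weight of the later OFF; fall back to the first OFF if both times coincide.
    let w = if mjd_b == mjd_a {
        0.0
    } else {
        (mjd_on - mjd_a) / (mjd_b - mjd_a)
    };
    off_a
        .iter()
        .zip(off_b)
        .map(|(a, b)| a + w * (b - a))
        .collect()
}
```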
## Stage 5: Atmosphere Determination
**File:** `cal-core/src/atmosphere.rs`
Atmospheric transmission is determined from the data itself:
1. Extract ATM coefficients `b(v)`, `c(v)` for both sidebands
2. Compute observed sky signal $S_{H,obs}$ from OFF and HOT counts
3. Fit PWV using grid search + Newton refinement with frequency-adaptive
sigma weighting (see {doc}`physics/atmosphere`)
4. Compute per-channel transmission and zenith opacity
Three atmosphere strategies are available:
- `determine_atmosphere_full` -- single atmosphere for entire scan
- `determine_atmosphere_per_pixel` -- independent fit per pixel
(when `--pwv-per-pixel` is set)
- `determine_atmosphere_per_pair` -- per ON-OFF pair atmosphere
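The grid search plus Newton refinement in step 3 can be illustrated with a toy one-parameter fit. This sketch makes several assumptions that are not taken from the source: opacity is modelled as `tau = b + c * pwv`, transmission as `exp(-tau)`, the weights `w` are supplied by the caller, and both function names are hypothetical. The actual fit is described in {doc}`physics/atmosphere`.

```rust
/// Weighted sum of squared residuals and its first and second derivative
/// with respect to PWV, for the toy model m_i = exp(-(b_i + c_i * pwv)).
fn cost_and_derivs(pwv: f64, b: &[f64], c: &[f64], obs: &[f64], w: &[f64]) -> (f64, f64, f64) {
    let (mut f, mut d1, mut d2) = (0.0, 0.0, 0.0);
    for i in 0..obs.len() {
        let m = (-(b[i] + c[i] * pwv)).exp();
        let r = obs[i] - m; // residual; dr/dpwv = c_i * m
        f += w[i] * r * r;
        d1 += 2.0 * w[i] * r * c[i] * m;
        d2 += 2.0 * w[i] * c[i] * c[i] * m * (m - r);
    }
    (f, d1, d2)
}

/// Coarse grid search on [0, pwv_max], then Newton steps on the cost.
fn fit_pwv(b: &[f64], c: &[f64], obs: &[f64], w: &[f64], pwv_max: f64, n_grid: usize) -> f64 {
    // 1. Grid search: pick the grid point with the lowest cost.
    let mut best = 0.0;
    let mut best_cost = f64::INFINITY;
    for k in 0..=n_grid {
        let x = pwv_max * k as f64 / n_grid as f64;
        let (f, _, _) = cost_and_derivs(x, b, c, obs, w);
        if f < best_cost {
            best_cost = f;
            best = x;
        }
    }
    // 2. Newton refinement: x <- x - f'(x) / f''(x).
    let mut x = best;
    for _ in 0..20 {
        let (_, d1, d2) = cost_and_derivs(x, b, c, obs, w);
        if d2 <= 0.0 {
            break; // not locally convex; keep the current estimate
        }
        let step = d1 / d2;
        x -= step;
        if step.abs() < 1e-12 {
            break;
        }
    }
    x
}
```

The grid search guards against starting Newton's method far from the minimum, where the quadratic approximation is poor.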
## Stage 6: Calibrate
**File:** `cal-core/src/calibrate.rs`
`calibrate_full()` is the **single calibration function**. It performs
vectorized calibration across all channels, dumps, pixels, and subscans:
$$
T_A^*(\nu, d, r, a, s) =
\frac{\big(C_{ON}(\nu,d,r,a,s) - C_{REF}(\nu,r,a,s)\big) \cdot \gamma(\nu,r,a)}
{\big(C_{hot}(\nu,r,a) - C_{cold}(\nu,r,a)\big) \cdot t_r^{sig}(\nu)}
$$
A precomputed factor `gamma / (H - C) / tr_s` is broadcast across the
dump and subscan axes for cache-friendly inner loops.
T_sys is output in the DSB convention.
See {doc}`physics/calibration-equation` for the full derivation.
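The precompute-then-broadcast idea can be sketched for a single pixel and subscan. The nested `Vec` layout (`on[d][c]`) and the function names are assumptions for the example; `calibrate_full()` itself operates on the full `[C, D, R, A, S]` array.

```rust
/// Per-channel factor gamma / (hot - cold) / tr_s, computed once.
fn calibration_factor(gamma: &[f64], hot: &[f64], cold: &[f64], tr_s: &[f64]) -> Vec<f64> {
    (0..gamma.len())
        .map(|c| gamma[c] / (hot[c] - cold[c]) / tr_s[c])
        .collect()
}

/// Apply the factor to every dump: T_A* = (C_ON - C_REF) * factor.
/// `on` is indexed as on[dump][channel]; `reference` and `factor` are
/// per-channel and reused for every dump.
fn calibrate_dumps(on: &[Vec<f64>], reference: &[f64], factor: &[f64]) -> Vec<Vec<f64>> {
    on.iter()
        .map(|dump| {
            dump.iter()
                .zip(reference)
                .zip(factor)
                .map(|((c_on, c_ref), f)| (c_on - c_ref) * f)
                .collect()
        })
        .collect()
}
```

Hoisting the two divisions out of the dump loop leaves one subtraction and one multiplication per sample in the inner loop.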
## Stage 7: DBS Merge (optional)
**File:** `cal-io/src/pipeline/dbs.rs`
For OTF-chopped mode with `--dbs-coupling` enabled, beam A and beam B
calibration results are merged: `output_A = 0.5 * (cal_A + cal_B)`,
and B positions are removed.
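On a single spectrum the merge formula reduces to an element-wise average. `merge_beams` is a hypothetical helper; removing the B positions is not shown.

```rust
/// Element-wise merge of beam A and beam B calibration results:
/// output_A = 0.5 * (cal_A + cal_B).
fn merge_beams(cal_a: &[f64], cal_b: &[f64]) -> Vec<f64> {
    assert_eq!(cal_a.len(), cal_b.len(), "beams must have the same shape");
    cal_a
        .iter()
        .zip(cal_b)
        .map(|(a, b)| 0.5 * (a + b))
        .collect()
}
```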
## Stage 8: Write L1
**File:** `cal-io/src/calibrated.rs`
Output arrays are written to a Zarr v3 store with zstd compression:
- `spectra[C, D, R, A, S]` -- calibrated $T_A^*$ (f64, K)
- `t_sys[C, R, A, S]` -- system temperature (f64, K)
- `flags[C, D, R, A, S]` -- u16 bitmask
- `tau_signal[C]`, `tau_image[C]` -- zenith opacity (f64, Np)
- Plus metadata, QA metrics, OTF coordinates, and provenance
(see {doc}`data-formats`)
Provenance metadata (L2-to-L0 traceability) is written to scan-level
attributes when a `ProvenanceContext` is provided.
## Optional Diagnostics
Two diagnostics hook into the per-scan flow (see {doc}`diagnostics`):
- **SKYDIFF** (`--skydiff `) records each OFF's calibrated sky-hot
spectrum after Stage 3 and diffs it against the cross-scan history.
Forces serial scan processing to keep the history ordered.
- **SKYCHOPDIFF** (`--skychopdiff`) runs after Stage 5: its calibration
factor divides by the per-channel transmission of each group's
tagged ON, which only exists once the atmosphere is determined.
## Error Handling
Scan-level failures are captured without aborting the pipeline:
- Failed scans are collected in `PipelineResult::failures`
- A `failures.json` file is written to the output directory
- Exit codes: 0 = all scans OK, 1 = total failure, 2 = partial failure (some scans failed)
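The exit-code rule can be written as a small function. The name, the signature, and the treatment of an empty run as success are assumptions; the real pipeline derives the counts from `PipelineResult`.

```rust
/// Map scan counts to the process exit code:
/// 0 = all OK, 1 = every scan failed, 2 = some scans failed.
fn exit_code(n_scans: usize, n_failed: usize) -> i32 {
    if n_failed == 0 {
        0
    } else if n_failed >= n_scans {
        1
    } else {
        2
    }
}
```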
## Timing
Each scan logs a millisecond breakdown:
```text
scan timing breakdown scan=39088
load_ms=753 resolve_ms=244 atm_ms=762
prepare_ms=2399 calibrate_ms=36876 write_ms=1175
```
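A per-stage millisecond figure like the ones above can be collected with `std::time::Instant`. The `timed_ms` helper is hypothetical and only illustrates the measurement; the source does not say how the engine produces these log fields.

```rust
use std::time::Instant;

/// Run one pipeline stage and return its result together with the
/// elapsed wall-clock time in milliseconds.
fn timed_ms<T>(stage: impl FnOnce() -> T) -> (T, u128) {
    let start = Instant::now();
    let out = stage();
    (out, start.elapsed().as_millis())
}
```

A call such as `let (loads, resolve_ms) = timed_ms(|| resolve(&scan));` (with a hypothetical `resolve`) yields the value for one field of the breakdown.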