========
Pipeline
========
This page describes the end-to-end data flow through the calibration engine,
from opening the L0 store to writing L1 output.
Pipeline Overview
-----------------
.. mermaid::

   graph TD
       S1[1. Open Store<br/>raw.rs] -->|ScanData| S2[2. Resolve Loads<br/>resolve.rs]
       S2 -->|CalibrationLoadFull| S3[3. Atmosphere<br/>atmosphere.rs]
       S3 -->|tr_s, tr_i, PWV| S4[4. Prepare<br/>prepare.rs]
       S4 -->|PreparedData| S5[5. Calibrate<br/>calibrate.rs]
       S5 -->|CalibrationResult| S6[6. Write L1<br/>calibrated.rs]
       style S1 fill:#e3f2fd
       style S5 fill:#e8f5e9
       style S6 fill:#fff3e0

Stage 1: Open Store and List Scans
-----------------------------------
**File:** ``cal-io/src/raw.rs``
The pipeline opens the L0 Zarr store once (shared handle via ``Arc``) and
discovers all scan groups. Calibration-only scans (no source data) are
filtered out — they serve as calibration references for adjacent scans.
.. code-block:: text

   open_store(path) → ReadableWritableListableStorage
   list_scans(path) → Vec
   scan_has_source(path, n) → bool

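
The filtering step can be sketched as follows. This is a minimal illustration,
not the engine's code: the ``u32`` scan number type and the ``has_source``
closure (standing in for a check like ``scan_has_source(path, n)``) are
assumptions.

.. code-block:: rust

   /// Keep only the scans that contain source data.
   /// `has_source` is a hypothetical stand-in for the store lookup.
   pub fn scans_with_source<F>(scans: &[u32], has_source: F) -> Vec<u32>
   where
       F: Fn(u32) -> bool,
   {
       scans
           .iter()
           .copied()
           // Calibration-only scans fail the predicate and are dropped.
           .filter(|&n| has_source(n))
           .collect()
   }
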
Stage 2: Prefetch I/O
----------------------
**File:** ``cal-io/src/pipeline.rs``
A dedicated prefetch thread loads the next scan while the main thread
processes the current one, overlapping I/O with compute:
.. mermaid::

   sequenceDiagram
       participant P as Prefetch Thread
       participant M as Main Thread
       participant D as Disk
       P->>D: load scan N
       D-->>P: ScanData N
       P->>M: send(ScanData N)
       P->>D: load scan N+1
       M->>M: calibrate scan N
       D-->>P: ScanData N+1
       M->>M: write L1 scan N
       P->>M: send(ScanData N+1)
       M->>M: calibrate scan N+1

Memory is bounded to two scans by using ``sync_channel(1)``.
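
A minimal sketch of this producer/consumer pattern with
``std::sync::mpsc::sync_channel`` is shown below. The ``ScanData`` struct,
``load_scan`` and ``run_prefetched`` are hypothetical stand-ins, not the
engine's actual types or functions.

.. code-block:: rust

   use std::sync::mpsc::sync_channel;
   use std::thread;

   /// Hypothetical stand-in for the real `ScanData`.
   pub struct ScanData {
       pub scan_id: u32,
       pub samples: Vec<f64>,
   }

   /// Hypothetical loader; the real pipeline reads from the L0 store.
   fn load_scan(scan_id: u32) -> ScanData {
       ScanData { scan_id, samples: vec![scan_id as f64; 4] }
   }

   /// Process scans in order while a prefetch thread loads ahead.
   /// Returns the scan IDs in the order the main thread handled them.
   pub fn run_prefetched(scan_ids: Vec<u32>) -> Vec<u32> {
       // Capacity 1: at most one loaded scan waits in the channel.
       let (tx, rx) = sync_channel::<ScanData>(1);

       let prefetch = thread::spawn(move || {
           for id in scan_ids {
               let scan = load_scan(id);
               // `send` blocks while the slot is occupied; it fails only
               // if the receiver has been dropped.
               if tx.send(scan).is_err() {
                   break;
               }
           }
           // `tx` is dropped here, which ends the receive loop below.
       });

       let mut processed = Vec::new();
       for scan in rx {
           // Calibration and the L1 write would happen here.
           processed.push(scan.scan_id);
       }
       prefetch.join().expect("prefetch thread panicked");
       processed
   }

Because the channel is bounded, a fast loader cannot run arbitrarily far ahead
of the calibration step; it waits in ``send`` until the main thread takes the
queued scan.
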
Stage 3: Resolve Calibration Loads
-----------------------------------
**File:** ``cal-io/src/resolve.rs``
Raw L0 data is resolved into physics-ready structures:
1. **Frequency grid** — compute per-channel signal and image frequencies:

   .. math::

      \nu_{sig}(c) = \nu_0^{sig} + (c - c_{ref}) \cdot \Delta\nu

      \nu_{img}(c) = \nu_0^{img} - (c - c_{ref}) \cdot \Delta\nu

2. **HOT/COLD extraction** — identify calibration subscans by ``sobsmode``,
average over dumps (NaN-aware for padded data)
3. **CalibrationLoadFull** — compute gamma, Trec, bad channels for all
pixels ``[C, R, A]`` in one pass (see :doc:`physics/calibration-loads`)
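
The first two steps can be illustrated with a short sketch. The function and
parameter names are assumptions chosen to mirror the symbols in the formulas
above; the engine's own signatures may differ.

.. code-block:: rust

   /// Per-channel signal and image frequencies.
   /// The image sideband runs in the opposite direction to the signal sideband.
   pub fn frequency_grid(
       n_chan: usize,
       c_ref: f64,
       nu0_sig: f64,
       nu0_img: f64,
       dnu: f64,
   ) -> (Vec<f64>, Vec<f64>) {
       let sig = (0..n_chan)
           .map(|c| nu0_sig + (c as f64 - c_ref) * dnu)
           .collect();
       let img = (0..n_chan)
           .map(|c| nu0_img - (c as f64 - c_ref) * dnu)
           .collect();
       (sig, img)
   }

   /// Mean over dumps that skips NaN padding.
   /// Returns NaN when no finite sample is available.
   pub fn nan_mean(values: &[f64]) -> f64 {
       let (sum, n) = values
           .iter()
           .filter(|v| !v.is_nan())
           .fold((0.0, 0usize), |(s, n), v| (s + v, n + 1));
       if n == 0 { f64::NAN } else { sum / n as f64 }
   }
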
Stage 4: Atmosphere Determination
----------------------------------
**File:** ``cal-core/src/atmosphere.rs``
Atmospheric transmission is determined from the data itself:
1. Extract ATM coefficients ``b(ν)``, ``c(ν)`` for both sidebands
2. Compute observed sky signal :math:`S_{H,obs}` from OFF and HOT counts
3. Fit the precipitable water vapor (PWV) with a grid search followed by
   Newton refinement (see :doc:`physics/atmosphere`)
4. Compute per-channel transmission and zenith opacity
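
The two-phase fit in step 3 can be sketched generically: a coarse grid search
picks a starting point, and Newton iterations refine it. The sketch below
solves ``model(x) = target`` for a single scalar and uses a
central-difference derivative. The function name, the scalar model and the
stopping thresholds are assumptions for illustration, not the engine's
atmosphere model.

.. code-block:: rust

   /// Find `x` in `[lo, hi]` such that `model(x)` is close to `target`.
   pub fn fit_grid_newton<F: Fn(f64) -> f64>(
       model: F,
       target: f64,
       lo: f64,
       hi: f64,
       n_grid: usize,
   ) -> f64 {
       let residual = |x: f64| model(x) - target;

       // Phase 1: coarse grid search for the smallest absolute residual.
       let mut best = lo;
       let mut best_abs = f64::INFINITY;
       for i in 0..n_grid {
           let x = lo + (hi - lo) * i as f64 / (n_grid.max(2) - 1) as f64;
           let r = residual(x).abs();
           if r < best_abs {
               best_abs = r;
               best = x;
           }
       }

       // Phase 2: Newton refinement with a numerical derivative.
       let mut x = best;
       for _ in 0..20 {
           let h = 1e-6 * x.abs().max(1.0);
           let slope = (residual(x + h) - residual(x - h)) / (2.0 * h);
           if slope == 0.0 || !slope.is_finite() {
               break;
           }
           let step = residual(x) / slope;
           x = (x - step).clamp(lo, hi);
           if step.abs() < 1e-12 {
               break;
           }
       }
       x
   }

The grid phase guards against a poor starting point for Newton's method, which
converges quickly only near the solution.
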
Stage 5: Data Preparation
--------------------------
**File:** ``cal-core/src/modes/prepare.rs``
Mode-specific ON/OFF reference pairing:
- **TotalPower:** each ON matched to nearest-time OFF (single dump)
- **OTF:** each OTF-ON matched to nearest OFF (NaN-aware dump average)
Output is ``PreparedData`` containing reference spectra ``[C, R, A, S_on]``
and ON subscan indices (zero-copy — ON data stays in ``ScanData``).
See :doc:`physics/observation-modes` for mode details.
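
Nearest-time pairing can be sketched as below, assuming each subscan carries a
single representative timestamp. The function name and the use of plain
``f64`` timestamps are assumptions for illustration.

.. code-block:: rust

   /// For each ON timestamp, return the index of the OFF timestamp that is
   /// closest in time, or `None` when no usable OFF exists.
   pub fn nearest_off(on_times: &[f64], off_times: &[f64]) -> Vec<Option<usize>> {
       on_times
           .iter()
           .map(|&t_on| {
               off_times
                   .iter()
                   .enumerate()
                   // Ignore OFF entries without a valid timestamp.
                   .filter(|(_, t)| !t.is_nan())
                   // On a tie, `min_by` keeps the first (earliest-listed) OFF.
                   .min_by(|a, b| (a.1 - t_on).abs().total_cmp(&(b.1 - t_on).abs()))
                   .map(|(i, _)| i)
           })
           .collect()
   }

Returning indices rather than copies of the spectra is in the same spirit as
the zero-copy design described above.
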
Stage 6: Calibrate
-------------------
**File:** ``cal-core/src/calibrate.rs``
Vectorized calibration across all channels, dumps, pixels, and subscans:
.. math::

   T_A^*(\nu, d, r, a, s) =
   \frac{\big(C_{ON}(\nu,d,r,a,s) - C_{REF}(\nu,r,a,s)\big) \cdot \gamma(\nu,r,a)}
        {\big(C_{hot}(\nu,r,a) - C_{cold}(\nu,r,a)\big) \cdot t_r^{sig}(\nu)}

A precomputed factor ``gamma / (H - C) / tr_s`` is broadcast across the
dump and subscan axes for cache-friendly inner loops.
See :doc:`physics/calibration-equation` for the full derivation.
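
The precomputation can be sketched for a single pixel with flat per-channel
slices. The function names and the slice layout are assumptions; the engine
operates on multi-dimensional arrays.

.. code-block:: rust

   /// Per-channel factor `gamma / (hot - cold) / tr_s`, computed once.
   pub fn gain_factor(gamma: &[f64], hot: &[f64], cold: &[f64], tr_s: &[f64]) -> Vec<f64> {
       gamma
           .iter()
           .zip(hot)
           .zip(cold)
           .zip(tr_s)
           .map(|(((g, h), c), t)| g / (h - c) / t)
           .collect()
   }

   /// Calibrate one dump: `(on - reference) * factor` per channel.
   /// The same `factor` slice is reused for every dump and subscan.
   pub fn calibrate_dump(on: &[f64], reference: &[f64], factor: &[f64]) -> Vec<f64> {
       on.iter()
           .zip(reference)
           .zip(factor)
           .map(|((o, r), f)| (o - r) * f)
           .collect()
   }

With the factor hoisted out, the inner loop reduces to one subtraction and one
multiplication per sample.
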
Stage 7: Write L1
------------------
**File:** ``cal-io/src/calibrated.rs``
Output arrays are written to a Zarr v3 store with zstd compression:
- ``spectra[C, D, R, A, S]`` — calibrated :math:`T_A^*` (f64, K)
- ``t_sys[C, R, A, S]`` — system temperature (f64, K)
- ``flags[C, D, R, A, S]`` — u16 bitmask
- ``tau_signal[C]``, ``tau_image[C]`` — zenith opacity (f64, Np)
- Plus metadata, QA metrics, and OTF coordinates (see :doc:`data-formats`)
Error Handling
--------------
Scan-level failures are captured without aborting the pipeline:
- Failed scans are collected in ``PipelineResult::failures``
- A ``failures.json`` file is written to the output directory
- Exit codes: 0 = all scans OK, 1 = total failure, 2 = partial failure
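
The exit-code convention can be expressed as a small function. This is a
sketch: the function name is hypothetical, and returning 0 for an empty scan
list is an assumption the text above does not address.

.. code-block:: rust

   /// Map scan counts to the process exit code.
   pub fn exit_code(n_scans: usize, n_failed: usize) -> i32 {
       if n_failed == 0 {
           0 // all scans OK (assumed to include the empty case)
       } else if n_failed >= n_scans {
           1 // total failure: every scan failed
       } else {
           2 // partial failure: some scans succeeded
       }
   }
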
Timing
------
Each scan logs a millisecond breakdown:
.. code-block:: text

   scan timing breakdown scan=39088
   load_ms=753 resolve_ms=244 atm_ms=762
   prepare_ms=2399 calibrate_ms=36876 write_ms=1175

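
Per-stage timings of this kind can be collected with ``std::time::Instant``.
The ``timed`` helper below is a hypothetical sketch, not the engine's logging
code.

.. code-block:: rust

   use std::time::Instant;

   /// Run `f` and return its result together with the elapsed milliseconds.
   pub fn timed<T>(f: impl FnOnce() -> T) -> (T, u128) {
       let start = Instant::now();
       let out = f();
       (out, start.elapsed().as_millis())
   }

A stage would then be wrapped as ``let (result, calibrate_ms) = timed(|| ...);``
and the collected values logged once per scan.
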