Pipeline
This page describes the end-to-end data flow through the calibration engine, from opening the L0 store to writing L1 output.
Pipeline Overview
graph TD
S1[1. Open Store<br/>raw.rs] -->|ScanData| S2[2. Resolve Loads<br/>resolve.rs]
S2 -->|CalibrationLoadFull| S3[3. Atmosphere<br/>atmosphere.rs]
S3 -->|tr_s, tr_i, PWV| S4[4. Prepare<br/>prepare.rs]
S4 -->|PreparedData| S5[5. Calibrate<br/>calibrate.rs]
S5 -->|CalibrationResult| S6[6. Write L1<br/>calibrated.rs]
style S1 fill:#e3f2fd
style S5 fill:#e8f5e9
style S6 fill:#fff3e0
Stage 1: Open Store and List Scans
File: cal-io/src/raw.rs
The pipeline opens the L0 Zarr store once (the handle is shared via Arc) and
discovers all scan groups. Calibration-only scans (no source data) are
excluded from the list of scans to calibrate; they serve as calibration
references for adjacent scans.
open_store(path) → ReadableWritableListableStorage
list_scans(path) → Vec<u32>
scan_has_source(path, n) → bool
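A minimal sketch of the filtering step, assuming scan numbers come from something like list_scans and that the per-scan check behaves like scan_has_source. The helper split_scans and its closure-based check are hypothetical, chosen so the example runs without a Zarr store.

```rust
/// Hypothetical helper: split scan numbers into
/// (scans to calibrate, calibration-only scans).
/// `has_source` stands in for a check like `scan_has_source(path, n)`.
pub fn split_scans<F>(scans: &[u32], has_source: F) -> (Vec<u32>, Vec<u32>)
where
    F: Fn(u32) -> bool,
{
    // Listing order is preserved within each partition.
    scans.iter().copied().partition(|&n| has_source(n))
}
```

Keeping the calibration-only scans in their own list, rather than discarding them, reflects that they are still needed as references for neighbouring scans.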
Stage 2: Prefetch I/O
File: cal-io/src/pipeline.rs
A dedicated prefetch thread loads the next scan while the main thread processes the current one, overlapping I/O with compute:
sequenceDiagram
participant P as Prefetch Thread
participant M as Main Thread
participant D as Disk
P->>D: load scan N
D-->>P: ScanData N
P->>M: send(ScanData N)
P->>D: load scan N+1
M->>M: calibrate scan N
D-->>P: ScanData N+1
M->>M: write L1 scan N
P->>M: send(ScanData N+1)
M->>M: calibrate scan N+1
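Memory is bounded by using sync_channel(1): the channel buffers at most one loaded scan, and once it is full the prefetch thread blocks on send until the main thread receives. At any time only a small, fixed number of scans is resident (the one being processed, one queued, and at most one being loaded), regardless of how many scans the store contains.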
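The prefetch pattern can be sketched with the standard library alone. This is an illustration, not the code in pipeline.rs: run_prefetched is a hypothetical name, and the load and process closures stand in for reading a scan from the L0 store and for calibrating and writing it.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical driver: `load` stands in for reading one scan from the L0
/// store and `process` for calibrate + write. Scans are handled in order.
pub fn run_prefetched<T, R, L, P>(scans: Vec<u32>, load: L, mut process: P) -> Vec<R>
where
    T: Send + 'static,
    L: Fn(u32) -> T + Send + 'static,
    P: FnMut(u32, T) -> R,
{
    // Capacity 1: at most one loaded scan waits in the channel. When it is
    // full, `send` blocks until the main thread receives, which caps memory.
    let (tx, rx) = sync_channel::<(u32, T)>(1);

    let prefetch = thread::spawn(move || {
        for n in scans {
            let data = load(n);
            if tx.send((n, data)).is_err() {
                break; // main thread hung up; stop prefetching
            }
        }
        // `tx` is dropped here, which ends the receive loop below.
    });

    // Main thread: compute on scan N while the prefetch thread loads N+1.
    let mut results = Vec::new();
    for (n, data) in rx {
        results.push(process(n, data));
    }
    prefetch.join().expect("prefetch thread panicked");
    results
}
```

Because the receive loop ends when the sender is dropped, no explicit end-of-stream message is needed.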
Stage 3: Resolve Calibration Loads
File: cal-io/src/resolve.rs
Raw L0 data is resolved into physics-ready structures:
Frequency grid: compute the per-channel signal and image frequencies
\[ \begin{aligned} \nu_{sig}(c) &= \nu_0^{sig} + (c - c_{ref}) \cdot \Delta\nu \\ \nu_{img}(c) &= \nu_0^{img} - (c - c_{ref}) \cdot \Delta\nu \end{aligned} \]
HOT/COLD extraction: identify calibration subscans by sobsmode and average over dumps (NaN-aware for padded data)
CalibrationLoadFull: compute gamma, Trec, and bad channels for all pixels [C, R, A] in one pass (see Calibration Loads)
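The frequency-grid formulas and the NaN-aware dump average can be sketched as below. The function names frequency_grid and nan_mean are hypothetical; c_ref and dnu are taken as plain f64 values in whatever frequency unit the L0 metadata uses.

```rust
/// Signal and image sideband frequencies for channels 0..n_chan:
///   nu_sig(c) = nu0_sig + (c - c_ref) * dnu
///   nu_img(c) = nu0_img - (c - c_ref) * dnu
pub fn frequency_grid(n_chan: usize, nu0_sig: f64, nu0_img: f64, c_ref: f64, dnu: f64) -> (Vec<f64>, Vec<f64>) {
    (0..n_chan)
        .map(|c| {
            let offset = (c as f64 - c_ref) * dnu;
            (nu0_sig + offset, nu0_img - offset) // image sideband runs backwards
        })
        .unzip()
}

/// Mean over dumps that skips NaN padding; NaN if every dump is NaN.
pub fn nan_mean(dumps: &[f64]) -> f64 {
    let (sum, count) = dumps
        .iter()
        .filter(|v| !v.is_nan())
        .fold((0.0_f64, 0_usize), |(s, n), &v| (s + v, n + 1));
    if count == 0 {
        f64::NAN
    } else {
        sum / count as f64
    }
}
```

For example, frequency_grid(3, 100.0, 90.0, 1.0, 0.5) gives signal frequencies 99.5, 100.0, 100.5 and image frequencies 90.5, 90.0, 89.5: the two sidebands move in opposite directions around the reference channel.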
Stage 4: Atmosphere Determination
File: cal-core/src/atmosphere.rs
Atmospheric transmission is determined from the data itself:
Extract ATM coefficients \(b(\nu)\) and \(c(\nu)\) for both sidebands
Compute the observed sky signal \(S_{H,obs}\) from OFF and HOT counts
Fit PWV using grid search + Newton refinement (see Atmosphere)
Compute per-channel transmission and zenith opacity
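The "grid search + Newton refinement" strategy is a generic 1-D minimization pattern. The sketch below shows that pattern only. The cost closure (for instance, a squared misfit between the observed \(S_{H,obs}\) and a model evaluated at a trial PWV) is a hypothetical stand-in for the model in atmosphere.rs, and the finite-difference derivatives are an assumption of this example.

```rust
/// Hypothetical 1-D minimizer mirroring "grid search + Newton refinement".
/// `cost` maps a trial PWV to a misfit; the real model is not shown here.
pub fn grid_newton_fit<F>(cost: F, lo: f64, hi: f64, n_grid: usize, max_iter: usize) -> f64
where
    F: Fn(f64) -> f64,
{
    assert!(n_grid >= 2 && hi > lo, "need at least two grid points on [lo, hi]");
    // 1) Coarse grid search: pick the grid point with the smallest cost.
    let step = (hi - lo) / (n_grid - 1) as f64;
    let mut x = (0..n_grid)
        .map(|i| lo + i as f64 * step)
        .min_by(|a, b| cost(*a).total_cmp(&cost(*b)))
        .unwrap();
    // 2) Newton refinement on cost'(x) = 0, with central finite differences.
    let h = 1e-4 * step;
    for _ in 0..max_iter {
        let (fm, f0, fp) = (cost(x - h), cost(x), cost(x + h));
        let d1 = (fp - fm) / (2.0 * h);
        let d2 = (fp - 2.0 * f0 + fm) / (h * h);
        if !(d2 > 0.0) {
            break; // not locally convex (or NaN): keep the current estimate
        }
        let next = (x - d1 / d2).clamp(lo, hi);
        let converged = (next - x).abs() < 1e-12 * (1.0 + x.abs());
        x = next;
        if converged {
            break;
        }
    }
    x
}
```

The grid step makes the fit robust to a poor starting guess, and the Newton step then recovers precision below the grid spacing. With a quadratic cost centred at 2.3 and a 0.5-wide grid, the grid picks 2.5 and one Newton step lands on 2.3.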
Stage 5: Data Preparation
File: cal-core/src/modes/prepare.rs
Mode-specific ON/OFF reference pairing:
TotalPower: each ON matched to nearest-time OFF (single dump)
OTF: each OTF-ON matched to nearest OFF (NaN-aware dump average)
Output is PreparedData containing reference spectra [C, R, A, S_on]
and ON subscan indices (zero-copy — ON data stays in ScanData).
See Observation Modes for mode details.
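Nearest-time matching can be done with a binary search when OFF timestamps are sorted. This is a sketch under that assumption: nearest_off is a hypothetical name, timestamps are plain f64 values in one consistent unit, and the tie-breaking rule (prefer the earlier OFF) is this example's choice rather than documented behaviour.

```rust
/// Index of the OFF whose timestamp is closest to `t_on`.
/// `off_times` must be sorted ascending and non-empty.
pub fn nearest_off(off_times: &[f64], t_on: f64) -> usize {
    assert!(!off_times.is_empty(), "need at least one OFF");
    // First index whose time is >= t_on.
    let i = off_times.partition_point(|&t| t < t_on);
    if i == 0 {
        return 0;
    }
    if i == off_times.len() {
        return i - 1;
    }
    // Choose the closer neighbour; ties go to the earlier OFF.
    if t_on - off_times[i - 1] <= off_times[i] - t_on {
        i - 1
    } else {
        i
    }
}
```

Returning an index rather than a copy of the OFF spectrum fits the zero-copy design described above, where data stays in ScanData and is referenced by position.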
Stage 6: Calibrate
File: cal-core/src/calibrate.rs
Vectorized calibration across all channels, dumps, pixels, and subscans.
The factor gamma / (H - C) / tr_s does not vary with dump or subscan, so it is
precomputed once per channel and pixel and broadcast across the dump and
subscan axes, keeping the inner loops cache-friendly.
See Calibration Equation for the full derivation.
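The broadcast can be sketched for a single pixel with flat row-major buffers. Two assumptions are made here and are not stated above: H and C are taken to be the HOT and COLD load counts, and the calibrated value is taken to be the factor times (ON - OFF). Consult Calibration Equation for the actual form. calibration_factor and calibrate_pixel are hypothetical names.

```rust
/// Factor gamma / (H - C) / tr_s for one channel and pixel, assuming H and C
/// are the HOT and COLD load counts.
pub fn calibration_factor(gamma: f64, hot: f64, cold: f64, tr_s: f64) -> f64 {
    gamma / (hot - cold) / tr_s
}

/// Apply per-channel factors to (ON - OFF) for a single pixel.
/// Row-major layouts: on/out = [C, D, S], off = [C, S], factor = [C].
/// Assumption: the calibrated value is factor * (ON - OFF).
pub fn calibrate_pixel(on: &[f64], off: &[f64], factor: &[f64], n_dump: usize, n_sub: usize) -> Vec<f64> {
    let n_chan = factor.len();
    assert_eq!(on.len(), n_chan * n_dump * n_sub);
    assert_eq!(off.len(), n_chan * n_sub);
    let mut out = vec![0.0; on.len()];
    for (c, &f) in factor.iter().enumerate() {
        // One factor per channel, reused for every dump and subscan.
        let off_c = &off[c * n_sub..(c + 1) * n_sub];
        for d in 0..n_dump {
            let row = (c * n_dump + d) * n_sub;
            // Contiguous inner loop over subscans.
            for s in 0..n_sub {
                out[row + s] = f * (on[row + s] - off_c[s]);
            }
        }
    }
    out
}
```

Hoisting the division out of the dump and subscan loops leaves only a subtraction and a multiplication per element, and the innermost loop walks contiguous memory.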
Stage 7: Write L1
File: cal-io/src/calibrated.rs
Output arrays are written to a Zarr v3 store with zstd compression:
spectra[C, D, R, A, S]: calibrated \(T_A^*\) (f64, K)
t_sys[C, R, A, S]: system temperature (f64, K)
flags[C, D, R, A, S]: u16 bitmask
tau_signal[C], tau_image[C]: zenith opacity (f64, Np)
Plus metadata, QA metrics, and OTF coordinates (see Data Formats)
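To read individual elements from these flat arrays, it helps to compute the row-major offset of a five-dimensional index and to test bits in the u16 flags. The sketch below assumes C (row-major) element order. The bit names FLAG_BAD_CHANNEL and FLAG_NAN_INPUT are hypothetical placeholders; the real bit assignments are documented in Data Formats.

```rust
/// Hypothetical flag bits; see Data Formats for the real assignments.
pub const FLAG_BAD_CHANNEL: u16 = 1 << 0;
pub const FLAG_NAN_INPUT: u16 = 1 << 1;

/// Row-major flat offset of `idx` in an array of shape `shape`,
/// e.g. [C, D, R, A, S] for `spectra` and `flags`.
pub fn flat_index(idx: [usize; 5], shape: [usize; 5]) -> usize {
    idx.iter().zip(shape.iter()).fold(0, |acc, (&i, &n)| {
        debug_assert!(i < n, "index out of bounds");
        acc * n + i
    })
}

/// True if `bit` is set in a flags word.
pub fn has_flag(word: u16, bit: u16) -> bool {
    (word & bit) != 0
}
```

In row-major order the last axis (S) varies fastest, so advancing one channel skips D × R × A × S elements.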
Error Handling
Scan-level failures are captured without aborting the pipeline:
Failed scans are collected in PipelineResult::failures
A failures.json file is written to the output directory
Exit codes: 0 = all scans OK, 1 = total failure, 2 = partial failure
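The continue-on-error policy and the exit-code mapping can be sketched as follows. run_all and exit_code are hypothetical names, and the error type is simplified to String. Writing failures.json is omitted to keep the example dependency-free.

```rust
/// Hypothetical mirror of the failure bookkeeping: run each scan, keep going
/// on error, and map the outcome to an exit code.
pub fn run_all<F>(scans: &[u32], mut process: F) -> (Vec<(u32, String)>, i32)
where
    F: FnMut(u32) -> Result<(), String>,
{
    let mut failures = Vec::new();
    for &n in scans {
        if let Err(e) = process(n) {
            failures.push((n, e)); // record and continue with the next scan
        }
    }
    let code = exit_code(scans.len(), failures.len());
    (failures, code)
}

/// 0 = all scans OK, 1 = total failure, 2 = partial failure.
pub fn exit_code(n_scans: usize, n_failed: usize) -> i32 {
    if n_failed == 0 {
        0
    } else if n_failed == n_scans {
        1
    } else {
        2
    }
}
```

A distinct code for partial failure lets a calling script tell "rerun a few scans" apart from "nothing worked".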
Timing
Each scan logs a millisecond breakdown:
scan timing breakdown scan=39088
load_ms=753 resolve_ms=244 atm_ms=762
prepare_ms=2399 calibrate_ms=36876 write_ms=1175
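A small parser makes it easy to aggregate these breakdowns across scans. This sketch assumes each timing field has the form name_ms=integer, as in the example line above. parse_timings is a hypothetical helper and does not parse any other log format.

```rust
/// Extract (field, milliseconds) pairs for every `*_ms=<int>` field.
pub fn parse_timings(line: &str) -> Vec<(String, u64)> {
    line.split_whitespace()
        .filter_map(|field| field.split_once('='))
        .filter(|(k, _)| k.ends_with("_ms")) // skips e.g. scan=39088
        .filter_map(|(k, v)| v.parse::<u64>().ok().map(|ms| (k.to_string(), ms)))
        .collect()
}
```

For the example line, the six stages sum to 42209 ms, and calibrate_ms (36876 ms) accounts for roughly 87% of the total. This makes Stage 6 the obvious target for optimization.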