Pipeline

This page describes the end-to-end data flow through the calibration engine, from opening the L0 store to writing L1 output.

Pipeline Overview

        graph TD
  S1[1. Open Store<br/>raw.rs] -->|ScanData| SG[2. Gain Correction<br/>gain.rs]
  SG --> S2[3. Resolve Loads<br/>resolve.rs]
  S2 -->|CalibrationLoadFull| S4[4. Prepare<br/>prepare.rs]
  S4 -->|PreparedData| S3[5. Atmosphere<br/>atmosphere.rs]
  S3 -->|tr_s, tr_i, PWV| S5[6. Calibrate<br/>calibrate.rs]
  S5 -->|CalibrationResult| S5b[7. DBS Merge<br/>dbs.rs]
  S5b --> S6[8. Write L1<br/>calibrated.rs]

  style S1 fill:#e3f2fd
  style S5 fill:#e8f5e9
  style S6 fill:#fff3e0
    

Note: The prepare step runs before atmosphere determination so that per-subscan OFF indices are available for per-pair atmosphere fitting.

Stage 1: Open Store and List Scans

File: cal-io/src/raw.rs

The pipeline opens the L0 Zarr store once (the handle is shared via Arc) and discovers all scan groups. Calibration-only scans (no source data) are filtered out of the processing list; they serve as calibration references for adjacent scans.

open_store(path)          → ReadableWritableListableStorage
list_scans(path)          → Vec<u32>
scan_has_source(path, n)  → bool
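
The filtering step can be sketched as follows. This is a minimal illustration, not the code in raw.rs: the helper name `scans_to_calibrate` is hypothetical, and the `has_source` closure stands in for `scan_has_source(path, n)`, whose exact signature and error handling are not shown on this page.

```rust
/// Keep only the scans that contain source data.
/// `has_source` is a stand-in (assumption) for `scan_has_source(path, n)`.
fn scans_to_calibrate<F>(all_scans: &[u32], has_source: F) -> Vec<u32>
where
    F: Fn(u32) -> bool,
{
    all_scans
        .iter()
        .copied()
        .filter(|&n| has_source(n))
        .collect()
}
```

Calling it with the output of `list_scans` and a closure that reports calibration-only scans as `false` yields the list of scans the main loop would process.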

Stage 2: Prefetch I/O

File: cal-io/src/pipeline/mod.rs

A dedicated prefetch thread loads the next scan while the main thread processes the current one, overlapping I/O with compute:

        sequenceDiagram
  participant P as Prefetch Thread
  participant M as Main Thread
  participant D as Disk

  P->>D: load scan N
  D-->>P: ScanData N
  P->>M: send(ScanData N)
  P->>D: load scan N+1
  M->>M: calibrate scan N
  D-->>P: ScanData N+1
  M->>M: write L1 scan N
  P->>M: send(ScanData N+1)
  M->>M: calibrate scan N+1
    

Memory is bounded to two scans by using sync_channel(1).
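
A minimal sketch of this producer/consumer pattern using only the standard library is shown below. `ScanData` here is a simplified stand-in, and `load_scan` and `run_prefetched` are hypothetical names; the real loader reads from the L0 Zarr store and the real consumer runs the calibration stages.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Simplified stand-in (assumption) for the real `ScanData`.
struct ScanData {
    scan: u32,
    counts: Vec<f64>,
}

/// Hypothetical loader; the real one reads from disk.
fn load_scan(scan: u32) -> ScanData {
    ScanData { scan, counts: vec![scan as f64; 4] }
}

/// Process scans while a prefetch thread loads ahead.
/// Returns the scan numbers in the order they were processed.
fn run_prefetched(scans: Vec<u32>) -> Vec<u32> {
    // Capacity 1: `send` blocks once one scan is already queued,
    // so the producer cannot run arbitrarily far ahead.
    let (tx, rx) = sync_channel::<ScanData>(1);

    let producer = thread::spawn(move || {
        for n in scans {
            // Stop early if the consumer has gone away.
            if tx.send(load_scan(n)).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the consumer loop below.
    });

    let mut done = Vec::new();
    for data in rx {
        // Placeholder for "calibrate + write L1".
        let _sum: f64 = data.counts.iter().sum();
        done.push(data.scan);
    }
    producer.join().expect("prefetch thread panicked");
    done
}
```

The bounded channel provides the backpressure: the loader stalls in `send` until the main thread has taken the queued scan, and scans arrive in the order they were loaded.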

Stage 2b: Gain Interpolation (optional)

File: cal-core/src/gain.rs

When --gain-interpolate is 2 or 3, raw counts are corrected for receiver gain drift between two bracketing HOT/COLD calibration scans. The prefetch thread loads the next calibration snapshot alongside the current one. This stage modifies ScanData in place before resolve.
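
This page does not spell out what modes 2 and 3 compute, so the following is only a sketch of one plausible drift correction: a per-channel gain (assumed here to be proportional to HOT minus COLD counts) is interpolated linearly in time between the two bracketing calibrations, and raw counts are rescaled in place to the gain of the first calibration. The function names and the clamping behaviour are assumptions, not the gain.rs API.

```rust
/// Linearly interpolate a per-channel gain between two calibration
/// snapshots taken at times `t0` and `t1`, evaluated at time `t`.
/// Times outside [t0, t1] are clamped (an assumption of this sketch).
fn interpolate_gain(g0: &[f64], g1: &[f64], t0: f64, t1: f64, t: f64) -> Vec<f64> {
    assert_eq!(g0.len(), g1.len());
    let w = if t1 > t0 {
        ((t - t0) / (t1 - t0)).clamp(0.0, 1.0)
    } else {
        0.0
    };
    g0.iter().zip(g1).map(|(a, b)| a + w * (b - a)).collect()
}

/// Rescale raw counts in place so they refer to the reference gain `g_ref`.
/// Channels with zero current gain are left untouched.
fn correct_counts_in_place(counts: &mut [f64], g_ref: &[f64], g_now: &[f64]) {
    for ((c, &reference), &gain) in counts.iter_mut().zip(g_ref).zip(g_now) {
        if gain != 0.0 {
            *c *= reference / gain;
        }
    }
}
```

For a dump taken halfway between the two calibrations, `interpolate_gain` returns the mean of the two gains, and `correct_counts_in_place` scales the counts by the ratio of the reference gain to that mean.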

Stage 3: Resolve Calibration Loads

File: cal-io/src/resolve.rs

Raw L0 data is resolved into physics-ready structures:

  1. Frequency grid – compute per-channel signal and image frequencies:

    $$ \nu_{sig}(c) = \nu_0^{sig} + (c - c_{ref}) \cdot \Delta\nu $$

    $$ \nu_{img}(c) = \nu_0^{img} - (c - c_{ref}) \cdot \Delta\nu $$

  2. HOT/COLD extraction – identify calibration subscans by sobsmode (using CalibrationSnapshot::find_hot_cold()), average over dumps (NaN-aware for padded data)

  3. CalibrationLoadFull – compute gamma, Trec, bad channels for all pixels [C, R, A] in one pass (see Calibration Loads). Constructors take &CalibrationParams.
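
The frequency grid in step 1 translates directly into code. The sketch below is illustrative only; the function name and argument order are assumptions and do not reflect the resolve.rs API.

```rust
/// Per-channel signal and image sideband frequencies.
/// Returns `(nu_sig, nu_img)`, each of length `n_chan`.
fn frequency_grid(
    n_chan: usize,
    c_ref: f64,
    nu0_sig: f64,
    nu0_img: f64,
    dnu: f64,
) -> (Vec<f64>, Vec<f64>) {
    // Offset of channel `c` from the reference channel, in frequency units.
    let offset = |c: usize| (c as f64 - c_ref) * dnu;
    // The signal sideband increases with channel number...
    let sig: Vec<f64> = (0..n_chan).map(|c| nu0_sig + offset(c)).collect();
    // ...while the image sideband is mirrored and decreases.
    let img: Vec<f64> = (0..n_chan).map(|c| nu0_img - offset(c)).collect();
    (sig, img)
}
```

Because the two sidebands move in opposite directions, the sum of signal and image frequency is the same for every channel, which makes a convenient sanity check.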

Stage 4: Data Preparation

File: cal-core/src/modes/prepare.rs

A single prepare() function auto-detects the mode and dispatches to mode-specific preparation:

  • TotalPower: each ON matched to bracketing OFFs (linear interpolation by MJD)

  • OTF TotalPower: each OTF-ON matched to bracketing OFFs (NaN-aware dump average)

  • OTF Chopped: beam-switched ON/OFF pairing (per-dump subtraction)
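
For the TotalPower case in the list above, linear interpolation by MJD between two bracketing OFFs can be sketched like this. The function name is hypothetical, and clamping an ON that falls outside its bracketing OFFs is an assumption of the sketch rather than documented behaviour.

```rust
/// Linearly interpolate two bracketing OFF spectra to the time of an ON.
fn interpolate_off(
    off_before: &[f64],
    mjd_before: f64,
    off_after: &[f64],
    mjd_after: f64,
    mjd_on: f64,
) -> Vec<f64> {
    assert_eq!(off_before.len(), off_after.len());
    let span = mjd_after - mjd_before;
    // Weight of the later OFF: 0 at `mjd_before`, 1 at `mjd_after`.
    let w = if span > 0.0 {
        ((mjd_on - mjd_before) / span).clamp(0.0, 1.0)
    } else {
        0.0
    };
    off_before
        .iter()
        .zip(off_after)
        .map(|(a, b)| (1.0 - w) * a + w * b)
        .collect()
}
```

An ON observed a quarter of the way between the two OFFs receives a reference made of 75% of the earlier OFF and 25% of the later one.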

Output is PreparedData containing reference spectra [C, R, A, S_on], ON subscan indices (zero-copy – ON data stays in ScanData), and off_index_per_on for per-subscan atmosphere fitting.

This step runs before atmosphere determination so that the OFF pairing information is available for per-pair atmosphere fitting.

See Observation Modes for mode details.

Stage 5: Atmosphere Determination

File: cal-core/src/atmosphere.rs

Atmospheric transmission is determined from the data itself:

  1. Extract ATM coefficients $b(\nu)$, $c(\nu)$ for both sidebands

  2. Compute observed sky signal $S_{H,obs}$ from OFF and HOT counts

  3. Fit PWV using grid search + Newton refinement with frequency-adaptive sigma weighting (see Atmosphere)

  4. Compute per-channel transmission and zenith opacity
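
The fitting strategy in step 3 (coarse grid search, then Newton refinement) can be illustrated with a toy model. Everything specific below is an assumption made for the sketch: the opacity is taken as tau = b * pwv + c per channel, the sky signal as 1 - exp(-tau), the grid spans 0 to 10 in steps of 0.25, and the derivatives are central finite differences. The real model and weighting are described on the Atmosphere page.

```rust
/// Toy sky model (assumption): emission fraction 1 - exp(-tau),
/// with tau = b * pwv + c for one channel.
fn model(b: f64, c: f64, pwv: f64) -> f64 {
    1.0 - (-(b * pwv + c)).exp()
}

/// Sigma-weighted sum of squared residuals for a trial PWV.
fn chi2(obs: &[f64], b: &[f64], c: &[f64], sigma: &[f64], pwv: f64) -> f64 {
    obs.iter()
        .zip(b)
        .zip(c)
        .zip(sigma)
        .map(|(((&o, &bi), &ci), &s)| ((o - model(bi, ci, pwv)) / s).powi(2))
        .sum()
}

/// Coarse grid search followed by a few 1-D Newton steps on chi2.
fn fit_pwv(obs: &[f64], b: &[f64], c: &[f64], sigma: &[f64]) -> f64 {
    // 1) Grid search: pick the grid point with the smallest chi2.
    let mut best = 0.0;
    let mut best_chi2 = f64::INFINITY;
    for i in 0..=40 {
        let pwv = 0.25 * i as f64;
        let v = chi2(obs, b, c, sigma, pwv);
        if v < best_chi2 {
            best_chi2 = v;
            best = pwv;
        }
    }
    // 2) Newton refinement with central finite-difference derivatives.
    let h = 1e-4;
    let mut pwv = best;
    for _ in 0..10 {
        let f0 = chi2(obs, b, c, sigma, pwv);
        let fp = chi2(obs, b, c, sigma, pwv + h);
        let fm = chi2(obs, b, c, sigma, pwv - h);
        let d1 = (fp - fm) / (2.0 * h);
        let d2 = (fp - 2.0 * f0 + fm) / (h * h);
        if d2 <= 0.0 {
            break; // not locally convex; keep the current estimate
        }
        let step = d1 / d2;
        pwv -= step;
        if step.abs() < 1e-9 {
            break;
        }
    }
    pwv
}
```

The grid search protects against starting Newton's method far from the minimum, where the quadratic approximation can be poor; the per-channel `sigma` plays the role of the frequency-dependent weighting.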

Three atmosphere strategies are available:

  • determine_atmosphere_full – single atmosphere for entire scan

  • determine_atmosphere_per_pixel – independent fit per pixel (when --pwv-per-pixel is set)

  • determine_atmosphere_per_pair – per ON-OFF pair atmosphere

Stage 6: Calibrate

File: cal-core/src/calibrate.rs

calibrate_full() is the single calibration function. It performs vectorized calibration across all channels, dumps, pixels, and subscans:

$$ T_A^*(\nu, d, r, a, s) = \frac{\big(C_{ON}(\nu,d,r,a,s) - C_{REF}(\nu,r,a,s)\big) \cdot \gamma(\nu,r,a)} {\big(C_{hot}(\nu,r,a) - C_{cold}(\nu,r,a)\big) \cdot t_r^{sig}(\nu)} $$

A precomputed factor gamma / (H - C) / tr_s is broadcast across the dump and subscan axes for cache-friendly inner loops.
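
The idea of hoisting the per-channel factor out of the inner loop can be sketched for a single pixel and subscan. The data layout (`on` as dump-major nested vectors) and the function name are assumptions chosen for readability; calibrate_full() operates on the full multi-dimensional arrays.

```rust
/// Calibrate one pixel of one subscan.
/// `on` is laid out as [dump][channel]; all other inputs are per channel.
fn calibrate_pixel(
    on: &[Vec<f64>],
    reference: &[f64],
    gamma: &[f64],
    hot: &[f64],
    cold: &[f64],
    tr_sig: &[f64],
) -> Vec<Vec<f64>> {
    // Precompute gamma / (H - C) / tr_s once per channel.
    let factor: Vec<f64> = gamma
        .iter()
        .zip(hot)
        .zip(cold)
        .zip(tr_sig)
        .map(|(((&g, &h), &c), &t)| g / (h - c) / t)
        .collect();

    // Reuse the same factor for every dump: (C_ON - C_REF) * factor.
    on.iter()
        .map(|dump| {
            dump.iter()
                .zip(reference)
                .zip(&factor)
                .map(|((&c_on, &c_ref), &f)| (c_on - c_ref) * f)
                .collect::<Vec<f64>>()
        })
        .collect()
}
```

The divisions happen once per channel instead of once per sample, so the inner loop reduces to a subtraction and a multiplication over contiguous data.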

T_sys is output in the DSB convention.

See Calibration Equation for the full derivation.

Stage 7: DBS Merge (optional)

File: cal-io/src/pipeline/dbs.rs

For OTF-chopped mode with --dbs-coupling enabled, beam A and beam B calibration results are merged: output_A = 0.5 * (cal_A + cal_B), and B positions are removed.
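
The merge arithmetic itself is a sample-by-sample average, sketched below on flat slices. How beam A and beam B samples are paired, and how the B positions are dropped afterwards, is not described on this page; the sketch assumes the two inputs are already aligned and have the same shape.

```rust
/// Average aligned beam A and beam B calibrated samples:
/// output_A = 0.5 * (cal_A + cal_B).
fn dbs_merge(cal_a: &[f64], cal_b: &[f64]) -> Vec<f64> {
    assert_eq!(cal_a.len(), cal_b.len(), "beams must have matching shapes");
    cal_a
        .iter()
        .zip(cal_b)
        .map(|(a, b)| 0.5 * (a + b))
        .collect()
}
```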

Stage 8: Write L1

File: cal-io/src/calibrated.rs

Output arrays are written to a Zarr v3 store with zstd compression:

  • spectra[C, D, R, A, S] – calibrated $T_A^*$ (f64, K)

  • t_sys[C, R, A, S] – system temperature (f64, K)

  • flags[C, D, R, A, S] – u16 bitmask

  • tau_signal[C], tau_image[C] – zenith opacity (f64, Np)

  • Plus metadata, QA metrics, OTF coordinates, and provenance (see Data Formats)

Provenance metadata (L2-to-L0 traceability) is written to scan-level attributes when a ProvenanceContext is provided.

Optional Diagnostics

Two diagnostics hook into the per-scan flow (see Sky Diagnostics (SKYDIFF / SKYCHOPDIFF)):

  • SKYDIFF (--skydiff <N>) records each OFF’s calibrated sky-hot spectrum after Stage 3 and diffs it against the cross-scan history. It forces serial scan processing to keep the history ordered.

  • SKYCHOPDIFF (--skychopdiff) runs after Stage 5: its calibration factor divides by the per-channel transmission of each group’s tagged ON, which only exists once the atmosphere is determined.

Error Handling

Scan-level failures are captured without aborting the pipeline:

  • Failed scans are collected in PipelineResult::failures

  • A failures.json file is written to the output directory

  • Exit codes: 0 = all scans OK, 1 = total failure, 2 = partial failure
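
The exit-code rule can be written as a small pure function. The function name is hypothetical, and treating an empty run (no scans, no failures) as success is an assumption of this sketch.

```rust
/// Map scan counts to the documented exit codes:
/// 0 = all OK, 1 = total failure, 2 = partial failure.
fn exit_code(total_scans: usize, failed_scans: usize) -> i32 {
    if failed_scans == 0 {
        0 // nothing failed (also covers an empty run, by assumption)
    } else if failed_scans >= total_scans {
        1 // every scan failed
    } else {
        2 // some scans succeeded, some failed
    }
}
```

A caller would pass the number of processed scans and the length of `PipelineResult::failures`, then hand the result to `std::process::exit`.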

Timing

Each scan logs a per-stage timing breakdown in milliseconds:

scan timing breakdown scan=39088
  load_ms=753 resolve_ms=244 atm_ms=762
  prepare_ms=2399 calibrate_ms=36876 write_ms=1175