Pipeline Architecture#

Documentation verified. Last checked: 2025-10-17. Reviewer: Christof Buchbender

The Data Transfer System implements a seven-stage pipeline (Stages 1-7, fed by instrument filing in Stage 0) that moves data from the telescope to the long-term archive. Each stage is independent but coordinates with the others through database state and Celery tasks.

Pipeline Overview#

Stage 0: Instrument Filing
─────────────────────────────
Instruments create RawDataFile records via ops-db-api
Files physically written to SOURCE locations

↓

Stage 1: Raw Data Package Creation
───────────────────────────────────
Group related files into RawDataPackages
Create tar archives at SOURCE locations
Move to BUFFER locations

↓

Stage 2: Data Transfer Package Assembly
────────────────────────────────────────
Aggregate RawDataPackages into DataTransferPackages
Optimize for network transfer efficiency
Create transfer archives at BUFFER locations

↓

Stage 3: Inter-Site Transfer
─────────────────────────────
Transfer DataTransferPackages between sites
Uses BBCP for high-performance parallel transfer
Round-robin distribution to LTA sites

↓

Stage 4: Unpacking and Verification
────────────────────────────────────
Extract archives at destination BUFFERs
Verify checksums of all extracted files
Create PhysicalCopy records

↓

Stage 5: Long-Term Archive
──────────────────────────
Move RawDataPackages from BUFFER to LTA locations
Generate metadata sidecar files
Mark packages as ARCHIVED

↓

Stage 6: Staging (On-Demand)
─────────────────────────────
Scientists request data for processing
System stages from LTA to PROCESSING locations
Data cleaned up after processing completes

↓

Stage 7: Deletion and Cleanup
──────────────────────────────
Remove temporary files from SOURCE and BUFFER
Delete completed DataTransferPackages
Apply retention policies to PROCESSING locations

Stage Details#

Stage 0: Instrument Filing#

Purpose: Data enters the system

Actors: Instruments (Prime-Cam, CHAI), instrument control software

Process:

  1. Instrument completes observation, writes files to disk

  2. Instrument software calls Operations Database API (ops-db-api) to create RawDataFile records

  3. Files linked to ExecutedObsUnit (observation) and InstrumentModule

  4. Files remain at SOURCE location (instrument computer)

Database Changes:

API: Operations Database API (ops-db-api) endpoints for filing observations

Notes:

  • This stage is outside the data-transfer system

  • The data transfer system discovers these files in Stage 1: Raw Data Package Creation

  • Files may arrive continuously or in bursts

  • Instruments must file metadata correctly, since later stages rely on these records

Stage 1: Raw Data Package Creation#

Purpose: Group and package raw files for transfer

Manager: raw_data_package_manager

Key Function: raw_data_package_manager_service()

Process:

  1. Discovery: Manager queries for RawDataFile not yet in a RawDataPackage

    SELECT * FROM raw_data_file
    WHERE raw_data_package_id IS NULL
    AND data_location IN (SOURCE locations)
    
  2. Grouping:

  3. Record Creation:

    • Manager creates RawDataPackage record in database

    • Links all files to new package

    • Determines destination buffer for package

  4. Task Submission:

  5. Worker Execution: Worker running at SOURCE location:

    • Creates tar archive of all files in RawDataPackage

    • Preserves directory structure

    • Calculates xxHash64 checksum

    • Moves tar to BUFFER location (local or remote)

    • Updates the RawDataPackage record with checksum and status (see the sketch below)
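
The packaging step can be sketched as follows. This is only an illustration of the tar-plus-xxHash64 pattern described above, assuming a local buffer; the helper name and signature are assumptions, not the actual worker task.

    # Illustrative sketch only: tar the package contents, checksum the archive
    # with xxHash64, and move it into the buffer. All names are assumptions.
    import shutil
    import tarfile
    from pathlib import Path

    import xxhash

    def package_and_checksum(files, source_root: Path, archive_path: Path,
                             buffer_dir: Path) -> str:
        # Create the tar archive, preserving paths relative to the source root.
        with tarfile.open(archive_path, "w:gz") as tar:
            for f in files:
                tar.add(f, arcname=str(f.relative_to(source_root)))

        # xxHash64 is fast enough to checksum multi-gigabyte archives.
        digest = xxhash.xxh64()
        with open(archive_path, "rb") as fh:
            for chunk in iter(lambda: fh.read(8 * 1024 * 1024), b""):
                digest.update(chunk)

        # Move the finished archive into the buffer location (local case).
        shutil.move(str(archive_path), str(buffer_dir / archive_path.name))
        return digest.hexdigest()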

Database Changes:

Key Implementation Details:

  • Archive format: TAR (optionally compressed)

  • Checksum algorithm: xxHash64 (fast for large files)

  • Transfer method: Local copy or rsync to buffer

  • Hierarchical path structure preserved in archive

Configuration:

  • RAW_DATA_PACKAGE_MANAGER_SLEEP_TIME: How often manager runs

  • DEVELOPMENT_MODE_LOCALHOST_ONLY: Whether to treat localhost as the same host when running remote commands

  • VERBOSE: Whether to enable verbose mode

  • Buffer selection: Primary buffer per site (by priority) and active flag

Error Handling:

  • Archive creation failure: Retry with exponential backoff

  • Worker crash: Detected by heartbeat system, task resubmitted

Stage 2: Data Transfer Package Assembly#

Purpose: Bundle raw data packages and establish transfer routes

Manager: ccat_data_transfer.data_transfer_package_manager

Key Function: create_data_transfer_packages()

Process:

  1. Discovery: Manager queries for RawDataPackage in BUFFER locations not yet in a DataTransferPackage using get_unpackaged_raw_data_packages_in_buffers()

  2. Size-Based Grouping: Aggregate packages using group_packages_for_transfer() (sketched after this list):

    • Target size: MAXIMUM_DATA_TRANSFER_PACKAGE_SIZE_GB

    • Packages ≥90% of max size bundled separately

    • Packages grouped until reaching 90-110% of max size

    • Sorted largest-first for optimal packing

  3. Package Record Creation:

  4. Worker Execution: Celery task create_data_transfer_package_task() assembles physical archive:

  5. Route Determination & Transfer Seeding: After package completion, manager creates DataTransfer records for next stage:

    Primary Routes (SOURCE site → LTA site):

    Secondary Routes (LTA site → LTA site):

    • Discovered via discover_secondary_routes()

    • Ensures all LTA sites eventually receive all data

    • Created only after primary transfer completes

    • Also uses round-robin distribution
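
The size-based grouping in step 2 can be approximated with a short sketch. This is not the real group_packages_for_transfer(); the tuple format and the handling of leftover packages are assumptions.

    # Simplified illustration of the grouping rules described in step 2.
    # packages: list of (package_id, size_gb) tuples.
    def group_packages_for_transfer(packages, max_size_gb=100.0):
        # Packages at or above 90% of the limit are bundled on their own.
        large = [[p] for p in packages if p[1] >= 0.9 * max_size_gb]
        small = sorted((p for p in packages if p[1] < 0.9 * max_size_gb),
                       key=lambda p: p[1], reverse=True)  # largest first

        groups, current, current_size = [], [], 0.0
        for pkg in small:
            current.append(pkg)
            current_size += pkg[1]
            # Close the group once it reaches the 90-110% target window.
            if current_size >= 0.9 * max_size_gb:
                groups.append(current)
                current, current_size = [], 0.0
        if current:  # leftovers (assumption: they still form a final group)
            groups.append(current)
        return large + groups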

Database Changes:

Key Implementation Details:

  • Physical tar.gz archives created in BUFFER by workers routed via route_task_by_location()

  • Multiple RawDataPackage archives bundled into single transfer archive

  • Routes computed dynamically from active site topology

  • Round-robin ensures balanced distribution across LTA sites

Configuration:

  • MAXIMUM_DATA_TRANSFER_PACKAGE_SIZE_GB: Size threshold for bundling (default: 100 GB)

  • PACKAGE_MANAGER_SLEEP_TIME: Manager polling interval

  • Round-robin Redis key: round_robin:source:{site_short_name}
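
A minimal sketch of round-robin destination selection on top of the Redis key listed above; only the key layout comes from this documentation, the selection code itself is an assumption.

    import redis

    def pick_lta_site(r: redis.Redis, source_site: str, lta_sites: list) -> str:
        # INCR is atomic, so concurrent managers still rotate fairly.
        counter = r.incr(f"round_robin:source:{source_site}")
        return lta_sites[counter % len(lta_sites)]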

Error Handling:

  • Retry logic: DataTransferPackageOperations (max 3 retries)

  • Failed packages: Status set to FAILED, constituent packages marked FAILED

  • Database errors: Transaction rollback, logged via structured logger

Stage 3: Inter-Site Transfer#

Purpose: Transfer assembled packages between sites (SOURCE → LTA, LTA → LTA)

Manager: ccat_data_transfer.transfer_manager

Key Function: transfer_transfer_packages()

Worker Task: transfer_files_bbcp()

Process:

  1. Discovery: Manager queries for DataTransfer records with status PENDING using _get_pending_transfers()

    • These records were created in Stage 2 by the data transfer package manager

    • Each represents a route from origin to destination location

  2. Buffer State Check: Before scheduling transfers, consult BufferManager:

    • can_create_data(): If buffer in emergency state, postpone transfer

    • get_max_parallel_transfers(): Adjust concurrency based on buffer health

    • Prevents overwhelming destination storage during buffer pressure

  3. Transfer Method Filtering: Filter for supported methods using _filter_supported_transfers():

    • Currently supports: bbcp (Bulk Block Copy Protocol)

    • Unsupported methods logged as errors

    • Configuration: SUPPORTED_DATA_TRANSFER_METHODS

  4. Task Scheduling: For each supported transfer:

  5. Worker Execution: Celery task performs actual transfer:

    1. URL Construction: Build source/destination URLs via _construct_transfer_urls():

    2. Destination Directory Creation: Ensure target directory exists:

    3. Transfer Execution: Execute bbcp command via _execute_bbcp_command():

    4. Metrics Collection: Track transfer performance:

      • Bytes transferred, duration, transfer rates (peak/average)

      • Number of streams, network errors

      • Sent to InfluxDB via HousekeepingMetrics

    5. Status Update: On success:
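
As a rough illustration of the worker steps above, the transfer boils down to building host:path URLs and invoking bbcp as a subprocess. The -s option selects the number of parallel streams; the full option set assembled by _execute_bbcp_command() is not reproduced here and may differ.

    # Illustrative only: not the actual _execute_bbcp_command() implementation.
    import subprocess

    def run_bbcp(src_host: str, src_path: str, dst_host: str, dst_path: str,
                 streams: int = 4) -> None:
        source_url = f"{src_host}:{src_path}"
        destination_url = f"{dst_host}:{dst_path}"
        cmd = ["bbcp", "-s", str(streams), source_url, destination_url]
        # A non-zero exit raises CalledProcessError, which the Celery task
        # would turn into a retry.
        subprocess.run(cmd, check=True, capture_output=True, text=True)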

Database Changes:

Error Handling & Retry Logic:

Key Implementation Details:

  • Polymorphic storage handling: Disk, S3, and Tape locations supported

  • Dynamic queue routing ensures workers execute on correct hosts

  • Buffer-aware scheduling prevents storage overflow

  • Metrics tracked for performance monitoring and troubleshooting

  • bbcp output logged to BBCPLog table

  • Redis pub/sub notifications: transfer:overview channel for UI updates
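
The Redis pub/sub notifications follow a plain publish pattern; the channel name comes from this documentation, while the payload shape below is an assumption.

    import json
    import redis

    def notify_transfer_event(r: redis.Redis, event: str, transfer_id: int) -> None:
        # Monitoring dashboards subscribe to transfer:overview and render events live.
        r.publish("transfer:overview",
                  json.dumps({"event": event, "transfer_id": transfer_id}))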

Configuration:

  • SUPPORTED_DATA_TRANSFER_METHODS: Enabled transfer protocols (default: ["bbcp"])

  • DATA_TRANSFER_WORKERS: Maximum parallel transfers

  • DEVELOPMENT_MODE: If True, uses cp instead of bbcp for local transfers

  • TRANSFER_MANAGER_SLEEP_TIME: Manager polling interval

  • BBCP settings: Configurable via BBCPSettings

Monitoring & Observability:

  • Transfer metrics sent to InfluxDB with tags:

    • source_location, destination_location, transfer_id

    • transfer_method, bbcp configuration parameters

    • Performance: peak_transfer_rate_mbps, average_transfer_rate_mbps

    • Reliability: network_errors, retry_count

  • Structured logging via get_structured_logger()

  • Real-time notifications via Redis pub/sub for monitoring dashboards

Stage 4: Unpacking and Verification#

Purpose: Extract DataTransferPackage archives and verify constituent RawDataPackage integrity

Manager: ccat_data_transfer.data_integrity_manager

Key Function: unpack_and_verify_files()

Worker Task: unpack_data_transfer_package()

Process:

  1. Discovery: Manager queries for DataTransfer records with completed transfers needing unpacking using _get_pending_unpackpings():

    transfers = session.query(DataTransfer).filter(
        DataTransfer.status == Status.COMPLETED,
        DataTransfer.unpack_status == Status.PENDING
    ).all()
    
    • Transfer must be completed (archive successfully transferred)

    • Unpacking not yet attempted or previously failed and reset for retry

  2. Task Scheduling: For each pending unpack:

  3. Worker Execution: Task performs unpacking and verification:

    1. Path Resolution: Determine archive location via _get_paths():

      • Supports DiskDataLocation (currently)

      • Archive path: destination_location.path / data_transfer_package.relative_path

      • Extraction destination: destination_location.path (base directory)

    2. Directory Creation: Ensure extraction destination exists via create_local_folder()

    3. Archive Extraction: Unpack using unpack_local():

      • Extracts all constituent RawDataPackage tar.gz files

      • Preserves directory structure from relative_path

      • Single-level unpacking: DataTransferPackage → RawDataPackages (not extracted further)

    4. Checksum Verification: Verify each extracted RawDataPackage via _verify_checksums() (see the sketch after this list):

    5. Physical Copy Creation: On successful verification via _update_data_transfer_status():

  4. Corruption Handling: If archive corruption detected via ArchiveCorruptionError, execute cleanup via _cleanup_corrupted_transfer():

    1. Delete Corrupted Archive at Destination:

      • Find DataTransferPackagePhysicalCopy at destination

      • Mark deletion_status as SCHEDULED

      • Dispatch delete_physical_copy() task

      • Wait for deletion completion (5 minute timeout)

    2. Conditional Source Cleanup (Secondary Transfers Only):

      • Detect secondary transfer: Check if both origin and destination are LTA sites

      • If secondary transfer:

        • Delete source DataTransferPackage archive (already replicated elsewhere)

        • Schedule deletion of source physical copy

      • If primary transfer: Skip source cleanup (source is original buffer)

    3. Deregister RawDataPackages:

    4. Delete DataTransferPackage Record (Conditional):

      • Only if both destination and source (if applicable) deletions successful

      • Otherwise keep record for retry by automatic cleanup

    5. Re-raise Exception: Trigger task retry system after cleanup
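
The checksum verification in step 3.4 amounts to recomputing the xxHash64 of each extracted RawDataPackage archive and comparing it with the stored value. The sketch below is illustrative only, not the real _verify_checksums().

    import xxhash
    from pathlib import Path

    def verify_package_checksum(archive: Path, expected_xxh64: str) -> None:
        digest = xxhash.xxh64()
        with open(archive, "rb") as fh:
            for chunk in iter(lambda: fh.read(8 * 1024 * 1024), b""):
                digest.update(chunk)
        if digest.hexdigest() != expected_xxh64:
            # The real task raises ArchiveCorruptionError here, which triggers
            # the corruption cleanup described in step 4.
            raise ValueError(f"checksum mismatch for {archive}")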

Database Changes:

Error Handling & Retry Logic:

Key Implementation Details:

  • Single-level unpacking: DataTransferPackage (tar) contains RawDataPackages (tar.gz files), which remain compressed

  • Checksum verification at package level: Verifies RawDataPackage archives, not individual raw data files

  • Smart corruption recovery: Distinguishes primary vs secondary transfers for appropriate cleanup

  • Synchronous deletion: Waits for corruption cleanup to complete before proceeding

  • Redis pub/sub notifications: transfer:overview channel with events:

    • unpack_scheduled, unpack_completed, unpack_failed

    • corrupted_transfer_cleanup

Configuration:

  • DATA_INTEGRITY_MANAGER_SLEEP_TIME: Manager polling interval

  • Corruption cleanup timeout: 300 seconds (5 minutes) per deletion task

Monitoring & Observability:

Stage 5: Long-Term Archive#

Purpose: Transfer RawDataPackage from BUFFER to permanent LTA storage

Manager: ccat_data_transfer.archive_manager

Key Function: transfer_raw_data_packages_to_long_term_archive()

Worker Task: send_data_to_long_term_archive()

Process:

  1. Discovery: Manager queries for RawDataPackage in BUFFER locations needing archival via _get_pending_new_transfers_to_lta_location():

    # For each LTA location at each site
    pending_packages = session.query(RawDataPackage)\
        .join(RawDataPackagePhysicalCopy)\
        .filter(
            RawDataPackagePhysicalCopy.data_location_id.in_(buffer_location_ids),
            RawDataPackagePhysicalCopy.status == PhysicalCopyStatus.PRESENT,
            ~RawDataPackage.id.in_(existing_lta_transfer_ids)
        ).distinct().all()
    
    • Packages must be physically present in a BUFFER at the LTA site

    • No existing LongTermArchiveTransfer to this LTA location

    • Site-specific: Each site’s buffer can archive to its own LTA locations

  2. Transfer Record Creation: For each pending package via _create_long_term_archive_transfer():

  3. Task Scheduling: Via _schedule_transfer_task():

    • Status: PENDING → SCHEDULED

    • Queue routing via route_task_by_location() based on destination LTA location

    • Ensures worker executes on host with access to both buffer and LTA

    • Record start_time

  4. Worker Execution: Task performs storage-type-specific transfer via _execute_transfer():

    1. URL Construction: Build source and destination paths via _construct_transfer_urls():

    2. Transfer Execution: Based on source → destination storage type combination:

      Disk → Disk: Via _execute_disk_to_disk_transfer()

      • Simple cp command

      • Create destination directory if needed

      • Preserves tar.gz archive format

      Disk → S3: Two implementation options:

      • boto3 (default): Via _execute_s3_upload() (sketched after this list)

        • Reads entire file into memory

        • Calculates SHA256 checksum

        • Uploads with ChecksumSHA256 for integrity verification

        • Includes comprehensive S3 object metadata (see below)

        • Uploads separate extended metadata JSON file

      • Coscine (RDM platform): Via _execute_coscine_s3_upload()

        • Uses Coscine API client

        • Uploads to configured Coscine project/resource

        • Includes Coscine-specific metadata form

        • Separate extended metadata upload via _upload_coscine_extended_metadata()

      S3 → Disk: Via _execute_s3_to_disk_transfer()

      • Uses aws s3 cp command

      • Creates destination directory

      • Downloads from S3 to local filesystem

      S3 → S3: Not yet implemented (raises ValueError)

    3. Metadata Generation: For S3 destinations, comprehensive metadata created:

      Object Metadata (S3 headers): Via _get_s3_metadata()

      • Essential discovery keys:

        • obs_dataset_id: Unique observation identifier

        • obs_collection: Temporal collection (e.g., CCAT-2024-Q3)

        • obs_content_type: Always raw_data_package

      • Core provenance:

        • obs_telescope, obs_instrument, obs_date_obs

        • obs_program_id, obs_subprogram_id

      • Target information:

        • obs_target_name, obs_ra_deg, obs_dec_deg

      • File integrity:

        • file_xxhash64: Package checksum

        • file_size_bytes, file_record_count

      • Archive management:

        • archive_retention: permanent

        • archive_access_class: public

        • metadata_path: Link to extended metadata JSON

      Extended Metadata (JSON sidecar): Via _generate_ivoa_metadata()

      • IVOA-compatible structure for astronomical data discovery

      • Comprehensive nested document with:

        • Identifiers: dataset_id, publisher_did (IVOA URI), ivoa_collection

        • Facility: Observatory, telescope, instrument details with coordinates

        • Target: Name, type, coordinates (decimal + sexagesimal), frame, epoch

        • Observation: Program info, timing, duration

        • Contents: File listing with individual file checksums and sizes

        • Quality: Observation status and quality metrics

        • Processing: Pipeline name and version

        • Custom: Instrument-specific metadata from RawDataPackageMetadata

      • Uploaded as separate JSON file: {package_name}_metadata.json

      • Stored alongside data package for data discovery systems

    4. Checksum Verification: SHA256 for S3 uploads (handled by S3 API)

    5. Metrics Collection: Send to InfluxDB via HousekeepingMetrics:

      • operation: long_term_archive_transfer or s3_upload

      • Transfer rate (MB/s), file size, duration

      • Source/destination locations, transfer method

      • Success/failure status

  5. Completion: Via _mark_transfer_successful():
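
The boto3 upload path in step 4.2 can be sketched as a single put_object call with a SHA-256 checksum and the discovery keys attached as S3 object metadata. This is a sketch under those assumptions (ChecksumSHA256 requires a reasonably recent boto3), not the actual _execute_s3_upload().

    import base64
    import hashlib

    import boto3

    def upload_package(path: str, bucket: str, key: str, metadata: dict) -> None:
        with open(path, "rb") as fh:
            body = fh.read()  # the implementation reads the whole file into memory

        # S3 recomputes the SHA-256 server-side and rejects the upload on mismatch.
        checksum = base64.b64encode(hashlib.sha256(body).digest()).decode()

        boto3.client("s3").put_object(
            Bucket=bucket,
            Key=key,
            Body=body,
            ChecksumSHA256=checksum,
            Metadata=metadata,  # e.g. obs_dataset_id, file_xxhash64, ...
        )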

Database Changes:

Storage-Specific Implementation:

Disk LTA (DiskDataLocation):

  • Simple file copy (cp command)

  • Preserves tar.gz archive format

  • Instant access for retrieval

  • Lower redundancy than S3

S3 LTA (S3DataLocation):

  • Primary Method (boto3):

    • Direct Python SDK upload

    • SHA256 integrity verification built-in

    • Rich S3 object metadata (32 custom key-value pairs)

    • Extended IVOA metadata as separate JSON object

    • Suitable for AWS S3, MinIO, or compatible storage

  • Alternative Method (Coscine):

    • Research Data Management platform integration

    • Coscine API for uploads

    • Metadata mapped to Coscine’s schema

    • Suitable for institutional RDM compliance

    • Configuration: COSCINE_API_TOKEN, COSCINE_PROJECT, COSCINE_RESOURCE

  • Object lifecycle policies (configured in S3, not in code)

  • Future: Automatic Glacier transitions for cost optimization

Tape LTA (TapeDataLocation):

  • Not yet implemented

  • Future: Write to tape library, update catalog

  • Slowest retrieval but lowest cost per TB

Error Handling & Retry Logic:

  • Base class: LongTermArchiveTask with max 3 retries

  • On retry: reset_state_on_failure()

    • Status reset to PENDING

    • Increment attempt_count

    • Clear failure_error_message

    • Reset state to TRANSFERRING

    • Log retry reason

  • On permanent failure: mark_permanent_failure()

    • Status: FAILED

    • state: FAILED

    • Log final failure with attempt count

  • Transfer-specific errors:

    • S3 throttling: Handled by boto3 retry logic

    • Disk full: Raises exception, requires manual intervention

    • Checksum mismatch: S3 rejects upload automatically

    • Network errors: Retry with exponential backoff

Key Implementation Details:

  • Site-specific archiving: Each site has its own LTA location(s)

  • Multi-destination: Packages can be archived to multiple LTA locations (different sites)

  • Transfer tracking: Separate LongTermArchiveTransfer record per destination

  • Comprehensive logging: LongTermArchiveTransferLog table maintains audit trail

  • Metadata-rich S3: Object metadata enables data discovery without downloading

  • IVOA compliance: Extended metadata follows International Virtual Observatory Alliance standards

  • Queue routing: Dynamic routing ensures tasks execute on hosts with proper access

  • Metrics integration: InfluxDB metrics for transfer monitoring and troubleshooting

Configuration:

  • ARCHIVE_MANAGER_SLEEP_TIME: Manager polling interval

  • s3_method: Choose boto3 (default) or coscine

  • boto3 configuration:

    • s3_endpoint_url, s3_access_key_id, s3_secret_access_key

    • s3_region_name

    • Per-location credentials via S3DataLocation configuration

  • Coscine configuration:

    • COSCINE_API_TOKEN: API authentication token

    • COSCINE_PROJECT: Target project identifier

    • COSCINE_RESOURCE: Target resource identifier

  • LTA locations defined per site in database

  • Retention policies configured at S3 bucket level (external to system)

Monitoring & Observability:

  • Structured logging via get_structured_logger()

  • Key log events:

    • pending_transfers, scheduling_long_term_archive_transfer

    • s3_upload_successful, disk_to_disk_transfer_successful

    • long_term_archive_transfer_completed

    • metadata_generation, extended_metadata_upload_successful

  • Transfer metrics to InfluxDB:

    • Transfer rate (MB/s), file size, duration

    • Source/destination locations and storage types

    • Success/failure rates per storage type

    • Metadata upload success tracking

  • Redis pub/sub notifications via transfer:overview channel:

    • long_term_archive_transfer_created

    • long_term_archive_transfer_scheduled

    • long_term_archive_transfer_completed

    • long_term_archive_transfer_failed

    • long_term_archive_transfer_reset

Important Notes:

  • This stage changes state to ARCHIVED

  • ARCHIVED state unlocks deletion of buffer copies in Stage 7

  • Packages remain as tar.gz archives in LTA (not extracted)

  • S3 metadata enables data discovery without database queries

  • Extended IVOA metadata supports astronomical data portals

  • Multiple transfers can be created for the same package to different LTA sites

  • Transfer logs provide complete audit trail for compliance

Stage 6: Staging (On-Demand)#

Purpose: Make archived data available for scientific processing by unpacking RawDataPackage archives

Manager: ccat_data_transfer.staging_manager

Key Function: process_staging_jobs()

Worker Task: stage_data_task()

Trigger: Scientists create StagingJob records via ops-db-ui, or the workflow-manager (to be implemented) creates a StagingJob record in the database.

Process:

  1. Job Creation (by scientist via UI):

    User requests data through web interface:

    • Select observations, date ranges, or specific packages

    • Choose destination DataLocation (type: PROCESSING)

    • Specify origin DataLocation (typically LTA)

    • Submit staging request

    Database: StagingJob record created with:

  2. Discovery: Manager queries for pending jobs via _get_pending_staging_jobs():

    jobs = session.query(StagingJob).filter(
        StagingJob.status == Status.PENDING
    ).all()
    
    • No active flag check in actual code

    • Processes all pending jobs regardless of creation time

  3. Task Scheduling: Via _process_staging_job():

    • Status: PENDING → SCHEDULED

    • Queue routing via route_task_by_location() based on destination PROCESSING location

    • Ensures worker executes on host where files will be staged

    • Single task handles all packages in the job

  4. Worker Execution: Via _stage_data_internal(), processes each package independently:

    For each RawDataPackage in the job:

    1. Skip Check: Via _check_existing_copies()

      • Check if package already staged (status: STAGED)

      • Verify physical file presence if status is PRESENT

      • Fix inconsistent database records (file missing but status PRESENT)

    2. Source Location: Via _get_physical_copy()

      • Query RawDataPackagePhysicalCopy at origin location

      • No “locate source” step - origin known from job configuration

      • Validate copy status is PRESENT

    3. Destination Path Construction: Via _construct_destination_path()

      • Temporary package path: destination_location.path / raw_data_packages / {filename}

      • Final extraction path: destination_location.path (root directory)

      • Preserves hierarchical structure from source (e.g., CHAI/LFA/filename.fits)

    4. Download Package: Via _execute_polymorphic_copy() (see the sketch after this list)

      Based on origin storage type:

      S3 → Disk: Primary staging path

      • boto3 method: Via _execute_boto3_s3_download()

      • Coscine method: Via _execute_coscine_s3_download()

        • Uses Coscine API client

        • Replaces colons in object keys (same as upload)

        • Two strategies:

          • Direct file download if API supports: resource.file(key).download()

          • Fallback: Download entire resource, extract specific file

      Disk → Disk: Future support (commented out)

      • Local copy: shutil.copy()

      • Remote copy: scp command

      Tape → Disk: Planned (raises NotImplementedError)

    5. Unpack Archive: Via _unpack_file()

      • Validate file is tar.gz format

      • List tar contents for debugging

      • Extract directly to final destination: tar -xzf {package} -C {destination} --overwrite

      • Preserves full hierarchical structure (e.g., CHAI/LFA/filename.fits)

      • Overwrites existing files if present

    6. Verification: Via _check_raw_data_files()

      • For each RawDataFile in package:

        • Check file exists at: destination_path / file.relative_path

        • Validate complete hierarchical path

      • Raise ValueError if any files missing

      • No checksum verification (assumes transfer integrity from S3)

    7. Create File Physical Copies: Via _create_raw_data_file_physical_copies()

    8. Mark Package as Staged: Via _mark_package_as_staged_and_cleanup()

      • Create or update RawDataPackagePhysicalCopy

      • Set status: STAGED

      • Delete temporary tar.gz file from raw_data_packages/ directory

      • STAGED status means: files extracted, tar.gz removed

    9. Per-Package Error Handling:

      • Errors logged, marked in package_results dictionary

      • Job continues with next package (doesn’t fail entire job)

      • Partial success possible

  5. Job Completion: After all packages processed:

    • If all packages successful: StagingJob.status → COMPLETED

    • If any packages failed: StagingJob.status → FAILED

    • Record start_time and end_time

    • Failed package IDs stored in failure_error_message

  6. Scientist Access:

    Data now available for processing:

    • Files unpacked with hierarchical structure preserved

    • No tar.gz archives in working directory (deleted after extraction)

    • Individual files tracked via RawDataFilePhysicalCopy

    • Ready for pipeline processing
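
A condensed sketch of the download-and-unpack path from step 4, assuming the boto3 method and a disk PROCESSING location. The tar invocation mirrors the command quoted above; the surrounding helper is an assumption.

    import subprocess
    from pathlib import Path

    import boto3

    def stage_package(bucket: str, key: str, processing_root: Path) -> None:
        tmp_dir = processing_root / "raw_data_packages"
        tmp_dir.mkdir(parents=True, exist_ok=True)
        archive = tmp_dir / Path(key).name

        # Download the package archive from the S3 LTA location.
        boto3.client("s3").download_file(bucket, key, str(archive))

        # Extract into the PROCESSING root, preserving the hierarchical
        # structure (e.g. CHAI/LFA/filename.fits) and overwriting existing files.
        subprocess.run(["tar", "-xzf", str(archive),
                        "-C", str(processing_root), "--overwrite"], check=True)

        # The temporary tar.gz is removed after successful extraction (STAGED).
        archive.unlink()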

Database Changes:

  • status: PENDING → SCHEDULED → COMPLETED/FAILED

  • RawDataPackagePhysicalCopy created at PROCESSING location:

    • Status: STAGED

    • Indicates files extracted, package archive removed

  • RawDataFilePhysicalCopy created for each file:

    • Status: PRESENT

    • Enables individual file tracking and deletion

  • Start and end times recorded on staging job

  • Failure messages capture failed package IDs

Storage Type Support:

S3 → Disk (Primary path):

  • boto3 (default):

    • Direct Python SDK download

    • Uses get_s3_client() for configuration

    • Constructs consistent S3 keys

    • Suitable for AWS S3, MinIO, compatible storage

  • Coscine (RDM platform):

    • Research Data Management platform integration

    • API-based download

    • Fallback strategies for API limitations

    • Configuration: COSCINE_API_TOKEN, COSCINE_PROJECT, COSCINE_RESOURCE

Disk → Disk (Planned):

  • Local: shutil.copy()

  • Remote: scp command

  • Currently commented out

Tape → Disk (Future):

  • Raises NotImplementedError

  • Placeholder for tape library integration

Key Implementation Details:

  • Per-package processing: Each package processed independently with error isolation

  • Hierarchical structure: Full relative paths preserved (e.g., CHAI/LFA/filename.fits)

  • Temporary location: Tar.gz downloaded to raw_data_packages/ subdirectory

  • Final location: Files extracted to location root, recreating source structure

  • Archive cleanup: tar.gz files deleted immediately after successful extraction

  • File-level tracking: Individual RawDataFile records enable granular deletion

  • Partial success: Job can complete even if some packages fail

  • Queue routing: Tasks execute on destination host via dynamic routing

Cleanup (Stage 7):

When processing completes:

  • Scientist can manually trigger deletion or set retention period

  • Deletion manager removes RawDataFilePhysicalCopy records

  • Individual files deleted from PROCESSING location

  • Frees space for next staging job

Error Handling & Retry Logic:

  • Base class: StagingTask with max 3 retries

  • On retry: reset_state_on_failure()

  • On permanent failure: mark_permanent_failure()

    • Status: FAILED

    • Error message stored

  • Package-level errors:

    • Package already exists: Skip, mark success

    • Physical copy not found: ValueError, continue with next package

    • Download failure: Exception logged, marked failed, continue

    • Extraction failure: ValueError, marked failed, continue

    • Verification failure: ValueError, marked failed, continue

  • Database inconsistency handling:

    • Status PRESENT but file missing: Fix database, mark as DELETED

    • Status PRESENT but file empty: Fix database, mark as DELETED

Configuration:

  • STAGING_MANAGER_SLEEP_TIME: Manager polling interval

  • s3_method: Choose boto3 (default) or coscine

  • PROCESSING locations defined per site in database

  • No default retention period in code (managed externally)

  • boto3 configuration: Same as Stage 5 (S3 credentials)

  • Coscine configuration: Same as Stage 5 (API token, project, resource)

Monitoring & Observability:

  • Comprehensive debug logging for troubleshooting:

    • Staging job configuration details

    • Per-package processing steps

    • Physical copy lookups and validation

    • File path construction and verification

    • Tar contents listing before extraction

  • Key log events:

    • staging_job_scheduled, Starting staging job

    • s3_download_details, coscine_download_details

    • Unpacking {file} directly into {destination}

    • Successfully verified {n} files

    • Created physical copies for {n} RawDataFiles

    • Marked RawDataPackage as STAGED

  • Error tracking:

    • Error staging package {id}

    • Missing files in destination directory

    • Failed to unpack {file}

  • Metrics integration via track_metrics():

    • Operation type: staging

    • Transfer method: s3

    • Success/failure rates

Important Notes:

  • Files extracted with full hierarchical paths (e.g., CHAI/LFA/filename.fits)

  • Package tar.gz files deleted immediately after extraction to save space

  • STAGED status specifically means: unpacked files present, archive removed

  • Individual file tracking enables granular deletion policies

  • Partial job success possible - some packages can succeed while others fail

  • No checksum verification after staging (trusts S3 integrity)

  • Queue routing ensures task executes on correct host

  • Database inconsistency detection and auto-correction built-in

Workflow Example:

  1. Scientist requests observation XYZ via UI

  2. System creates StagingJob linking to RawDataPackages

  3. Manager schedules task to PROCESSING location worker

  4. Worker downloads each package from S3 LTA

  5. Worker extracts files to preserve hierarchy: PROCESSING/CHAI/LFA/*.fits

  6. Worker deletes downloaded tar.gz files

  7. Worker creates physical copy records for individual files

  8. Scientist accesses files at PROCESSING/CHAI/LFA/*.fits

  9. After processing, deletion manager removes files based on retention policy

Stage 7: Deletion and Cleanup#

Purpose: Remove temporary files and manage storage capacity across all DataLocation types

Manager: ccat_data_transfer.deletion_manager

Main Function: delete_data_packages()

Sub-Managers: The main function orchestrates multiple specialized deletion processes:

  • delete_data_transfer_packages(): Cleanup transfer archives

  • delete_raw_data_packages_bulk(): Bulk package deletion

  • delete_processing_raw_data_files(): Processing file cleanup

  • delete_staged_raw_data_files_from_processing(): Staged file cleanup

  • process_deletion_possible_raw_data_files(): Conditional file deletion

Worker Tasks:

  • delete_physical_copy(): Single file/package deletion

  • delete_bulk_raw_data_files(): Bulk file deletion

  • delete_bulk_raw_data_packages(): Bulk package deletion

Deletion Policies by Location Type:

SOURCE Locations:

  • RawDataFiles: Marked as DELETION_POSSIBLE when parent RawDataPackage deleted

  • Condition: Parent package must exist in at least one LTA location

  • Two-phase process:

    1. RawDataPackage deleted → RawDataFiles marked DELETION_POSSIBLE

    2. DELETION_POSSIBLE files evaluated based on buffer status and retention

  • Safety: Never delete until verified copies exist in LTA

BUFFER Locations:

At SOURCE Sites (sites with SOURCE locations):

  • RawDataPackages:

    • Delete when exists in any LTA LONG_TERM_ARCHIVE location

    • Checked via can_delete_raw_data_package_from_source_buffer()

  • DataTransferPackages:

    • Delete when ALL transfers from this buffer are completed AND unpacked

    • At least one transfer must reach LTA site buffer

    • Checked via can_delete_data_transfer_package_from_source_buffer()

At LTA Sites (sites with LONG_TERM_ARCHIVE locations):

  • RawDataPackages:

    • Delete when exists in LTA location at same site

    • Checked via can_delete_raw_data_package_from_lta_buffer()

  • DataTransferPackages:

    • Delete when synced to ALL other LTA site buffers

    • Uses secondary route discovery for multi-LTA synchronization

    • Checked via can_delete_data_transfer_package_from_lta_buffer()

PROCESSING Locations:

  • RawDataFiles (from active staging):

    • Delete when no active StagingJob references the file

    • Checked per file via find_deletable_processing_raw_data_files()

  • RawDataFiles (from completed staging):

    • Delete when parent package has status STAGED

    • All staging jobs for package must be inactive (active=False)

    • Checked per package via find_deletable_staged_raw_data_files_by_location()

  • RawDataPackages: Marked as STAGED (not deleted, files extracted)

Process Overview:

  1. Discovery Phase: Each sub-manager finds eligible items for deletion

    1. DataTransferPackages: Via find_deletable_data_transfer_packages()

      • Query all PRESENT DataTransferPackagePhysicalCopy records

      • Check no pending unpack operations

      • Apply location-specific deletion conditions

      • Group by location

    2. RawDataPackages: Via find_deletable_raw_data_packages_by_location()

      • Query all PRESENT RawDataPackagePhysicalCopy records

      • Apply site-specific deletion conditions

      • Group by location

      • Returns dictionary: {DataLocation: [RawDataPackage, ...]}

    3. Processing RawDataFiles: Via find_deletable_processing_raw_data_files()

      • Find files in PROCESSING locations

      • Check no active staging jobs reference the file

    4. Staged Processing RawDataFiles: Via find_deletable_staged_raw_data_files_by_location()

      • Find packages with status STAGED

      • Verify all staging jobs inactive

      • Return all RawDataFile physical copies for those packages

      • Group by location

  2. Marking Phase: Mark eligible physical copies for deletion (sketched after this list)

    • Update status: PRESENT → DELETION_SCHEDULED

    • Use with_for_update() for row-level locking

    • Commit per location to avoid long-held locks

  3. Task Scheduling: Schedule deletion tasks with dynamic queue routing

    Single Item Deletion: delete_physical_copy()

    Bulk Deletion: delete_bulk_raw_data_files() or delete_bulk_raw_data_packages()

    • Used for RawDataPackagePhysicalCopy and RawDataFilePhysicalCopy

    • Processes the entire list (no 1000-item batch limit)

    • Queue: route_task_by_location(OperationType.DELETION, location)

    • Separate bulk tasks per location (handles multi-site SOURCE/BUFFER)

  4. Worker Execution: Deletion task performs actual file removal

    1. Polymorphic Type Resolution: Via DeletionTask base class

    2. Status Verification:

      • Verify status is DELETION_SCHEDULED

      • Log warning and skip if unexpected state

      • Mark as DELETION_IN_PROGRESS

    3. File Deletion: Based on storage_type

      Disk (DiskDataLocation):

      if os.path.exists(physical_copy.full_path):
          os.remove(physical_copy.full_path)
      else:
          # Already deleted, continue
          pass
      

      S3 (S3DataLocation):

      s3_client = get_s3_client()
      s3_client.delete_object(
          Bucket=physical_copy.data_location.bucket_name,
          Key=physical_copy.full_path
      )
      

      Tape (TapeDataLocation):

      • Not implemented - just marks as deleted

      • Logs warning about unimplemented deletion

    4. Database Update:

      • Status: DELETION_IN_PROGRESS → DELETED

      • Set deleted_at: Current timestamp (Berlin timezone)

      • Commit transaction

    5. Bulk Operation Details: For bulk tasks

      • Process each physical copy ID in the list

      • Continue on individual failures (don’t fail entire batch)

      • Track success/failure counts

      • Increment attempt_count on failure

      • Reset status to PRESENT on failure for retry

  5. RawDataFile Cascade Deletion: Special handling for SOURCE locations

    When RawDataPackage deleted from SOURCE:

    1. Via mark_raw_data_files_for_deletion():

      • Bulk update all RawDataFile physical copies at SOURCE

      • Status: PRESENT → DELETION_POSSIBLE

      • Uses bulk SQL update (no loop for performance)

    2. Via process_deletion_possible_raw_data_files():

      • Query files with status DELETION_POSSIBLE

      • Check buffer status from Redis

      • Apply retention and disk threshold logic

      • Schedule deletion if conditions met
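
The marking phase from step 2 is essentially a locked status flip before any deletion tasks are dispatched. The sketch below assumes the project's SQLAlchemy models (RawDataPackagePhysicalCopy, PhysicalCopyStatus) are importable and is not the actual implementation.

    # Assumes the project's ORM models are importable; illustration only.
    def mark_copies_for_deletion(session, physical_copy_ids):
        copies = (
            session.query(RawDataPackagePhysicalCopy)
            .filter(RawDataPackagePhysicalCopy.id.in_(physical_copy_ids))
            .with_for_update()  # row-level lock against concurrent managers
            .all()
        )
        for copy in copies:
            copy.status = PhysicalCopyStatus.DELETION_SCHEDULED
        session.commit()  # commit per location to keep locks short-lived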

Database Changes:

  • status: PRESENT → DELETION_SCHEDULED → DELETION_IN_PROGRESS → DELETED

  • deleted_at: Timestamp recorded (Berlin timezone)

  • attempt_count: Incremented on failure

  • DeletionLog entries created for audit trail

Key Implementation Details:

  • Polymorphic handling: Must load correct subclass based on type field

  • Row-level locking: Uses with_for_update() to prevent race conditions

  • Location-based batching: Groups deletions by location for proper queue routing

  • Site-aware logic: Different rules for SOURCE sites vs LTA sites

  • Multi-site file deletion: RawDataFiles deleted from their SOURCE location (may differ from package location)

  • Bulk operations: No hard batch size limit (processes all eligible items)

  • DELETION_POSSIBLE state: Intermediate state for conditional deletion

  • STAGED handling: Special status for unpacked packages in processing

  • Two-phase SOURCE deletion: Package deleted → files marked → files evaluated

  • Error isolation: Bulk operations continue despite individual failures

Site Classification:

SOURCE Site: Via is_source_site()

  • Has at least one location with location_type = SOURCE

  • Example: Telescope site in Chile

LTA Site: Via is_lta_site()

  • Has at least one location with location_type = LONG_TERM_ARCHIVE

  • Example: Data centers in Germany, USA

Disk Monitoring Integration:

  • Buffer status stored in Redis: buffer_monitor:{location_name}

  • Queried via get_buffer_status_for_location()

  • Threshold checking via should_delete_based_on_buffer_status():

    • SOURCE locations: Delete if disk > 80%

    • BUFFER locations: Delete if disk > 85%

  • Disk usage from Redis: disk_usage:{location_name}:percent_used
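
A minimal sketch of the threshold check, using the Redis key layout and thresholds listed above; the function body itself is an assumption.

    import redis

    THRESHOLDS = {"SOURCE": 80.0, "BUFFER": 85.0}  # percent used

    def should_delete_based_on_buffer_status(r: redis.Redis, location_name: str,
                                             location_type: str) -> bool:
        raw = r.get(f"disk_usage:{location_name}:percent_used")
        if raw is None:
            return False  # no monitoring data: stay conservative, keep the files
        return float(raw) > THRESHOLDS.get(location_type, 100.0)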

Configuration:

  • DELETION_MANAGER_SLEEP_TIME: Manager polling interval

  • No hardcoded retention periods in deletion manager code

  • Disk thresholds:

    • SOURCE: 80%

    • BUFFER: 85%

    • (Other thresholds may exist in buffer_monitor module)

  • Deletion policies encoded in deletion condition functions

  • Queue routing: Dynamic per location

Error Handling & Retry Logic:

  • Base class: DeletionTask

  • On individual failure in bulk operation:

    • Log error

    • Increment attempt_count

    • Reset status to PRESENT

    • Continue with next item

  • On retry: reset_state_on_failure()

    • Status: PRESENT

    • Increment attempt_count

    • Clear failure message

    • Add deletion log entry

  • On permanent failure: mark_permanent_failure()

    • Status: FAILED

    • Add final deletion log entry

  • Storage-specific errors:

    • FileNotFoundError: Log as already deleted, continue

    • OSError on disk: Raise, trigger retry

    • S3 errors: Raise, trigger retry

    • Database errors: Rollback, retry operation

Safety Features:

  • Multiple verification steps: Discovery → marking → worker verification

  • Row-level locking: Prevents race conditions

  • Never delete last copy: Conditions check for copies in LTA

  • Require LTA presence: SOURCE/BUFFER deletions only if LTA copy exists

  • Audit trail: DeletionLog records all deletion events

  • Context logging: Extended context via get_physical_copy_context()

  • Polymorphic type safety: Explicit type checking and loading

  • Transaction isolation: Commit per location to limit lock duration

  • Error isolation: Failures don’t stop entire batch

Monitoring & Observability:

  • Structured logging via get_structured_logger()

  • Key log events:

    • Checking if package can be deleted from [location type]

    • Package marked for deletion

    • Scheduled bulk [package/file] deletion

    • Starting physical copy deletion

    • Physical file deleted

    • Bulk [package/file] deletion completed

  • Extended context logging:

    • Package/transfer status

    • Long-term archive transfer details

    • Physical copy verification status

    • Site and location information

  • Redis pub/sub notifications via transfer:overview channel:

    • physical_copy_deletion_in_progress

    • physical_copy_deletion_completed

    • physical_copy_deletion_failed

    • bulk_raw_data_package_deletion_scheduled

    • bulk_raw_data_file_deletion_completed

    • staged_raw_data_files_bulk_deletion_scheduled

  • Bulk operation statistics:

    • successful_deletions, failed_deletions, total_deletions

Important Notes:

  • Deletion logic is site-aware: Different rules for SOURCE vs LTA sites

  • No 1000-item batch limit: Processes all eligible items per location

  • Multi-site coordination: RawDataFiles may be deleted from different hosts than their packages

  • DELETION_POSSIBLE: Intermediate state allows conditional evaluation

  • STAGED: Special package status in PROCESSING (files extracted, archive removed)

  • Two-phase SOURCE deletion: Enables retention policy and disk threshold evaluation

  • Queue routing: Every deletion task routed to correct worker via location

  • Storage polymorphism: Supports Disk, S3, Tape (placeholder) deletion

  • Transaction scope: Per-location commits avoid database lock contention

  • Error resilience: Individual failures don’t stop bulk operations

Workflow Examples:

Example 1: RawDataPackage deletion from SOURCE site buffer:

  1. Package successfully archived to LTA location in Germany

  2. Deletion manager discovers package in SOURCE site buffer (Chile)

  3. Checks: Package exists in LTA? Yes → Can delete

  4. Marks RawDataPackagePhysicalCopy as DELETION_SCHEDULED

  5. Marks all RawDataFile physical copies as DELETION_POSSIBLE

  6. Schedules bulk package deletion task to Chile worker

  7. Schedules bulk file deletion task to SOURCE location worker (may be different)

  8. Worker deletes package tar.gz file

  9. Evaluates DELETION_POSSIBLE files based on buffer status

  10. Deletes eligible RawDataFiles

Example 2: Staged files cleanup from PROCESSING:

  1. Scientist’s staging job completes, marked inactive

  2. Deletion manager finds STAGED package in PROCESSING

  3. Verifies no active staging jobs reference the package

  4. Schedules bulk deletion of all RawDataFile physical copies

  5. Worker deletes individual files (preserving directory structure)

  6. RawDataPackagePhysicalCopy remains as STAGED (archive already removed)

Example 3: DataTransferPackage from LTA site buffer:

  1. DataTransferPackage successfully transferred to all other LTA sites

  2. Deletion manager checks secondary routes

  3. Verifies completed and unpacked transfers to all expected LTA sites

  4. Marks DataTransferPackagePhysicalCopy as DELETION_SCHEDULED

  5. Schedules single deletion task to LTA site worker

  6. Worker deletes transfer package archive

Pipeline Coordination#

State Transitions

RawDataPackages move through states as the pipeline progresses:

WAITING → TRANSFERRING → ARCHIVED (→ FAILED)

Triggers:
• WAITING: Package exists at SOURCE only
• TRANSFERRING: Added to DataTransferPackage
• ARCHIVED: Exists at LTA location
• FAILED: Any stage reports a failure for the package

Database as Coordinator

The database serves as the coordination point:

  • Managers query for work

  • Workers update results

  • State changes trigger next stage

  • Transactions ensure consistency

Manager Sleep Cycles

Managers run in loops:

import time

while True:
    try:
        process_work(session)           # stage-specific manager logic
    except Exception as e:
        log_error(e)                    # log, but keep the loop alive
    finally:
        time.sleep(MANAGER_SLEEP_TIME)  # configured per manager

Sleep times balance:

  • Responsiveness (short sleep → faster reactions)

  • Database load (long sleep → less querying)

Parallel Execution

Multiple stages run concurrently:

  • Stage 1 packages new files

  • Stage 2 creates data transfer packages

  • Stage 3 transfers packages

  • Stage 4 unpacks packages

  • Stage 5 archives packages

  • Stage 6 stages packages

  • Stage 7 deletes old packages

This pipeline parallelism maximizes throughput.

Error Recovery#

Resumability

All operations are designed to be resumable:

  • Database records track state

  • Workers check current state before acting

  • Partial progress preserved

  • Retrying is safe (idempotent where possible)

Retry Logic

ccat_data_transfer.setup_celery_app.CCATBaseTask provides:

  • Automatic retry with exponential backoff

  • Maximum retry counts per operation

  • Circuit breaker for permanent failures

  • Administrator notifications
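
As an illustration, a standard Celery task can be configured with bounded retries and exponential backoff. The real CCATBaseTask additionally wires state resets and administrator notifications, so treat this only as a sketch.

    from celery import Celery

    app = Celery("ccat_data_transfer")

    @app.task(bind=True, max_retries=3, retry_backoff=True)
    def example_pipeline_task(self, record_id: int) -> None:
        try:
            ...  # stage-specific work (transfer, unpack, archive, delete)
        except Exception as exc:
            # Re-queue with exponential backoff; after max_retries the failure
            # is treated as permanent.
            raise self.retry(exc=exc)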

Recovery Service

ccat_data_transfer.recovery_service_runner monitors for stalled tasks:

  • Detects missing heartbeats

  • Resets task state for retry

  • Alerts on repeated failures

Next Steps#