Pipeline Architecture ===================== .. verified:: 2025-10-17 :reviewer: Christof Buchbender The Data Transfer System implements a 7-stage pipeline that moves data from telescope to long-term archive. Each stage is independent but coordinates through database state and Celery tasks. Pipeline Overview ----------------- .. code-block:: text Stage 0: Instrument Filing ───────────────────────────── Instruments create RawDataFile records via ops-db-api Files physically written to SOURCE locations ↓ Stage 1: Raw Data Package Creation ─────────────────────────────────── Group related files into RawDataPackages Create tar archives at SOURCE locations Move to BUFFER locations ↓ Stage 2: Data Transfer Package Assembly ──────────────────────────────────────── Aggregate RawDataPackages into DataTransferPackages Optimize for network transfer efficiency Create transfer archives at BUFFER locations ↓ Stage 3: Inter-Site Transfer ───────────────────────────── Transfer DataTransferPackages between sites Uses BBCP for high-performance parallel transfer Round-robin distribution to LTA sites ↓ Stage 4: Unpacking and Verification ──────────────────────────────────── Extract archives at destination BUFFERs Verify checksums of all extracted files Create PhysicalCopy records ↓ Stage 5: Long-Term Archive ────────────────────────── Move RawDataPackages from BUFFER to LTA locations Generate metadata sidecar files Mark packages as ARCHIVED ↓ Stage 6: Staging (On-Demand) ───────────────────────────── Scientists request data for processing System stages from LTA to PROCESSING locations Data cleaned up after processing completes ↓ Stage 7: Deletion and Cleanup ────────────────────────────── Remove temporary files from SOURCE and BUFFER Delete completed DataTransferPackages Apply retention policies to PROCESSING locations Stage Details ------------- Stage 0: Instrument Filing ~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Purpose**: Data enters the system **Actors**: Instruments (Prime-Cam, CHAI), instrument control software **Process**: 1. Instrument completes observation, writes files to disk 2. Instrument software calls :doc:`/ops-db-api/docs/index` to create :py:class:`~ccat_ops_db.models.RawDataFile` records 3. Files linked to :py:class:`~ccat_ops_db.models.ExecutedObsUnit` (observation) and :py:class:`~ccat_ops_db.models.InstrumentModule` 4. Files remain at ``SOURCE`` location (instrument computer) **Database Changes**: * :py:class:`~ccat_ops_db.models.RawDataFile` records created * Associated with :py:class:`~ccat_ops_db.models.ExecutedObsUnit` and :py:class:`~ccat_ops_db.models.InstrumentModule` * No :py:class:`~ccat_ops_db.models.RawDataPackage` yet (that's Stage 1) **API**: :doc:`/ops-db-api/docs/index` endpoints for filing observations **Notes**: * This stage is outside the data-transfer system * Data transfer system discovers these files in :doc:`/pipeline.rst#stage-1-raw-data-package-creation` * Files may arrive continuously or in bursts * Important for instruments to file metadata correctly Stage 1: Raw Data Package Creation ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Purpose**: Group and package raw files for transfer **Manager**: :py:mod:`~ccat_data_transfer.raw_data_package_manager` **Key Function**: :py:func:`~ccat_data_transfer.raw_data_package_manager.raw_data_package_manager_service` **Process**: 1. **Discovery**: Manager queries for :py:class:`~ccat_ops_db.models.RawDataFile` not yet in a :py:class:`~ccat_ops_db.models.RawDataPackage` .. 
code-block:: sql SELECT * FROM raw_data_file WHERE raw_data_package_id IS NULL AND data_location IN (SOURCE locations) 2. **Grouping**: * Files grouped by :py:class:`~ccat_ops_db.models.ExecutedObsUnit` and :py:class:`~ccat_ops_db.models.InstrumentModule` * Each group becomes one :py:class:`~ccat_ops_db.models.RawDataPackage` 3. **Record Creation**: * Manager creates :py:class:`~ccat_ops_db.models.RawDataPackage` record in database * Links all files to new package * Determines destination buffer for package 4. **Task Submission**: * Celery task submitted to SOURCE location queue * Task: :py:func:`~ccat_data_transfer.raw_data_package_manager.create_raw_data_packages` 5. **Worker Execution**: Worker running at ``SOURCE`` location: * Creates tar archive of all files in :py:class:`~ccat_ops_db.models.RawDataPackage` * Preserves directory structure * Calculates xxHash64 checksum * Moves tar to ``BUFFER`` location (local or remote) * Updates :py:class:`~ccat_ops_db.models.RawDataPackage` record with checksum and :py:class:`~ccat_ops_db.models.Status` **Database Changes**: * :py:class:`~ccat_ops_db.models.RawDataPackage` record created * :py:attr:`~ccat_ops_db.models.RawDataFile.raw_data_package_id` foreign key set on all :py:class:`~ccat_ops_db.models.RawDataFile` records * :py:class:`~ccat_ops_db.models.PhysicalCopy` records created for ``SOURCE`` and ``BUFFER`` locations * :py:class:`~ccat_ops_db.models.RawDataPackage` :py:class:`~ccat_ops_db.models.Status` transitions: PENDING → SCHEDULED → IN_PROGRESS → COMPLETED/FAILED **Key Implementation Details**: * Archive format: TAR (optionally compressed) * Checksum algorithm: xxHash64 (fast for large files) * Transfer method: Local copy or rsync to buffer * Hierarchical path structure preserved in archive **Configuration**: * ``RAW_DATA_PACKAGE_MANAGER_SLEEP_TIME``: How often manager runs * ``DEVELOPMENT_MODE_LOCALHOST_ONLY``: Whether to treat localhost as same host for remote commands * ``VERBOSE``: Whether to enable verbose mode * Buffer selection: Primary buffer per site (by priority) and active flag **Error Handling**: * Archive creation failure: Retry with exponential backoff * Worker crash: Detected by heartbeat system, task resubmitted Stage 2: Data Transfer Package Assembly ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Purpose**: Bundle raw data packages and establish transfer routes **Manager**: :py:mod:`ccat_data_transfer.data_transfer_package_manager` **Key Function**: :py:func:`~ccat_data_transfer.data_transfer_package_manager.create_data_transfer_packages` **Process**: 1. **Discovery**: Manager queries for :py:class:`~ccat_ops_db.models.RawDataPackage` in ``BUFFER`` locations not yet in a :py:class:`~ccat_ops_db.models.DataTransferPackage` using :py:func:`~ccat_data_transfer.data_transfer_package_manager.get_unpackaged_raw_data_packages_in_buffers` 2. **Size-Based Grouping**: Aggregate packages using :py:func:`~ccat_data_transfer.data_transfer_package_manager.group_packages_for_transfer`: * Target size: :py:data:`~ccat_data_transfer.config.settings.MAXIMUM_DATA_TRANSFER_PACKAGE_SIZE_GB` * Packages ≥90% of max size bundled separately * Packages grouped until reaching 90-110% of max size * Sorted largest-first for optimal packing 3. **Package Record Creation**: * Create :py:class:`~ccat_ops_db.models.DataTransferPackage` record (status: ``PENDING``) * Link :py:class:`~ccat_ops_db.models.RawDataPackage` via ``data_transfer_package_id`` * Set :py:attr:`~ccat_ops_db.models.RawDataPackage.state` to ``TRANSFERRING`` 4. 
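The size-based grouping in step 2 above is easiest to picture as code. The sketch below is illustrative only, assuming a flat list of ``(package_id, size_gb)`` pairs and a placeholder ``MAX_SIZE_GB`` constant standing in for :py:data:`~ccat_data_transfer.config.settings.MAXIMUM_DATA_TRANSFER_PACKAGE_SIZE_GB`; it is not the actual :py:func:`~ccat_data_transfer.data_transfer_package_manager.group_packages_for_transfer` implementation.

.. code-block:: python

    # Illustrative sketch of the documented grouping heuristic (90-110 % of the
    # configured maximum, oversized packages shipped alone, largest-first).
    from typing import List, Tuple

    MAX_SIZE_GB = 100.0  # placeholder for MAXIMUM_DATA_TRANSFER_PACKAGE_SIZE_GB


    def group_packages(sizes_gb: List[Tuple[int, float]]) -> List[List[int]]:
        """Group (package_id, size_gb) pairs into transfer bundles."""
        groups: List[List[int]] = []
        current: List[int] = []
        current_size = 0.0

        # Largest-first ordering improves packing density.
        for package_id, size in sorted(sizes_gb, key=lambda p: p[1], reverse=True):
            if size >= 0.9 * MAX_SIZE_GB:
                # Oversized packages travel alone.
                groups.append([package_id])
                continue
            if current_size + size > 1.1 * MAX_SIZE_GB:
                # Adding this package would overshoot the upper bound: close the group.
                groups.append(current)
                current, current_size = [], 0.0
            current.append(package_id)
            current_size += size
            if current_size >= 0.9 * MAX_SIZE_GB:
                # Group landed inside the 90-110 % window: ship it.
                groups.append(current)
                current, current_size = [], 0.0

        if current:
            groups.append(current)  # leftovers below the target window
        return groups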
**Worker Execution**: Celery task :py:func:`~ccat_data_transfer.data_transfer_package_manager.create_data_transfer_package_task` assembles physical archive: * Status: ``PENDING`` → ``SCHEDULED`` → ``IN_PROGRESS`` → ``COMPLETED`` * Creates tar.gz archive from constituent :py:class:`~ccat_ops_db.models.RawDataPackage` files * Calculates checksum and total size * Creates :py:class:`~ccat_ops_db.models.DataTransferPackagePhysicalCopy` at buffer location 5. **Route Determination & Transfer Seeding**: After package completion, manager creates :py:class:`~ccat_ops_db.models.DataTransfer` records for next stage: **Primary Routes** (``SOURCE`` site → ``LTA`` site): * Discovered via :py:func:`~ccat_data_transfer.data_transfer_package_manager.discover_automatic_routes` * Round-robin selection among ``LTA`` sites using :py:func:`~ccat_data_transfer.data_transfer_package_manager.get_next_lta_site_round_robin` * State tracked in Redis: ``round_robin:source:{site_short_name}`` * Each package sent to exactly one ``LTA`` initially **Secondary Routes** (``LTA`` site → ``LTA`` site): * Discovered via :py:func:`~ccat_data_transfer.data_transfer_package_manager.discover_secondary_routes` * Ensures all ``LTA`` sites eventually receive all data * Created only after primary transfer completes * Also uses round-robin distribution **Database Changes**: * :py:class:`~ccat_ops_db.models.DataTransferPackage` created with metadata and checksum * :py:attr:`~ccat_ops_db.models.RawDataPackage.data_transfer_package_id` links constituent packages * :py:class:`~ccat_ops_db.models.DataTransferPackagePhysicalCopy` records presence in buffer * :py:class:`~ccat_ops_db.models.DataTransfer` records seed :doc:`/pipeline.rst#stage-3-inter-site-transfer` * Package state: ``TRANSFERRING`` (ready for inter-site transfer) **Key Implementation Details**: * Physical tar.gz archives created in ``BUFFER`` by workers routed via :py:func:`~ccat_data_transfer.queue_discovery.route_task_by_location` * Multiple :py:class:`~ccat_ops_db.models.RawDataPackage` archives bundled into single transfer archive * Routes computed dynamically from active site topology * Round-robin ensures balanced distribution across ``LTA`` sites **Configuration**: * ``MAXIMUM_DATA_TRANSFER_PACKAGE_SIZE_GB``: Size threshold for bundling (default: 100 GB) * ``PACKAGE_MANAGER_SLEEP_TIME``: Manager polling interval * Round-robin Redis key: ``round_robin:source:{site_short_name}`` **Error Handling**: * Retry logic: :py:class:`~ccat_data_transfer.data_transfer_package_manager.DataTransferPackageOperations` (max 3 retries) * Failed packages: Status set to ``FAILED``, constituent packages marked ``FAILED`` * Database errors: Transaction rollback, logged via structured logger Stage 3: Inter-Site Transfer ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Purpose**: Transfer assembled packages between sites (SOURCE → LTA, LTA → LTA) **Manager**: :py:mod:`ccat_data_transfer.transfer_manager` **Key Function**: :py:func:`~ccat_data_transfer.transfer_manager.transfer_transfer_packages` **Worker Task**: :py:func:`~ccat_data_transfer.transfer_manager.transfer_files_bbcp` **Process**: 1. **Discovery**: Manager queries for :py:class:`~ccat_ops_db.models.DataTransfer` records with status ``PENDING`` using :py:func:`~ccat_data_transfer.transfer_manager._get_pending_transfers` * These records were created in Stage 2 by the data transfer package manager * Each represents a route from origin to destination location 2. 
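For comparison with the discovery snippets shown for the later stages, the query behind step 1 above has roughly the following shape. This is a sketch that assumes the models are importable as shown; the actual logic lives in :py:func:`~ccat_data_transfer.transfer_manager._get_pending_transfers`.

.. code-block:: python

    from ccat_ops_db.models import DataTransfer, Status


    def get_pending_transfers(session):
        """DataTransfer rows created in Stage 2 that still await scheduling."""
        return (
            session.query(DataTransfer)
            .filter(DataTransfer.status == Status.PENDING)
            .all()
        )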
**Buffer State Check**: Before scheduling transfers, consult :py:class:`~ccat_data_transfer.buffer_manager.BufferManager`: * ``can_create_data()``: If buffer in emergency state, postpone transfer * ``get_max_parallel_transfers()``: Adjust concurrency based on buffer health * Prevents overwhelming destination storage during buffer pressure 3. **Transfer Method Filtering**: Filter for supported methods using :py:func:`~ccat_data_transfer.transfer_manager._filter_supported_transfers`: * Currently supports: ``bbcp`` (Bulk Block Copy Protocol) * Unsupported methods logged as errors * Configuration: :py:data:`~ccat_data_transfer.config.settings.SUPPORTED_DATA_TRANSFER_METHODS` 4. **Task Scheduling**: For each supported transfer: * Status: ``PENDING`` → ``SCHEDULED`` * Queue routing via :py:func:`~ccat_data_transfer.queue_discovery.route_task_by_location` based on :py:attr:`~ccat_ops_db.models.DataTransfer.origin_location` * Rate limiting applied if buffer state requires reduced parallelism * Celery task :py:func:`~ccat_data_transfer.transfer_manager.transfer_files_bbcp` dispatched 5. **Worker Execution**: Celery task performs actual transfer: a. **URL Construction**: Build source/destination URLs via :py:func:`~ccat_data_transfer.transfer_manager._construct_transfer_urls`: * Handles :py:class:`~ccat_ops_db.models.DiskDataLocation` (SSH/local paths) * Handles :py:class:`~ccat_ops_db.models.S3DataLocation` (S3 URLs) * Handles :py:class:`~ccat_ops_db.models.TapeDataLocation` (mount paths) b. **Destination Directory Creation**: Ensure target directory exists: * Local: :py:func:`~ccat_data_transfer.utils.create_local_folder` * Remote: :py:func:`~ccat_data_transfer.utils.create_remote_folder` via SSH c. **Transfer Execution**: Execute bbcp command via :py:func:`~ccat_data_transfer.transfer_manager._execute_bbcp_command`: * Command built by :py:func:`~ccat_data_transfer.utils.make_bbcp_command` * Development mode: Falls back to ``cp`` for localhost transfers * Captures stdout/stderr for metrics and logging * Parses output via :py:func:`~ccat_data_transfer.utils.parse_bbcp_output` d. **Metrics Collection**: Track transfer performance: * Bytes transferred, duration, transfer rates (peak/average) * Number of streams, network errors * Sent to InfluxDB via :py:class:`~ccat_data_transfer.metrics.HousekeepingMetrics` e. 
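A condensed sketch of the transfer execution in step c above, with an illustrative choice of bbcp flags and a hypothetical ``DEVELOPMENT_MODE`` constant standing in for the real setting; in production the command is assembled by :py:func:`~ccat_data_transfer.utils.make_bbcp_command` and its output parsed by :py:func:`~ccat_data_transfer.utils.parse_bbcp_output`.

.. code-block:: python

    import subprocess

    DEVELOPMENT_MODE = False  # placeholder for the real settings flag


    def run_transfer(source_url: str, destination_url: str, streams: int = 4) -> str:
        """Copy one transfer archive, falling back to plain cp in development."""
        if DEVELOPMENT_MODE:
            # Local development: a plain copy needs no bbcp installation.
            cmd = ["cp", source_url, destination_url]
        else:
            # -s selects the number of parallel TCP streams.
            cmd = ["bbcp", "-s", str(streams), source_url, destination_url]

        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        # The captured output is what the real worker feeds into parse_bbcp_output().
        return result.stdout + result.stderr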
**Status Update**: On success: * Status: ``COMPLETED`` * Create :py:class:`~ccat_ops_db.models.DataTransferPackagePhysicalCopy` at destination * Record start/end times **Database Changes**: * :py:attr:`~ccat_ops_db.models.DataTransfer.status`: ``PENDING`` → ``SCHEDULED`` → ``COMPLETED`` * :py:class:`~ccat_ops_db.models.DataTransferPackagePhysicalCopy` created at destination with: * ``status``: :py:attr:`~ccat_ops_db.models.PhysicalCopyStatus.PRESENT` * ``checksum``: Inherited from :py:class:`~ccat_ops_db.models.DataTransferPackage` * :py:attr:`~ccat_ops_db.models.DataTransfer.start_time` and ``end_time`` recorded **Error Handling & Retry Logic**: * Base class: :py:class:`~ccat_data_transfer.transfer_manager.DataTransferTask` with max 3 retries * Specific error recovery: * :py:exc:`~ccat_data_transfer.exceptions.DestinationFileExistsError`: Removes existing file, retries * :py:exc:`~ccat_data_transfer.exceptions.NetworkError`: Retry on connection refused/timeout * :py:exc:`~ccat_data_transfer.exceptions.SegmentationFaultError`: Retry on bbcp crash * :py:exc:`~ccat_data_transfer.exceptions.BBCPError`: Generic bbcp failures * On retry: :py:meth:`~ccat_data_transfer.transfer_manager.DataTransferTask.reset_state_on_failure` * Status reset to ``PENDING`` * Increment :py:attr:`~ccat_ops_db.models.DataTransfer.retry_count` * Clear :py:attr:`~ccat_ops_db.models.DataTransfer.failure_error_message` * On permanent failure: :py:meth:`~ccat_data_transfer.transfer_manager.DataTransferTask.mark_permanent_failure` * Status: ``FAILED`` * Associated :py:attr:`~ccat_ops_db.models.RawDataPackage.state`: ``FAILED`` **Key Implementation Details**: * Polymorphic storage handling: Disk, S3, and Tape locations supported * Dynamic queue routing ensures workers execute on correct hosts * Buffer-aware scheduling prevents storage overflow * Metrics tracked for performance monitoring and troubleshooting * bbcp output logged to :py:class:`~ccat_ops_db.models.BBCPLog` table * Redis pub/sub notifications: ``transfer:overview`` channel for UI updates **Configuration**: * ``SUPPORTED_DATA_TRANSFER_METHODS``: Enabled transfer protocols (default: ``["bbcp"]``) * ``DATA_TRANSFER_WORKERS``: Maximum parallel transfers * ``DEVELOPMENT_MODE``: If ``True``, uses ``cp`` instead of ``bbcp`` for local transfers * ``TRANSFER_MANAGER_SLEEP_TIME``: Manager polling interval * BBCP settings: Configurable via :py:class:`~ccat_data_transfer.bbcp_settings.BBCPSettings` **Monitoring & Observability**: * Transfer metrics sent to InfluxDB with tags: * ``source_location``, ``destination_location``, ``transfer_id`` * ``transfer_method``, bbcp configuration parameters * Performance: ``peak_transfer_rate_mbps``, ``average_transfer_rate_mbps`` * Reliability: ``network_errors``, ``retry_count`` * Structured logging via :py:func:`~ccat_data_transfer.logging_utils.get_structured_logger` * Real-time notifications via Redis pub/sub for monitoring dashboards Stage 4: Unpacking and Verification ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Purpose**: Extract :py:class:`~ccat_ops_db.models.DataTransferPackage` archives and verify constituent :py:class:`~ccat_ops_db.models.RawDataPackage` integrity **Manager**: :py:mod:`ccat_data_transfer.data_integrity_manager` **Key Function**: :py:func:`~ccat_data_transfer.data_integrity_manager.unpack_and_verify_files` **Worker Task**: :py:func:`~ccat_data_transfer.data_integrity_manager.unpack_data_transfer_package` **Process**: 1. 
**Discovery**: Manager queries for :py:class:`~ccat_ops_db.models.DataTransfer` records with completed transfers needing unpacking using :py:func:`~ccat_data_transfer.data_integrity_manager._get_pending_unpackpings`: .. code-block:: python transfers = session.query(DataTransfer).filter( DataTransfer.status == Status.COMPLETED, DataTransfer.unpack_status == Status.PENDING ).all() * Transfer must be completed (archive successfully transferred) * Unpacking not yet attempted or previously failed and reset for retry 2. **Task Scheduling**: For each pending unpack: * Status: :py:attr:`~ccat_ops_db.models.DataTransfer.unpack_status` → ``SCHEDULED`` * Queue routing via :py:func:`~ccat_data_transfer.queue_discovery.route_task_by_location` based on :py:attr:`~ccat_ops_db.models.DataTransfer.destination_location` * Ensures worker executes on host where archive resides * Celery task :py:func:`~ccat_data_transfer.data_integrity_manager.unpack_data_transfer_package` dispatched 3. **Worker Execution**: Task performs unpacking and verification: a. **Path Resolution**: Determine archive location via :py:func:`~ccat_data_transfer.data_integrity_manager._get_paths`: * Supports :py:class:`~ccat_ops_db.models.DiskDataLocation` (currently) * Archive path: ``destination_location.path / data_transfer_package.relative_path`` * Extraction destination: ``destination_location.path`` (base directory) b. **Directory Creation**: Ensure extraction destination exists via :py:func:`~ccat_data_transfer.utils.create_local_folder` c. **Archive Extraction**: Unpack using :py:func:`~ccat_data_transfer.utils.unpack_local`: * Extracts all constituent :py:class:`~ccat_ops_db.models.RawDataPackage` tar.gz files * Preserves directory structure from :py:attr:`~ccat_ops_db.models.RawDataPackage.relative_path` * Single-level unpacking: DataTransferPackage → RawDataPackages (not extracted further) d. **Checksum Verification**: Verify each extracted :py:class:`~ccat_ops_db.models.RawDataPackage` via :py:func:`~ccat_data_transfer.data_integrity_manager._verify_checksums`: * For each RawDataPackage in the transfer package: * Calculate checksum of extracted tar.gz file * Compare with :py:attr:`~ccat_ops_db.models.RawDataPackage.checksum` * Failure triggers :py:exc:`~ccat_data_transfer.exceptions.ChecksumVerificationError` e. **Physical Copy Creation**: On successful verification via :py:func:`~ccat_data_transfer.data_integrity_manager._update_data_transfer_status`: * Create :py:class:`~ccat_ops_db.models.RawDataPackagePhysicalCopy` for each RawDataPackage at destination * Status: :py:attr:`~ccat_ops_db.models.PhysicalCopyStatus.PRESENT` * Checksum inherited from :py:class:`~ccat_ops_db.models.RawDataPackage` * **Note**: Does NOT create RawDataFilePhysicalCopy records (files remain inside RawDataPackage archives) 4. **Corruption Handling**: If archive corruption detected via :py:exc:`~ccat_data_transfer.exceptions.ArchiveCorruptionError`, execute cleanup via :py:func:`~ccat_data_transfer.data_integrity_manager._cleanup_corrupted_transfer`: a. **Delete Corrupted Archive at Destination**: * Find :py:class:`~ccat_ops_db.models.DataTransferPackagePhysicalCopy` at destination * Mark :py:attr:`~ccat_ops_db.models.DataTransferPackagePhysicalCopy.deletion_status` as ``SCHEDULED`` * Dispatch :py:func:`~ccat_data_transfer.deletion_manager.delete_physical_copy` task * Wait for deletion completion (5 minute timeout) b. 
**Conditional Source Cleanup** (Secondary Transfers Only): * Detect secondary transfer: Check if both origin and destination are LTA sites * If secondary transfer: * Delete source DataTransferPackage archive (already replicated elsewhere) * Schedule deletion of source physical copy * If primary transfer: Skip source cleanup (source is original buffer) c. **Deregister RawDataPackages**: * Clear :py:attr:`~ccat_ops_db.models.RawDataPackage.data_transfer_package_id` for all constituent packages * Packages return to unpackaged state for recreation d. **Delete DataTransferPackage Record** (Conditional): * Only if both destination and source (if applicable) deletions successful * Otherwise keep record for retry by automatic cleanup e. **Re-raise Exception**: Trigger task retry system after cleanup **Database Changes**: * :py:attr:`~ccat_ops_db.models.DataTransfer.unpack_status`: ``PENDING`` → ``SCHEDULED`` → ``COMPLETED``/``FAILED`` * :py:class:`~ccat_ops_db.models.RawDataPackagePhysicalCopy` created for each constituent package at destination * No :py:class:`~ccat_ops_db.models.RawDataFilePhysicalCopy` created (files remain archived) * :py:attr:`~ccat_ops_db.models.RawDataPackage.state` remains ``TRANSFERRING`` (transitions to ``ARCHIVED`` in Stage 5) **Error Handling & Retry Logic**: * Base class: :py:class:`~ccat_data_transfer.data_integrity_manager.UnpackTask` with max 3 retries * Specific error types: * :py:exc:`~ccat_data_transfer.exceptions.ArchiveCorruptionError`: Cleanup and recreate transfer package * :py:exc:`~ccat_data_transfer.exceptions.ChecksumVerificationError`: Retry unpack operation * :py:exc:`~ccat_data_transfer.exceptions.UnpackError`: Generic extraction failures * :py:exc:`FileNotFoundError`: Missing archive or destination directory * On retry: :py:meth:`~ccat_data_transfer.data_integrity_manager.UnpackTask.reset_state_on_failure` * :py:attr:`~ccat_ops_db.models.DataTransfer.unpack_status` reset to ``PENDING`` * Increment :py:attr:`~ccat_ops_db.models.DataTransfer.unpack_retry_count` * Clear :py:attr:`~ccat_ops_db.models.DataTransfer.unpack_failure_error_message` * Associated RawDataPackages state reset to ``TRANSFERRING`` * On permanent failure: :py:meth:`~ccat_data_transfer.data_integrity_manager.UnpackTask.mark_permanent_failure` * :py:attr:`~ccat_ops_db.models.DataTransfer.unpack_status`: ``FAILED`` * Associated :py:attr:`~ccat_ops_db.models.RawDataPackage.state`: ``FAILED`` **Key Implementation Details**: * **Single-level unpacking**: DataTransferPackage (tar) contains RawDataPackages (tar.gz files), which remain compressed * **Checksum verification at package level**: Verifies RawDataPackage archives, not individual raw data files * **Smart corruption recovery**: Distinguishes primary vs secondary transfers for appropriate cleanup * **Synchronous deletion**: Waits for corruption cleanup to complete before proceeding * **Redis pub/sub notifications**: ``transfer:overview`` channel with events: * ``unpack_scheduled``, ``unpack_completed``, ``unpack_failed`` * ``corrupted_transfer_cleanup`` **Configuration**: * ``DATA_INTEGRITY_MANAGER_SLEEP_TIME``: Manager polling interval * Corruption cleanup timeout: 300 seconds (5 minutes) per deletion task **Monitoring & Observability**: * Structured logging via :py:func:`~ccat_data_transfer.logging_utils.get_structured_logger` * Key log events: * ``pending_unpacks_found``, ``scheduling_unpack`` Stage 5: Long-Term Archive ~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Purpose**: Transfer :py:class:`~ccat_ops_db.models.RawDataPackage` from ``BUFFER`` 
to permanent ``LTA`` storage **Manager**: :py:mod:`ccat_data_transfer.archive_manager` **Key Function**: :py:func:`~ccat_data_transfer.archive_manager.transfer_raw_data_packages_to_long_term_archive` **Worker Task**: :py:func:`~ccat_data_transfer.archive_manager.send_data_to_long_term_archive` **Process**: 1. **Discovery**: Manager queries for :py:class:`~ccat_ops_db.models.RawDataPackage` in ``BUFFER`` locations needing archival via :py:func:`~ccat_data_transfer.archive_manager._get_pending_new_transfers_to_lta_location`: .. code-block:: python # For each LTA location at each site pending_packages = session.query(RawDataPackage)\ .join(RawDataPackagePhysicalCopy)\ .filter( RawDataPackagePhysicalCopy.data_location_id.in_(buffer_location_ids), RawDataPackagePhysicalCopy.status == PhysicalCopyStatus.PRESENT, ~RawDataPackage.id.in_(existing_lta_transfer_ids) ).distinct().all() * Packages must be physically present in a ``BUFFER`` at the LTA site * No existing :py:class:`~ccat_ops_db.models.LongTermArchiveTransfer` to this LTA location * Site-specific: Each site's buffer can archive to its own LTA locations 2. **Transfer Record Creation**: For each pending package via :py:func:`~ccat_data_transfer.archive_manager._create_long_term_archive_transfer`: * Create :py:class:`~ccat_ops_db.models.LongTermArchiveTransfer` record * Set :py:attr:`~ccat_ops_db.models.LongTermArchiveTransfer.origin_data_location_id` (source buffer) * Set :py:attr:`~ccat_ops_db.models.LongTermArchiveTransfer.destination_data_location_id` (LTA location) * Status: ``PENDING`` * Links to :py:attr:`~ccat_ops_db.models.LongTermArchiveTransfer.site_id` 3. **Task Scheduling**: Via :py:func:`~ccat_data_transfer.archive_manager._schedule_transfer_task`: * Status: ``PENDING`` → ``SCHEDULED`` * Queue routing via :py:func:`~ccat_data_transfer.queue_discovery.route_task_by_location` based on destination LTA location * Ensures worker executes on host with access to both buffer and LTA * Record :py:attr:`~ccat_ops_db.models.LongTermArchiveTransfer.start_time` 4. **Worker Execution**: Task performs storage-type-specific transfer via :py:func:`~ccat_data_transfer.archive_manager._execute_transfer`: a. **URL Construction**: Build source and destination paths via :py:func:`~ccat_data_transfer.archive_manager._construct_transfer_urls`: * :py:class:`~ccat_ops_db.models.DiskDataLocation`: ``location.path / package.relative_path`` * :py:class:`~ccat_ops_db.models.S3DataLocation`: ``s3://bucket/prefix/package.relative_path`` * :py:class:`~ccat_ops_db.models.TapeDataLocation`: Not yet implemented b. 
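The storage-type dispatch in step a above can be pictured as follows. This is a sketch only: it duck-types on the class name to stay self-contained, the ``prefix`` attribute is assumed, and the real logic is :py:func:`~ccat_data_transfer.archive_manager._construct_transfer_urls`.

.. code-block:: python

    from pathlib import Path


    def destination_url(location, relative_path: str) -> str:
        """Build the archive destination for a disk or S3 LTA location."""
        kind = type(location).__name__
        if kind == "DiskDataLocation":
            return str(Path(location.path) / relative_path)
        if kind == "S3DataLocation":
            prefix = getattr(location, "prefix", "") or ""  # assumed attribute
            return f"s3://{location.bucket_name}/{prefix}{relative_path}"
        raise ValueError(f"Location type not supported yet: {kind}")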
**Transfer Execution**: Based on source → destination storage type combination: **Disk → Disk**: Via :py:func:`~ccat_data_transfer.archive_manager._execute_disk_to_disk_transfer` * Simple ``cp`` command * Create destination directory if needed * Preserves tar.gz archive format **Disk → S3**: Two implementation options: * **boto3** (default): Via :py:func:`~ccat_data_transfer.archive_manager._execute_s3_upload` * Reads entire file into memory * Calculates SHA256 checksum * Uploads with ``ChecksumSHA256`` for integrity verification * Includes comprehensive S3 object metadata (see below) * Uploads separate extended metadata JSON file * **Coscine** (RDM platform): Via :py:func:`~ccat_data_transfer.archive_manager._execute_coscine_s3_upload` * Uses Coscine API client * Uploads to configured Coscine project/resource * Includes Coscine-specific metadata form * Separate extended metadata upload via :py:func:`~ccat_data_transfer.archive_manager._upload_coscine_extended_metadata` **S3 → Disk**: Via :py:func:`~ccat_data_transfer.archive_manager._execute_s3_to_disk_transfer` * Uses ``aws s3 cp`` command * Creates destination directory * Downloads from S3 to local filesystem **S3 → S3**: Not yet implemented (raises ``ValueError``) c. **Metadata Generation**: For S3 destinations, comprehensive metadata created: **Object Metadata** (S3 headers): Via :py:func:`~ccat_data_transfer.archive_manager._get_s3_metadata` * Essential discovery keys: * ``obs_dataset_id``: Unique observation identifier * ``obs_collection``: Temporal collection (e.g., ``CCAT-2024-Q3``) * ``obs_content_type``: Always ``raw_data_package`` * Core provenance: * ``obs_telescope``, ``obs_instrument``, ``obs_date_obs`` * ``obs_program_id``, ``obs_subprogram_id`` * Target information: * ``obs_target_name``, ``obs_ra_deg``, ``obs_dec_deg`` * File integrity: * ``file_xxhash64``: Package checksum * ``file_size_bytes``, ``file_record_count`` * Archive management: * ``archive_retention``: ``permanent`` * ``archive_access_class``: ``public`` * ``metadata_path``: Link to extended metadata JSON **Extended Metadata** (JSON sidecar): Via :py:func:`~ccat_data_transfer.archive_manager._generate_ivoa_metadata` * IVOA-compatible structure for astronomical data discovery * Comprehensive nested document with: * **Identifiers**: ``dataset_id``, ``publisher_did`` (IVOA URI), ``ivoa_collection`` * **Facility**: Observatory, telescope, instrument details with coordinates * **Target**: Name, type, coordinates (decimal + sexagesimal), frame, epoch * **Observation**: Program info, timing, duration * **Contents**: File listing with individual file checksums and sizes * **Quality**: Observation status and quality metrics * **Processing**: Pipeline name and version * **Custom**: Instrument-specific metadata from :py:class:`~ccat_ops_db.models.RawDataPackageMetadata` * Uploaded as separate JSON file: ``{package_name}_metadata.json`` * Stored alongside data package for data discovery systems d. **Checksum Verification**: SHA256 for S3 uploads (handled by S3 API) e. **Metrics Collection**: Send to InfluxDB via :py:class:`~ccat_data_transfer.metrics.HousekeepingMetrics`: * ``operation``: ``long_term_archive_transfer`` or ``s3_upload`` * Transfer rate (MB/s), file size, duration * Source/destination locations, transfer method * Success/failure status 5. 
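The boto3 upload path described in step b condenses to roughly the sketch below (not the real :py:func:`~ccat_data_transfer.archive_manager._execute_s3_upload`): the archive is read into memory, its SHA-256 digest is passed as ``ChecksumSHA256`` so that S3 verifies the upload, and the discovery keys travel as object metadata. Bucket, key, and metadata values are placeholders.

.. code-block:: python

    import base64
    import hashlib

    import boto3


    def upload_package(path: str, bucket: str, key: str,
                       metadata: dict[str, str]) -> None:
        """Upload one RawDataPackage archive with server-side checksum verification."""
        with open(path, "rb") as fh:
            body = fh.read()  # the documented implementation also reads the whole file

        digest = base64.b64encode(hashlib.sha256(body).digest()).decode()

        s3 = boto3.client("s3")
        s3.put_object(
            Bucket=bucket,
            Key=key,
            Body=body,
            ChecksumSHA256=digest,   # S3 rejects the upload on mismatch
            Metadata=metadata,       # e.g. obs_dataset_id, file_xxhash64, ...
        )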
**Completion**: Via :py:func:`~ccat_data_transfer.archive_manager._mark_transfer_successful`: * Create :py:class:`~ccat_ops_db.models.RawDataPackagePhysicalCopy` at LTA location * Set :py:attr:`~ccat_ops_db.models.RawDataPackagePhysicalCopy.status`: ``PRESENT`` * Set :py:attr:`~ccat_ops_db.models.RawDataPackagePhysicalCopy.verified_at`: Current timestamp * Update :py:attr:`~ccat_ops_db.models.RawDataPackage.state`: ``ARCHIVED`` * Set :py:attr:`~ccat_ops_db.models.LongTermArchiveTransfer.status`: ``COMPLETED`` * Record :py:attr:`~ccat_ops_db.models.LongTermArchiveTransfer.end_time` * Add success log via :py:func:`~ccat_data_transfer.archive_manager._add_transfer_log` **Database Changes**: * :py:class:`~ccat_ops_db.models.LongTermArchiveTransfer` created linking package to destination * :py:attr:`~ccat_ops_db.models.LongTermArchiveTransfer.status`: ``PENDING`` → ``SCHEDULED`` → ``COMPLETED``/``FAILED`` * :py:class:`~ccat_ops_db.models.RawDataPackagePhysicalCopy` created at LTA location * :py:attr:`~ccat_ops_db.models.RawDataPackage.state`: ``TRANSFERRING`` → ``ARCHIVED`` * :py:class:`~ccat_ops_db.models.LongTermArchiveTransferLog` entries track progress * Start and end times recorded on transfer **Storage-Specific Implementation**: **Disk LTA** (:py:class:`~ccat_ops_db.models.DiskDataLocation`): * Simple file copy (``cp`` command) * Preserves tar.gz archive format * Instant access for retrieval * Lower redundancy than S3 **S3 LTA** (:py:class:`~ccat_ops_db.models.S3DataLocation`): * **Primary Method** (boto3): * Direct Python SDK upload * SHA256 integrity verification built-in * Rich S3 object metadata (32 custom key-value pairs) * Extended IVOA metadata as separate JSON object * Suitable for AWS S3, MinIO, or compatible storage * **Alternative Method** (Coscine): * Research Data Management platform integration * Coscine API for uploads * Metadata mapped to Coscine's schema * Suitable for institutional RDM compliance * Configuration: ``COSCINE_API_TOKEN``, ``COSCINE_PROJECT``, ``COSCINE_RESOURCE`` * Object lifecycle policies (configured in S3, not in code) * Future: Automatic Glacier transitions for cost optimization **Tape LTA** (:py:class:`~ccat_ops_db.models.TapeDataLocation`): * Not yet implemented * Future: Write to tape library, update catalog * Slowest retrieval but lowest cost per TB **Error Handling & Retry Logic**: * Base class: :py:class:`~ccat_data_transfer.archive_manager.LongTermArchiveTask` with max 3 retries * On retry: :py:meth:`~ccat_data_transfer.archive_manager.LongTermArchiveTask.reset_state_on_failure` * Status reset to ``PENDING`` * Increment :py:attr:`~ccat_ops_db.models.LongTermArchiveTransfer.attempt_count` * Clear :py:attr:`~ccat_ops_db.models.LongTermArchiveTransfer.failure_error_message`` * Reset :py:attr:`~ccat_ops_db.models.RawDataPackage.state` to ``TRANSFERRING`` * Log retry reason * On permanent failure: :py:meth:`~ccat_data_transfer.archive_manager.LongTermArchiveTask.mark_permanent_failure` * Status: ``FAILED`` * :py:attr:`~ccat_ops_db.models.RawDataPackage.state`: ``FAILED`` * Log final failure with attempt count * Transfer-specific errors: * S3 throttling: Handled by boto3 retry logic * Disk full: Raises exception, requires manual intervention * Checksum mismatch: S3 rejects upload automatically * Network errors: Retry with exponential backoff **Key Implementation Details**: * **Site-specific archiving**: Each site has its own LTA location(s) * **Multi-destination**: Packages can be archived to multiple LTA locations (different sites) * **Transfer 
tracking**: Separate ``LongTermArchiveTransfer`` record per destination * **Comprehensive logging**: ``LongTermArchiveTransferLog`` table maintains audit trail * **Metadata-rich S3**: Object metadata enables data discovery without downloading * **IVOA compliance**: Extended metadata follows International Virtual Observatory Alliance standards * **Queue routing**: Dynamic routing ensures tasks execute on hosts with proper access * **Metrics integration**: InfluxDB metrics for transfer monitoring and troubleshooting **Configuration**: * ``ARCHIVE_MANAGER_SLEEP_TIME``: Manager polling interval * ``s3_method``: Choose ``boto3`` (default) or ``coscine`` * **boto3 configuration**: * ``s3_endpoint_url``, ``s3_access_key_id``, ``s3_secret_access_key`` * ``s3_region_name`` * Per-location credentials via ``S3DataLocation`` configuration * **Coscine configuration**: * ``COSCINE_API_TOKEN``: API authentication token * ``COSCINE_PROJECT``: Target project identifier * ``COSCINE_RESOURCE``: Target resource identifier * LTA locations defined per site in database * Retention policies configured at S3 bucket level (external to system) **Monitoring & Observability**: * Structured logging via :py:func:`~ccat_data_transfer.logging_utils.get_structured_logger` * Key log events: * ``pending_transfers``, ``scheduling_long_term_archive_transfer`` * ``s3_upload_successful``, ``disk_to_disk_transfer_successful`` * ``long_term_archive_transfer_completed`` * ``metadata_generation``, ``extended_metadata_upload_successful`` * Transfer metrics to InfluxDB: * Transfer rate (MB/s), file size, duration * Source/destination locations and storage types * Success/failure rates per storage type * Metadata upload success tracking * Redis pub/sub notifications via ``transfer:overview`` channel: * ``long_term_archive_transfer_created`` * ``long_term_archive_transfer_scheduled`` * ``long_term_archive_transfer_completed`` * ``long_term_archive_transfer_failed`` * ``long_term_archive_transfer_reset`` **Important Notes**: * This stage changes :py:attr:`~ccat_ops_db.models.RawDataPackage.state` to ``ARCHIVED`` * ``ARCHIVED`` state unlocks deletion of buffer copies in Stage 7 * Packages remain as tar.gz archives in LTA (not extracted) * S3 metadata enables data discovery without database queries * Extended IVOA metadata supports astronomical data portals * Multiple transfers can be created for the same package to different LTA sites * Transfer logs provide complete audit trail for compliance Stage 6: Staging (On-Demand) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Purpose**: Make archived data available for scientific processing by unpacking :py:class:`~ccat_ops_db.models.RawDataPackage` archives **Manager**: :py:mod:`ccat_data_transfer.staging_manager` **Key Function**: :py:func:`~ccat_data_transfer.staging_manager.process_staging_jobs` **Worker Task**: :py:func:`~ccat_data_transfer.staging_manager.stage_data_task` **Trigger**: Scientists create :py:class:`~ccat_ops_db.models.StagingJob` records via ops-db-ui, or workflow-manager (To be implemented) creates a :py:class:`~ccat_ops_db.models.StagingJob` record in the database. **Process**: 1. 
**Job Creation** (by scientist via UI): User requests data through web interface: * Select observations, date ranges, or specific packages * Choose destination :py:class:`~ccat_ops_db.models.DataLocation` (type: ``PROCESSING``) * Specify origin :py:class:`~ccat_ops_db.models.DataLocation` (typically ``LTA``) * Submit staging request Database: :py:class:`~ccat_ops_db.models.StagingJob` record created with: * :py:attr:`~ccat_ops_db.models.StagingJob.status`: ``PENDING`` * :py:attr:`~ccat_ops_db.models.StagingJob.origin_data_location_id`: Source LTA location * :py:attr:`~ccat_ops_db.models.StagingJob.destination_data_location_id`: Target ``PROCESSING`` location * Linked :py:class:`~ccat_ops_db.models.RawDataPackage` records 2. **Discovery**: Manager queries for pending jobs via :py:func:`~ccat_data_transfer.staging_manager._get_pending_staging_jobs`: .. code-block:: python jobs = session.query(StagingJob).filter( StagingJob.status == Status.PENDING ).all() * No active flag check in actual code * Processes all pending jobs regardless of creation time 3. **Task Scheduling**: Via :py:func:`~ccat_data_transfer.staging_manager._process_staging_job`: * Status: ``PENDING`` → ``SCHEDULED`` * Queue routing via :py:func:`~ccat_data_transfer.queue_discovery.route_task_by_location` based on destination ``PROCESSING`` location * Ensures worker executes on host where files will be staged * Single task handles all packages in the job 4. **Worker Execution**: Via :py:func:`~ccat_data_transfer.staging_manager._stage_data_internal`, processes each package independently: For each :py:class:`~ccat_ops_db.models.RawDataPackage` in the job: a. **Skip Check**: Via :py:func:`~ccat_data_transfer.staging_manager._check_existing_copies` * Check if package already staged (status: ``STAGED``) * Verify physical file presence if status is ``PRESENT`` * Fix inconsistent database records (file missing but status ``PRESENT``) b. **Source Location**: Via :py:func:`~ccat_data_transfer.staging_manager._get_physical_copy` * Query :py:class:`~ccat_ops_db.models.RawDataPackagePhysicalCopy` at origin location * No "locate source" step - origin known from job configuration * Validate copy status is ``PRESENT`` c. **Destination Path Construction**: Via :py:func:`~ccat_data_transfer.staging_manager._construct_destination_path` * **Temporary package path**: ``destination_location.path / raw_data_packages / {filename}`` * **Final extraction path**: ``destination_location.path`` (root directory) * Preserves hierarchical structure from source (e.g., ``CHAI/LFA/filename.fits``) d. **Download Package**: Via :py:func:`~ccat_data_transfer.staging_manager._execute_polymorphic_copy` Based on origin storage type: **S3 → Disk**: Primary staging path * **boto3 method**: Via :py:func:`~ccat_data_transfer.staging_manager._execute_boto3_s3_download` * Uses boto3 ``download_file()`` * Constructs S3 key via :py:func:`~ccat_data_transfer.utils.get_s3_key_for_package` * Downloads to temporary package path * **Coscine method**: Via :py:func:`~ccat_data_transfer.staging_manager._execute_coscine_s3_download` * Uses Coscine API client * Replaces colons in object keys (same as upload) * Two strategies: * Direct file download if API supports: ``resource.file(key).download()`` * Fallback: Download entire resource, extract specific file **Disk → Disk**: Future support (commented out) * Local copy: ``shutil.copy()`` * Remote copy: ``scp`` command **Tape → Disk**: Planned (raises ``NotImplementedError``) e. 
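A sketch of the boto3 staging download in step d above, with placeholder bucket and key handling; the real task resolves the key via :py:func:`~ccat_data_transfer.utils.get_s3_key_for_package` and obtains a configured client from :py:func:`~ccat_data_transfer.utils.get_s3_client`.

.. code-block:: python

    import os

    import boto3


    def download_package(bucket: str, key: str, destination_dir: str) -> str:
        """Download one RawDataPackage archive into the temporary staging area."""
        os.makedirs(destination_dir, exist_ok=True)
        local_path = os.path.join(destination_dir, os.path.basename(key))
        boto3.client("s3").download_file(bucket, key, local_path)
        return local_path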
**Unpack Archive**: Via :py:func:`~ccat_data_transfer.staging_manager._unpack_file` * Validate file is tar.gz format * List tar contents for debugging * Extract directly to final destination: ``tar -xzf {package} -C {destination} --overwrite`` * Preserves full hierarchical structure (e.g., CHAI/LFA/filename.fits) * Overwrites existing files if present f. **Verification**: Via :py:func:`~ccat_data_transfer.staging_manager._check_raw_data_files` * For each :py:class:`~ccat_ops_db.models.RawDataFile` in package: * Check file exists at: ``destination_path / file.relative_path`` * Validate complete hierarchical path * Raise ``ValueError`` if any files missing * No checksum verification (assumes transfer integrity from S3) g. **Create File Physical Copies**: Via :py:func:`~ccat_data_transfer.staging_manager._create_raw_data_file_physical_copies` * For each :py:class:`~ccat_ops_db.models.RawDataFile` in package: * Create :py:class:`~ccat_ops_db.models.RawDataFilePhysicalCopy` * Location: Destination PROCESSING location * Status: :py:attr:`~ccat_ops_db.models.PhysicalCopyStatus.PRESENT` * Enables file-level tracking and deletion h. **Mark Package as Staged**: Via :py:func:`~ccat_data_transfer.staging_manager._mark_package_as_staged_and_cleanup` * Create or update :py:class:`~ccat_ops_db.models.RawDataPackagePhysicalCopy` * Set status: :py:attr:`~ccat_ops_db.models.PhysicalCopyStatus.STAGED` * **Delete temporary tar.gz file** from ``raw_data_packages/`` directory * ``STAGED`` status means: files extracted, tar.gz removed i. **Per-Package Error Handling**: * Errors logged, marked in ``package_results`` dictionary * Job continues with next package (doesn't fail entire job) * Partial success possible 5. **Job Completion**: After all packages processed: * If all packages successful: ``StagingJob.status`` → ``COMPLETED`` * If any packages failed: ``StagingJob.status`` → ``FAILED`` * Record :py:attr:`~ccat_ops_db.models.StagingJob.start_time` and ``end_time`` * Failed package IDs stored in :py:attr:`~ccat_ops_db.models.StagingJob.failure_error_message` 6. 
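Steps e and f above condense to roughly the following sketch, assuming the relative paths are read from the package's :py:class:`~ccat_ops_db.models.RawDataFile` records; the real helpers are :py:func:`~ccat_data_transfer.staging_manager._unpack_file` and :py:func:`~ccat_data_transfer.staging_manager._check_raw_data_files`.

.. code-block:: python

    import subprocess
    from pathlib import Path


    def unpack_and_verify(archive: str, destination: str,
                          relative_paths: list[str]) -> None:
        """Extract a staged package and confirm every expected file landed."""
        # Extract directly into the destination root, keeping e.g. CHAI/LFA/... paths.
        subprocess.run(
            ["tar", "-xzf", archive, "-C", destination, "--overwrite"],
            check=True,
        )

        missing = [p for p in relative_paths
                   if not (Path(destination) / p).exists()]
        if missing:
            raise ValueError(f"Missing files after staging: {missing}")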
**Scientist Access**: Data now available for processing: * Files unpacked with hierarchical structure preserved * No ``tar.gz`` archives in working directory (deleted after extraction) * Individual files tracked via :py:class:`~ccat_ops_db.models.RawDataFilePhysicalCopy` * Ready for pipeline processing **Database Changes**: * :py:attr:`~ccat_ops_db.models.StagingJob.status`: ``PENDING`` → ``SCHEDULED`` → ``COMPLETED``/``FAILED`` * :py:class:`~ccat_ops_db.models.RawDataPackagePhysicalCopy` created at PROCESSING location: * Status: :py:attr:`~ccat_ops_db.models.PhysicalCopyStatus.STAGED` * Indicates files extracted, package archive removed * :py:class:`~ccat_ops_db.models.RawDataFilePhysicalCopy` created for each file: * Status: :py:attr:`~ccat_ops_db.models.PhysicalCopyStatus.PRESENT` * Enables individual file tracking and deletion * Start and end times recorded on staging job * Failure messages capture failed package IDs **Storage Type Support**: **S3 → Disk** (Primary path): * **boto3** (default): * Direct Python SDK download * Uses :py:func:`~ccat_data_transfer.utils.get_s3_client` for configuration * Constructs consistent S3 keys * Suitable for AWS S3, MinIO, compatible storage * **Coscine** (RDM platform): * Research Data Management platform integration * API-based download * Fallback strategies for API limitations * Configuration: ``COSCINE_API_TOKEN``, ``COSCINE_PROJECT``, ``COSCINE_RESOURCE`` **Disk → Disk** (Planned): * Local: ``shutil.copy()`` * Remote: ``scp`` command * Currently commented out **Tape → Disk** (Future): * Raises ``NotImplementedError`` * Placeholder for tape library integration **Key Implementation Details**: * **Per-package processing**: Each package processed independently with error isolation * **Hierarchical structure**: Full relative paths preserved (e.g., ``CHAI/LFA/filename.fits``) * **Temporary location**: Tar.gz downloaded to ``raw_data_packages/`` subdirectory * **Final location**: Files extracted to location root, recreating source structure * **Archive cleanup**: ``tar.gz`` files deleted immediately after successful extraction * **File-level tracking**: Individual :py:class:`~ccat_ops_db.models.RawDataFile` records enable granular deletion * **Partial success**: Job can complete even if some packages fail * **Queue routing**: Tasks execute on destination host via dynamic routing **Cleanup** (Stage 7): When processing completes: * Scientist can manually trigger deletion or set retention period * Deletion manager removes :py:class:`~ccat_ops_db.models.RawDataFilePhysicalCopy` records * Individual files deleted from PROCESSING location * Frees space for next staging job **Error Handling & Retry Logic**: * Base class: :py:class:`~ccat_data_transfer.staging_manager.StagingTask` with max 3 retries * On retry: :py:meth:`~ccat_data_transfer.staging_manager.StagingTask.reset_state_on_failure` * Status reset to ``PENDING`` * Increment :py:attr:`~ccat_ops_db.models.StagingJob.retry_count` * Store error in :py:attr:`~ccat_ops_db.models.StagingJob.failure_error_message` * On permanent failure: :py:meth:`~ccat_data_transfer.staging_manager.StagingTask.mark_permanent_failure` * Status: ``FAILED`` * Error message stored * Package-level errors: * Package already exists: Skip, mark success * Physical copy not found: ``ValueError``, continue with next package * Download failure: Exception logged, marked failed, continue * Extraction failure: ``ValueError``, marked failed, continue * Verification failure: ``ValueError``, marked failed, continue * Database inconsistency 
handling: * Status ``PRESENT`` but file missing: Fix database, mark as ``DELETED`` * Status ``PRESENT`` but file empty: Fix database, mark as ``DELETED`` **Configuration**: * ``STAGING_MANAGER_SLEEP_TIME``: Manager polling interval * ``s3_method``: Choose ``boto3`` (default) or ``coscine`` * PROCESSING locations defined per site in database * No default retention period in code (managed externally) * **boto3 configuration**: Same as Stage 5 (S3 credentials) * **Coscine configuration**: Same as Stage 5 (API token, project, resource) **Monitoring & Observability**: * Comprehensive debug logging for troubleshooting: * Staging job configuration details * Per-package processing steps * Physical copy lookups and validation * File path construction and verification * Tar contents listing before extraction * Key log events: * ``staging_job_scheduled``, ``Starting staging job`` * ``s3_download_details``, ``coscine_download_details`` * ``Unpacking {file} directly into {destination}`` * ``Successfully verified {n} files`` * ``Created physical copies for {n} RawDataFiles`` * ``Marked RawDataPackage as STAGED`` * Error tracking: * ``Error staging package {id}`` * ``Missing files in destination directory`` * ``Failed to unpack {file}`` * Metrics integration via :py:func:`~ccat_data_transfer.decorators.track_metrics`: * Operation type: ``staging`` * Transfer method: ``s3`` * Success/failure rates **Important Notes**: * Files extracted with full hierarchical paths (e.g., ``CHAI/LFA/filename.fits``) * Package tar.gz files deleted immediately after extraction to save space * ``STAGED`` status specifically means: unpacked files present, archive removed * Individual file tracking enables granular deletion policies * Partial job success possible - some packages can succeed while others fail * No checksum verification after staging (trusts S3 integrity) * Queue routing ensures task executes on correct host * Database inconsistency detection and auto-correction built-in **Workflow Example**: 1. Scientist requests observation XYZ via UI 2. System creates ``StagingJob`` linking to RawDataPackages 3. Manager schedules task to PROCESSING location worker 4. Worker downloads each package from S3 LTA 5. Worker extracts files to preserve hierarchy: ``PROCESSING/CHAI/LFA/*.fits`` 6. Worker deletes downloaded tar.gz files 7. Worker creates physical copy records for individual files 8. Scientist accesses files at ``PROCESSING/CHAI/LFA/*.fits`` 9. 
After processing, deletion manager removes files based on retention policy Stage 7: Deletion and Cleanup ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ **Purpose**: Remove temporary files and manage storage capacity across all :py:class:`~ccat_ops_db.models.DataLocation` types **Manager**: :py:mod:`ccat_data_transfer.deletion_manager` **Main Function**: :py:func:`~ccat_data_transfer.deletion_manager.delete_data_packages` **Sub-Managers**: The main function orchestrates multiple specialized deletion processes: * :py:func:`~ccat_data_transfer.deletion_manager.delete_data_transfer_packages`: Cleanup transfer archives * :py:func:`~ccat_data_transfer.deletion_manager.delete_raw_data_packages_bulk`: Bulk package deletion * :py:func:`~ccat_data_transfer.deletion_manager.delete_processing_raw_data_files`: Processing file cleanup * :py:func:`~ccat_data_transfer.deletion_manager.delete_staged_raw_data_files_from_processing`: Staged file cleanup * :py:func:`~ccat_data_transfer.deletion_manager.process_deletion_possible_raw_data_files`: Conditional file deletion **Worker Tasks**: * :py:func:`~ccat_data_transfer.deletion_manager.delete_physical_copy`: Single file/package deletion * :py:func:`~ccat_data_transfer.deletion_manager.delete_bulk_raw_data_files`: Bulk file deletion * :py:func:`~ccat_data_transfer.deletion_manager.delete_bulk_raw_data_packages`: Bulk package deletion **Deletion Policies by Location Type**: **SOURCE Locations**: * **RawDataFiles**: Marked as ``DELETION_POSSIBLE`` when parent :py:class:`~ccat_ops_db.models.RawDataPackage` deleted * **Condition**: Parent package must exist in at least one LTA location * **Two-phase process**: 1. RawDataPackage deleted → RawDataFiles marked ``DELETION_POSSIBLE`` 2. ``DELETION_POSSIBLE`` files evaluated based on buffer status and retention * **Safety**: Never delete until verified copies exist in LTA **BUFFER Locations**: **At SOURCE Sites** (sites with ``SOURCE`` locations): * **RawDataPackages**: * Delete when exists in any LTA ``LONG_TERM_ARCHIVE`` location * Checked via :py:func:`~ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_source_buffer` * **DataTransferPackages**: * Delete when ALL transfers from this buffer are completed AND unpacked * At least one transfer must reach LTA site buffer * Checked via :py:func:`~ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_source_buffer` **At LTA Sites** (sites with ``LONG_TERM_ARCHIVE`` locations): * **RawDataPackages**: * Delete when exists in LTA location at same site * Checked via :py:func:`~ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_lta_buffer` * **DataTransferPackages**: * Delete when synced to ALL other LTA site buffers * Uses secondary route discovery for multi-LTA synchronization * Checked via :py:func:`~ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_lta_buffer` **PROCESSING Locations**: * **RawDataFiles** (from active staging): * Delete when no active :py:class:`~ccat_ops_db.models.StagingJob` references the file * Checked per file via :py:func:`~ccat_data_transfer.deletion_manager.find_deletable_processing_raw_data_files` * **RawDataFiles** (from completed staging): * Delete when parent package has status ``STAGED`` * All staging jobs for package must be inactive (``active=False``) * Checked per package via :py:func:`~ccat_data_transfer.deletion_manager.find_deletable_staged_raw_data_files_by_location` * **RawDataPackages**: Marked as ``STAGED`` (not deleted, files extracted) **Process Overview**: 1. 
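The SOURCE-site buffer rule above reduces to a simple existence check. The sketch below is illustrative only (the ``raw_data_package_id`` column on the physical copy is assumed, and the LTA location IDs are passed in rather than resolved from the site topology); the real check is :py:func:`~ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_source_buffer`.

.. code-block:: python

    from ccat_ops_db.models import PhysicalCopyStatus, RawDataPackagePhysicalCopy


    def has_lta_copy(session, package_id: int, lta_location_ids: list[int]) -> bool:
        """True if a copy of the package is PRESENT at any long-term archive location."""
        return (
            session.query(RawDataPackagePhysicalCopy)
            .filter(
                RawDataPackagePhysicalCopy.raw_data_package_id == package_id,  # assumed column
                RawDataPackagePhysicalCopy.data_location_id.in_(lta_location_ids),
                RawDataPackagePhysicalCopy.status == PhysicalCopyStatus.PRESENT,
            )
            .count()
            > 0
        )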
**Discovery Phase**: Each sub-manager finds eligible items for deletion a. **DataTransferPackages**: Via :py:func:`~ccat_data_transfer.deletion_manager.find_deletable_data_transfer_packages` * Query all ``PRESENT`` :py:class:`~ccat_ops_db.models.DataTransferPackagePhysicalCopy` records * Check no pending unpack operations * Apply location-specific deletion conditions * Group by location b. **RawDataPackages**: Via :py:func:`~ccat_data_transfer.deletion_manager.find_deletable_raw_data_packages_by_location` * Query all ``PRESENT`` :py:class:`~ccat_ops_db.models.RawDataPackagePhysicalCopy` records * Apply site-specific deletion conditions * Group by location * Returns dictionary: ``{DataLocation: [RawDataPackage, ...]}`` c. **Processing RawDataFiles**: Via :py:func:`~ccat_data_transfer.deletion_manager.find_deletable_processing_raw_data_files` * Find files in ``PROCESSING`` locations * Check no active staging jobs reference the file d. **Staged Processing RawDataFiles**: Via :py:func:`~ccat_data_transfer.deletion_manager.find_deletable_staged_raw_data_files_by_location` * Find packages with status ``STAGED`` * Verify all staging jobs inactive * Return all :py:class:`~ccat_ops_db.models.RawDataFile` physical copies for those packages * Group by location 2. **Marking Phase**: Mark eligible physical copies for deletion * Update status: ``PRESENT`` → ``DELETION_SCHEDULED`` * Use ``with_for_update()`` for row-level locking * Commit per location to avoid long-held locks 3. **Task Scheduling**: Schedule deletion tasks with dynamic queue routing **Single Item Deletion**: :py:func:`~ccat_data_transfer.deletion_manager.delete_physical_copy` * Used for :py:class:`~ccat_ops_db.models.DataTransferPackagePhysicalCopy` * Queue: ``route_task_by_location(OperationType.DELETION, location)`` **Bulk Deletion**: :py:func:`~ccat_data_transfer.deletion_manager.delete_bulk_raw_data_files` or :py:func:`~ccat_data_transfer.deletion_manager.delete_bulk_raw_data_packages` * Used for :py:class:`~ccat_ops_db.models.RawDataPackagePhysicalCopy` and :py:class:`~ccat_ops_db.models.RawDataFilePhysicalCopy` * Processes entire list (no 1000-item limit mentioned in docs) * Queue: ``route_task_by_location(OperationType.DELETION, location)`` * Separate bulk tasks per location (handles multi-site SOURCE/BUFFER) 4. **Worker Execution**: Deletion task performs actual file removal a. **Polymorphic Type Resolution**: Via :py:class:`~ccat_data_transfer.deletion_manager.DeletionTask` base class * Load base :py:class:`~ccat_ops_db.models.PhysicalCopy` to determine type * Load specific subclass based on ``type`` field: * ``raw_data_file_physical_copy`` → :py:class:`~ccat_ops_db.models.RawDataFilePhysicalCopy` * ``raw_data_package_physical_copy`` → :py:class:`~ccat_ops_db.models.RawDataPackagePhysicalCopy` * ``data_transfer_package_physical_copy`` → :py:class:`~ccat_ops_db.models.DataTransferPackagePhysicalCopy` b. **Status Verification**: * Verify status is ``DELETION_SCHEDULED`` * Log warning and skip if unexpected state * Mark as ``DELETION_IN_PROGRESS`` c. **File Deletion**: Based on :py:attr:`~ccat_ops_db.models.DataLocation.storage_type` **Disk** (:py:class:`~ccat_ops_db.models.DiskDataLocation`): .. code-block:: python if os.path.exists(physical_copy.full_path): os.remove(physical_copy.full_path) else: # Already deleted, continue pass **S3** (:py:class:`~ccat_ops_db.models.S3DataLocation`): .. 
code-block:: python s3_client = get_s3_client() s3_client.delete_object( Bucket=physical_copy.data_location.bucket_name, Key=physical_copy.full_path ) **Tape** (:py:class:`~ccat_ops_db.models.TapeDataLocation`): * Not implemented - just marks as deleted * Logs warning about unimplemented deletion d. **Database Update**: * Status: ``DELETION_IN_PROGRESS`` → ``DELETED`` * Set :py:attr:`~ccat_ops_db.models.PhysicalCopy.deleted_at`: Current timestamp (Berlin timezone) * Commit transaction e. **Bulk Operation Details**: For bulk tasks * Process each physical copy ID in the list * Continue on individual failures (don't fail entire batch) * Track success/failure counts * Increment :py:attr:`~ccat_ops_db.models.PhysicalCopy.attempt_count` on failure * Reset status to ``PRESENT`` on failure for retry 5. **RawDataFile Cascade Deletion**: Special handling for SOURCE locations When :py:class:`~ccat_ops_db.models.RawDataPackage` deleted from SOURCE: a. Via :py:func:`~ccat_data_transfer.deletion_manager.mark_raw_data_files_for_deletion`: * Bulk update all :py:class:`~ccat_ops_db.models.RawDataFile` physical copies at SOURCE * Status: ``PRESENT`` → ``DELETION_POSSIBLE`` * Uses bulk SQL update (no loop for performance) b. Via :py:func:`~ccat_data_transfer.deletion_manager.process_deletion_possible_raw_data_files`: * Query files with status ``DELETION_POSSIBLE`` * Check buffer status from Redis * Apply retention and disk threshold logic * Schedule deletion if conditions met **Database Changes**: * :py:attr:`~ccat_ops_db.models.PhysicalCopy.status`: ``PRESENT`` → ``DELETION_SCHEDULED`` → ``DELETION_IN_PROGRESS`` → ``DELETED`` * :py:attr:`~ccat_ops_db.models.PhysicalCopy.deleted_at`: Timestamp recorded (Berlin timezone) * :py:attr:`~ccat_ops_db.models.PhysicalCopy.attempt_count`: Incremented on failure * :py:class:`~ccat_ops_db.models.DeletionLog` entries created for audit trail **Key Implementation Details**: * **Polymorphic handling**: Must load correct subclass based on ``type`` field * **Row-level locking**: Uses ``with_for_update()`` to prevent race conditions * **Location-based batching**: Groups deletions by location for proper queue routing * **Site-aware logic**: Different rules for SOURCE sites vs LTA sites * **Multi-site file deletion**: RawDataFiles deleted from their SOURCE location (may differ from package location) * **Bulk operations**: No hard batch size limit (processes all eligible items) * **DELETION_POSSIBLE state**: Intermediate state for conditional deletion * **STAGED handling**: Special status for unpacked packages in processing * **Two-phase SOURCE deletion**: Package deleted → files marked → files evaluated * **Error isolation**: Bulk operations continue despite individual failures **Site Classification**: **SOURCE Site**: Via :py:func:`~ccat_data_transfer.deletion_manager.is_source_site` * Has at least one location with :py:attr:`~ccat_ops_db.models.DataLocation.location_type` = ``SOURCE`` * Example: Telescope site in Chile **LTA Site**: Via :py:func:`~ccat_data_transfer.deletion_manager.is_lta_site` * Has at least one location with :py:attr:`~ccat_ops_db.models.DataLocation.location_type` = ``LONG_TERM_ARCHIVE`` * Example: Data centers in Germany, USA **Disk Monitoring Integration**: * Buffer status stored in Redis: ``buffer_monitor:{location_name}`` * Queried via :py:func:`~ccat_data_transfer.deletion_manager.get_buffer_status_for_location` * Threshold checking via :py:func:`~ccat_data_transfer.deletion_manager.should_delete_based_on_buffer_status`: * SOURCE locations: Delete if 
**Configuration**:

* ``DELETION_MANAGER_SLEEP_TIME``: Manager polling interval
* **No hardcoded retention periods** in the deletion manager code
* **Disk thresholds**:

  * SOURCE: 80%
  * BUFFER: 85%
  * (Other thresholds may exist in the buffer_monitor module)

* Deletion policies encoded in the deletion condition functions
* Queue routing: Dynamic per location

**Error Handling & Retry Logic**:

* Base class: :py:class:`~ccat_data_transfer.deletion_manager.DeletionTask`
* On individual failure in a bulk operation (see the sketch after the *Important Notes* below):

  * Log the error
  * Increment ``attempt_count``
  * Reset status to ``PRESENT``
  * Continue with the next item

* On retry: :py:meth:`~ccat_data_transfer.deletion_manager.DeletionTask.reset_state_on_failure`

  * Status: ``PRESENT``
  * Increment ``attempt_count``
  * Clear the failure message
  * Add a deletion log entry

* On permanent failure: :py:meth:`~ccat_data_transfer.deletion_manager.DeletionTask.mark_permanent_failure`

  * Status: ``FAILED``
  * Add a final deletion log entry

* Storage-specific errors:

  * ``FileNotFoundError``: Log as already deleted, continue
  * ``OSError`` on disk: Raise, trigger retry
  * S3 errors: Raise, trigger retry
  * Database errors: Rollback, retry the operation

**Safety Features**:

* **Multiple verification steps**: Discovery → marking → worker verification
* **Row-level locking**: Prevents race conditions
* **Never delete the last copy**: Conditions check for copies in the LTA
* **Require LTA presence**: SOURCE/BUFFER deletions only if an LTA copy exists
* **Audit trail**: :py:class:`~ccat_ops_db.models.DeletionLog` records all deletion events
* **Context logging**: Extended context via :py:func:`~ccat_data_transfer.deletion_manager.get_physical_copy_context`
* **Polymorphic type safety**: Explicit type checking and loading
* **Transaction isolation**: Commit per location to limit lock duration
* **Error isolation**: Failures don't stop the entire batch

**Monitoring & Observability**:

* Structured logging via :py:func:`~ccat_data_transfer.logging_utils.get_structured_logger`
* Key log events:

  * ``Checking if package can be deleted from [location type]``
  * ``Package marked for deletion``
  * ``Scheduled bulk [package/file] deletion``
  * ``Starting physical copy deletion``
  * ``Physical file deleted``
  * ``Bulk [package/file] deletion completed``

* Extended context logging:

  * Package/transfer status
  * Long-term archive transfer details
  * Physical copy verification status
  * Site and location information

* Redis pub/sub notifications via the ``transfer:overview`` channel:

  * ``physical_copy_deletion_in_progress``
  * ``physical_copy_deletion_completed``
  * ``physical_copy_deletion_failed``
  * ``bulk_raw_data_package_deletion_scheduled``
  * ``bulk_raw_data_file_deletion_completed``
  * ``staged_raw_data_files_bulk_deletion_scheduled``

* Bulk operation statistics:

  * ``successful_deletions``, ``failed_deletions``, ``total_deletions``

**Important Notes**:

* Deletion logic is **site-aware**: Different rules for SOURCE vs. LTA sites
* **No 1000-item batch limit**: Processes all eligible items per location
* **Multi-site coordination**: RawDataFiles may be deleted from different hosts than their packages
* **DELETION_POSSIBLE**: Intermediate state that allows conditional evaluation
* **STAGED**: Special package status in PROCESSING (files extracted, archive removed)
* **Two-phase SOURCE deletion**: Enables retention-policy and disk-threshold evaluation
* **Queue routing**: Every deletion task is routed to the correct worker via its location
* **Storage polymorphism**: Supports Disk, S3, and Tape (placeholder) deletion
* **Transaction scope**: Per-location commits avoid database lock contention
* **Error resilience**: Individual failures don't stop bulk operations
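The per-item error isolation described above can be sketched as follows. This is an illustrative outline of the pattern, not the actual :py:class:`~ccat_data_transfer.deletion_manager.DeletionTask` code; ``delete_physical_file`` and ``PhysicalCopy`` stand in for the project's own helper and model, and the session handling is simplified.

.. code-block:: python

   # Sketch of error isolation in a bulk deletion: one bad item must not
   # abort the whole batch. Names marked "stand-in" are not real project APIs.
   from datetime import datetime
   from zoneinfo import ZoneInfo


   def bulk_delete_physical_copies(session, physical_copy_ids):
       successful_deletions = 0
       failed_deletions = 0

       for copy_id in physical_copy_ids:
           copy = session.get(PhysicalCopy, copy_id)  # PhysicalCopy: stand-in model
           try:
               delete_physical_file(copy)             # stand-in: disk/S3/tape specific
               copy.status = "DELETED"
               copy.deleted_at = datetime.now(ZoneInfo("Europe/Berlin"))
               successful_deletions += 1
           except Exception:
               # Record the failure, make the copy eligible for retry, keep going.
               copy.attempt_count += 1
               copy.status = "PRESENT"
               failed_deletions += 1

       session.commit()
       return {
           "successful_deletions": successful_deletions,
           "failed_deletions": failed_deletions,
           "total_deletions": len(physical_copy_ids),
       }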
**Workflow Examples**:

**Example 1: RawDataPackage deletion from a SOURCE site buffer**:

1. Package successfully archived to an LTA location in Germany
2. Deletion manager discovers the package in the SOURCE site buffer (Chile)
3. Checks: Does the package exist in an LTA? Yes → can delete
4. Marks the RawDataPackagePhysicalCopy as ``DELETION_SCHEDULED``
5. Marks all RawDataFile physical copies as ``DELETION_POSSIBLE``
6. Schedules the bulk package deletion task to the Chile worker
7. Schedules the bulk file deletion task to the SOURCE location worker (may be a different host)
8. Worker deletes the package tar.gz file
9. Evaluates ``DELETION_POSSIBLE`` files based on buffer status
10. Deletes the eligible RawDataFiles

**Example 2: Staged files cleanup from PROCESSING**:

1. Scientist's staging job completes and is marked inactive
2. Deletion manager finds the STAGED package in PROCESSING
3. Verifies that no active staging jobs reference the package
4. Schedules bulk deletion of all RawDataFile physical copies
5. Worker deletes the individual files (preserving the directory structure)
6. The RawDataPackagePhysicalCopy remains STAGED (the archive was already removed)

**Example 3: DataTransferPackage deletion from an LTA site buffer**:

1. DataTransferPackage successfully transferred to all other LTA sites
2. Deletion manager checks the secondary routes
3. Verifies completed and unpacked transfers to all expected LTA sites
4. Marks the DataTransferPackagePhysicalCopy as ``DELETION_SCHEDULED``
5. Schedules a single deletion task to the LTA site worker
6. Worker deletes the transfer package archive

Pipeline Coordination
---------------------

**State Transitions**

:py:class:`~ccat_ops_db.models.RawDataPackage` records move through states as the pipeline progresses:

.. code-block:: text

   WAITING → TRANSFERRING → ARCHIVED (→ FAILED)

   Triggers:
   • WAITING:      Package exists at SOURCE only
   • TRANSFERRING: Added to a DataTransferPackage
   • ARCHIVED:     Exists at an LTA location
   • FAILED:       Any stage reported a failure

**Database as Coordinator**

The database serves as the coordination point:

* Managers query for work (see the sketch at the end of this section)
* Workers update results
* State changes trigger the next stage
* Transactions ensure consistency

**Manager Sleep Cycles**

Managers run in loops:

.. code-block:: python

   import time

   while True:
       try:
           process_work(session)   # stage-specific discovery and task submission
       except Exception as e:
           log_error(e)
       finally:
           time.sleep(MANAGER_SLEEP_TIME)

Sleep times balance:

* Responsiveness (short sleep → faster reactions)
* Database load (long sleep → less querying)

**Parallel Execution**

Multiple stages run concurrently:

* Stage 1 packages new files
* Stage 2 creates data transfer packages
* Stage 3 transfers packages
* Stage 4 unpacks packages
* Stage 5 archives packages
* Stage 6 stages packages
* Stage 7 deletes old packages

This pipeline parallelism maximizes throughput.
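The discovery side of the database-as-coordinator pattern can be sketched with a simple query. This is a hedged illustration, assuming SQLAlchemy (consistent with the ``with_for_update()`` usage mentioned earlier) and the model attributes referenced in this document; the string-valued state comparison is a simplification, and this is not the managers' actual query code.

.. code-block:: python

   # Sketch of a manager discovering work purely from shared database state.
   # RawDataPackage and its columns follow the models referenced in this
   # document, but the query itself is illustrative, not the real code.
   from sqlalchemy import select


   def find_packages_awaiting_transfer(session):
       """Return RawDataPackages not yet bundled into a DataTransferPackage."""
       stmt = (
           select(RawDataPackage)
           .where(RawDataPackage.state == "WAITING")
           .where(RawDataPackage.data_transfer_package_id.is_(None))
           # Row-level locks keep concurrently running managers from racing.
           .with_for_update(skip_locked=True)
       )
       return session.execute(stmt).scalars().all()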
Error Recovery
--------------

**Resumability**

All operations are designed to be resumable:

* Database records track state
* Workers check the current state before acting
* Partial progress is preserved
* Retrying is safe (idempotent where possible)

**Retry Logic**

:py:class:`~ccat_data_transfer.setup_celery_app.CCATBaseTask` provides:

* Automatic retry with exponential backoff (see the sketch at the end of this page)
* Maximum retry counts per operation
* Circuit breaker for permanent failures
* Administrator notifications

**Recovery Service**

:py:mod:`ccat_data_transfer.recovery_service_runner` monitors for stalled tasks:

* Detects missing heartbeats
* Resets task state for retry
* Alerts on repeated failures

Next Steps
----------

* :doc:`routing` - Detailed queue discovery and task routing
* :doc:`monitoring` - Health checks, metrics, and recovery systems
* :doc:`lifecycle` - In-depth deletion policies and retention management
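The retry behaviour described under *Retry Logic* is, at its core, standard Celery task retrying. The sketch below shows only the generic Celery mechanism as an assumed pattern; it is not the actual :py:class:`~ccat_data_transfer.setup_celery_app.CCATBaseTask`, whose circuit breaker and administrator notifications are project-specific and not reproduced here.

.. code-block:: python

   # Generic Celery retry pattern with exponential backoff -- illustrative only.
   from celery import Celery, Task

   app = Celery("ccat_data_transfer")  # broker/backend configuration omitted


   class RetryingTask(Task):
       """Base task that retries failed operations with exponential backoff."""

       autoretry_for = (Exception,)  # retry on any unhandled exception
       retry_backoff = True          # exponential backoff between attempts
       retry_backoff_max = 600       # cap the delay at 10 minutes
       retry_jitter = True           # add jitter to avoid thundering herds
       max_retries = 5               # give up after five attempts


   @app.task(base=RetryingTask, bind=True)
   def example_operation(self, physical_copy_id: int) -> None:
       # Re-check current database state here so a retried task stays idempotent.
       ...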