# Core Concepts

```{eval-rst}
.. verified:: 2025-10-16
   :reviewer: Christof Buchbender
```

The Data Transfer System is built on several foundational concepts that work together to enable automated, distributed data management. This section explains each concept and how they relate.

## Sites

A **Site** represents a physical location where data can be stored and processed. Sites are geographically distributed across the CCAT collaboration.

**Current Sites:**

- **CCAT (Chile)**: Observatory site where data originates
- **Cologne (Germany)**: Primary development and long-term archive site
- **Other sites (optional), e.g. Cornell (USA)**: Additional long-term archive and processing sites, such as a US-based archive

**Database Model**: {py:class}`ccat_ops_db.models.Site`

**Key Attributes:**

- `name`: Full site name (e.g., "CCAT Observatory")
- `short_name`: Used in queue names (e.g., "ccat", "cologne", "us")
- `location`: Geographic information for coordination

**Purpose:** Sites group related storage locations and provide geographic context for routing decisions. When data needs to be replicated, the system ensures copies exist at multiple sites for redundancy.

## Data Locations

A **DataLocation** represents a specific storage location at a site. Each location has a type that defines its role in the data pipeline.

**Database Model**: {py:class}`ccat_ops_db.models.DataLocation`

### Location Types

{py:class}`ccat_ops_db.models.LocationType` defines four types:

**SOURCE**
: Where data originates (e.g. telescope instrument computers)

  - Raw data files are created here by instruments
  - Packaged and moved to buffers
  - Examples: `primecam_raw_data`, `chai_raw_data`

**BUFFER**
: Intermediate staging areas for transfers

  - Aggregation point before site-to-site transfers
  - Temporary storage during pipeline processing
  - Each site typically has one or more buffers
  - Failover is supported by having multiple buffers with different priorities
  - Examples: `output_buffer`, `input_buffer`

**LONG_TERM_ARCHIVE**
: Permanent data storage locations

  - Final destination for all data
  - Multiple copies across sites for redundancy
  - May use S3, tape, or high-capacity disk storage; currently only S3 is supported
  - Examples: `long_term_archive`

**PROCESSING**
: Temporary locations for scientific data analysis

  - Data staged here from archives when needed
  - Cleaned up after processing completes
  - Examples: `ramses_processing`
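To make these types concrete, the following is a minimal sketch of how a manager might look up locations of a given type, assuming SQLAlchemy-style access to the models referenced above. The `session` argument and the attribute names (`location_type`, `active`, `priority`) are illustrative assumptions and may differ from the actual schema; the priority ordering anticipates the failover mechanism described under "Priority and Failover" below.

```python
from ccat_ops_db.models import DataLocation, LocationType


def active_buffers(session):
    """Return active BUFFER locations, best priority (lowest number) first.

    Sketch only: ``session`` is an open SQLAlchemy session, and the column
    names are assumptions rather than the verified schema.
    """
    return (
        session.query(DataLocation)
        .filter(DataLocation.location_type == LocationType.BUFFER)
        .filter(DataLocation.active.is_(True))
        .order_by(DataLocation.priority)
        .all()
    )
```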
### Storage Technologies

Each DataLocation uses a specific storage technology:

**DiskDataLocation**
: {py:class}`ccat_ops_db.models.DiskDataLocation`

  - Traditional filesystem storage (local or network-mounted)
  - Example: `/data/ccat/buffer` on Cologne servers

**S3DataLocation**
: {py:class}`ccat_ops_db.models.S3DataLocation`

  - Object storage (e.g. AWS S3 or compatible)
  - Example: CCAT Project on DataStorage.NRW

**TapeDataLocation**
: {py:class}`ccat_ops_db.models.TapeDataLocation`

  - Tape library systems (future capability)
  - Not currently implemented

### Priority and Failover

Multiple locations of the same type can exist at a site:

```text
# Example: Multiple buffers at Cologne
cologne_buffer_primary    (priority=1, active=True)
cologne_buffer_secondary  (priority=2, active=True)
cologne_buffer_backup     (priority=3, active=False)
```

The system uses:

- **Priority** (lower number = higher priority): Determines which location to use first
- **Active** flag: Allows temporarily disabling a location for maintenance

Implementation: {py:meth}`ccat_data_transfer.queue_discovery.QueueDiscoveryService.get_primary_buffer_for_site`

## Data Packages

Raw data files are grouped into {py:class}`ccat_ops_db.models.RawDataPackage` objects for efficient transfer and management.

### RawDataFile

{py:class}`ccat_ops_db.models.RawDataFile`

Individual files created by instruments, tracking:

- Original filename and size
- Checksum for integrity verification
- Relationship to the observation (ExecutedObsUnit)
- Relationship to the instrument component (InstrumentModule)

### RawDataPackage

{py:class}`ccat_ops_db.models.RawDataPackage`

Logical grouping of related raw data files:

- Typically all files from one observation
- Created at SOURCE locations by {py:mod}`ccat_data_transfer.raw_data_package_manager`
- Archived as tar files for efficient storage and transfer
- Tracked independently through pipeline stages

**States**: {py:class}`ccat_ops_db.models.Status`

- `PENDING`: Exists only at the source, not yet packaged
- `SCHEDULED`: Packaging has been scheduled as a Celery task
- `IN_PROGRESS`: Packaging in progress, i.e. the Celery task is running
- `COMPLETED`: Packaging completed, i.e. the Celery task finished successfully
- `FAILED`: Packaging failed, i.e. the Celery task failed

### DataTransferPackage

{py:class}`ccat_ops_db.models.DataTransferPackage`

Temporary container bundling multiple RawDataPackages for efficient transfer:

- Aggregates packages up to a size limit (e.g., 100 GB)
- Created at BUFFER locations by {py:mod}`ccat_data_transfer.data_transfer_package_manager`
- Tar archive containing multiple RawDataPackage tar files
- Deleted after successful unpacking at the destination

**Purpose**: Optimize network efficiency by amortizing transfer overhead across many packages.

### Physical Copy Tracking

{py:class}`ccat_ops_db.models.PhysicalCopy` (and subclasses)

Tracks where each file/package physically exists:

- Every file in the system has a PhysicalCopy record for each location it exists at
- Status indicates the current state (PRESENT, DELETED, etc.)
- Enables a complete audit trail of data movement
- Used by the deletion manager to determine cleanup eligibility

Example:

```python
# RawDataPackage "obs_001" has PhysicalCopy records at three locations
PhysicalCopy(package=obs_001, location=ccat_buffer, status=DELETED)
PhysicalCopy(package=obs_001, location=cologne_lta, status=PRESENT)
PhysicalCopy(package=obs_001, location=cornell_lta, status=PRESENT)
```
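As a sketch of how these records are used, the hypothetical helper below checks whether a package already has enough archive copies to make it safe to delete from a buffer, which is the kind of question the deletion manager has to answer. The attribute names (`status`, `location.location_type`) and the string values are assumptions for illustration, not the actual model API.

```python
def safe_to_delete_from_buffer(copies, min_archive_copies=2):
    """Illustrative check: may the buffer copy of a package be removed?

    ``copies`` is the list of PhysicalCopy records for one RawDataPackage.
    Attribute names and status values are assumptions for this sketch.
    """
    archived = [
        copy
        for copy in copies
        if copy.status == "PRESENT"
        and copy.location.location_type == "LONG_TERM_ARCHIVE"
    ]
    # Require redundant archive copies before cleaning up the buffer copy.
    return len(archived) >= min_archive_copies
```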
## Operations

An **Operation** is a unit of work performed by the data transfer system. Operations are defined by {py:class}`ccat_data_transfer.operation_types.OperationType`.

### Operation Types

```{eval-rst}
.. list-table::
   :header-rows: 1
   :widths: 30 50 20

   * - Operation
     - Description
     - Primary Location
   * - ``RAW_DATA_PACKAGE_CREATION``
     - Package raw files into tar archives
     - SOURCE
   * - ``DATA_TRANSFER_PACKAGE_CREATION``
     - Bundle packages for transfer
     - BUFFER
   * - ``DATA_TRANSFER``
     - Move packages between sites
     - BUFFER
   * - ``DATA_TRANSFER_UNPACKING``
     - Extract transferred archives
     - BUFFER
   * - ``LONG_TERM_ARCHIVE_TRANSFER``
     - Move to permanent storage
     - LTA
   * - ``STAGING``
     - Copy to processing locations
     - PROCESSING
   * - ``DELETION``
     - Remove temporary files
     - ALL
   * - ``MONITORING``
     - Disk usage and health checks
     - ALL
```

### Operation-Location Matrix

Not all operations apply to all location types. The system uses {py:data}`ccat_data_transfer.operation_types.QUEUE_OPERATIONS_BY_LOCATION_TYPE` to determine which operations are valid for each location:

```text
SOURCE locations:
  • RAW_DATA_PACKAGE_CREATION
  • DELETION
  • MONITORING

BUFFER locations:
  • DATA_TRANSFER_PACKAGE_CREATION
  • DATA_TRANSFER
  • DATA_TRANSFER_UNPACKING
  • DELETION
  • MONITORING

LONG_TERM_ARCHIVE locations:
  • LONG_TERM_ARCHIVE_TRANSFER
  • MONITORING

PROCESSING locations:
  • STAGING
  • DELETION
  • MONITORING
```

## Managers

**Managers** are Python processes that orchestrate the data pipeline by:

1. Scanning the database for work to be done
2. Creating database records for new operations
3. Submitting Celery tasks to the appropriate queues
4. Running in loops with configurable sleep intervals

A simplified sketch of this loop appears at the end of this section.

**Key Characteristic**: Managers need database access but NOT direct data access. They can run anywhere (typically centrally in Cologne).

### Manager Examples

**Raw Data Package Manager**
: {py:mod}`ccat_data_transfer.raw_data_package_manager`

  - Finds unpackaged RawDataFiles
  - Groups files by observation
  - Creates RawDataPackage records
  - Submits packaging tasks to SOURCE location queues

**Transfer Manager**
: {py:mod}`ccat_data_transfer.transfer_manager`

  - Finds DataTransferPackages ready to transfer
  - Determines routes using round-robin
  - Creates DataTransfer records
  - Submits transfer tasks to BUFFER location queues

**Deletion Manager**
: {py:mod}`ccat_data_transfer.deletion_manager`

  - Finds packages eligible for deletion
  - Checks retention policies
  - Ensures data is safely archived before deletion
  - Submits deletion tasks to the appropriate queues

### Health Checks

All managers integrate with {py:class}`ccat_data_transfer.health_check.HealthCheck`:

- Register service startup
- Send periodic heartbeats
- Report when stopping

This enables monitoring of service health.
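Putting these pieces together, every manager follows the same scan, record, submit, sleep cycle. The listing below is a simplified, hypothetical sketch of that pattern rather than the actual implementation: the helper callables (`find_pending_work`, `submit_task`), the HealthCheck method names, and the `sleep_interval` default are all placeholders.

```python
import time


def run_manager_loop(session_factory, find_pending_work, submit_task,
                     health_check, sleep_interval=60):
    """Generic manager cycle: scan the database, submit Celery tasks, sleep.

    Every argument is a placeholder for this sketch; real managers wire in
    their own queries, task signatures, and configuration.
    """
    health_check.register()                        # announce service startup
    try:
        while True:
            with session_factory() as session:
                for item in find_pending_work(session):  # 1. scan for work
                    item.status = "SCHEDULED"            # 2. record the new operation
                    submit_task(item.id)                 # 3. enqueue the Celery task
                session.commit()
            health_check.heartbeat()               # periodic liveness signal
            time.sleep(sleep_interval)             # 4. configurable sleep interval
    finally:
        health_check.report_stopped()              # report shutdown
```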
## Celery Workers

**Celery Workers** are distributed processes that execute the actual work:

1. Listen to queues for specific location/operation combinations
2. Perform file operations (copy, checksum, delete, etc.)
3. Update the database with results
4. Must run on machines with access to the data location

**Key Characteristic**: Workers need direct data access. They run on the servers/computers where the storage is mounted.

### Worker Assignment

Workers are assigned to queues by location:

```bash
# Cologne buffer worker handles all buffer operations
celery worker -Q cologne_buffer_data_transfer_package_creation,cologne_buffer_data_transfer,cologne_buffer_data_transfer_unpacking,cologne_buffer_deletion

# CCAT telescope worker handles source operations
celery worker -Q ccat_telescope_computer_raw_data_package_creation,ccat_telescope_computer_deletion
```

### Task Implementation

Workers execute tasks defined as Celery functions. Example from {py:mod}`ccat_data_transfer.raw_data_package_manager`:

```python
@app.task(base=make_celery_task())
def create_raw_data_package(package_id, verbose=False):
    """
    Create tar archive from raw data files.

    Updates database with checksum and status.
    """
    # Implementation details...
```

All tasks use the base class returned by {py:func}`ccat_data_transfer.setup_celery_app.make_celery_task`, which provides:

- Automatic session management
- Heartbeat tracking
- Retry logic
- Error handling

## Queues

**Queues** are named channels in Redis where tasks are placed for workers to consume. Queue names are automatically generated from the database.

### Queue Naming Convention

```text
{site_short_name}_{data_location_name}_{operation_type}

Examples:
  ccat_telescope_computer_raw_data_package_creation
  cologne_buffer_data_transfer
  cologne_lta_long_term_archive_transfer
  ramses_processing_staging
```

This convention:

- Makes the purpose of each queue self-documenting
- Enables automatic worker assignment
- Prevents configuration drift
- Simplifies debugging (the queue name tells you exactly what it does)

### Queue Discovery

{py:class}`ccat_data_transfer.queue_discovery.QueueDiscoveryService`

Automatically discovers queues from the database:

1. Query all active {py:class}`~ccat_ops_db.models.DataLocation` records
2. For each location, determine the applicable operations from {py:data}`~ccat_data_transfer.operation_types.QUEUE_OPERATIONS_BY_LOCATION_TYPE`
3. Generate queue names using the naming convention
4. Configure Celery routing to map tasks to queues

This happens at:

- Application startup
- When new locations are added to the database
- When the `list-queues` CLI command is run

Implementation: {py:meth}`ccat_data_transfer.queue_discovery.QueueDiscoveryService.discover_all_queues`

## Routes and Routing

**Routes** define how data flows between sites and locations.

### Automatic Route Discovery

The system discovers routes by analyzing the site topology:

**Primary Routes** (SOURCE → LTA)
: From any SOURCE site to all LTA sites

  - Created automatically by {py:func}`ccat_data_transfer.data_transfer_package_manager.discover_automatic_routes`
  - Distribute data from the telescope to the archives
  - Use round-robin for load balancing

**Secondary Routes** (LTA → LTA)
: Between all LTA sites

  - Created automatically by {py:func}`ccat_data_transfer.data_transfer_package_manager.discover_secondary_routes`
  - Ensure redundant copies at all archive sites
  - Also use round-robin

### Round-Robin State

Round-robin distribution state is tracked in Redis:

```python
redis_key = f"round_robin:{source_site.short_name}"
last_index = int(redis.get(redis_key) or -1)
next_index = (last_index + 1) % num_lta_sites
redis.set(redis_key, next_index)
```

This ensures an even distribution across LTA sites over time while surviving service restarts.

### Custom Routes

{py:class}`ccat_ops_db.models.DataTransferRoute`

Manual route overrides for special cases:

- `DIRECT`: Skip buffers, transfer directly to the destination
- `RELAY`: Route through an intermediate site
- `CUSTOM`: Specific location-to-location override

These are not currently used in production but remain available for operational flexibility.
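Before the end-to-end summary, the sketch below ties the operation matrix, the queue naming convention, and round-robin route selection together. It is illustrative only, not the actual QueueDiscoveryService or transfer-manager code: the object attributes (`short_name`, `name`) and the assumption that operations are upper-case strings are placeholders.

```python
def queue_names_for_location(site, location, operations):
    """Build queue names following {site_short_name}_{data_location_name}_{operation_type}.

    ``operations`` would come from QUEUE_OPERATIONS_BY_LOCATION_TYPE for the
    location's type; attribute names are assumptions for this sketch.
    """
    return [
        f"{site.short_name}_{location.name}_{operation.lower()}"
        for operation in operations
    ]


def next_lta_destination(redis, source_site, lta_sites):
    """Pick the next LONG_TERM_ARCHIVE destination via round-robin state in Redis."""
    redis_key = f"round_robin:{source_site.short_name}"
    last_index = int(redis.get(redis_key) or -1)
    next_index = (last_index + 1) % len(lta_sites)
    redis.set(redis_key, next_index)
    return lta_sites[next_index]
```

For example, `queue_names_for_location(cologne, buffer, ["DATA_TRANSFER", "DELETION"])` would yield `cologne_buffer_data_transfer` and `cologne_buffer_deletion`, matching the examples above.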
## Data Flow Summary

Putting it all together:

```{eval-rst}
.. mermaid::

   flowchart TD
       A[Instrument creates file] --> B[Manager detects file]
       B --> C[Manager creates package]
       C --> D[Worker transfers to BUFFER]
       D --> E[Manager creates transfer]
       E --> F[Manager determines route]
       F --> G[Worker transfers data]
       G --> H[Manager submits unpack]
       H --> I[Worker unpacks & verifies]
       I --> J[Manager submits archive]
       J --> K[Worker moves to LTA]
       K --> L[State: ARCHIVED]
       L --> M[Cleanup]

       classDef instrument fill:#fff4e6,stroke:#e65100,stroke-width:2px
       classDef manager fill:#e1f5ff,stroke:#01579b,stroke-width:2px
       classDef worker fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
       classDef archived fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px

       class A instrument
       class B,C,E,F,H,J manager
       class D,G,I,K worker
       class L archived
       class M manager
```

Each step involves:

- A database state update
- Task submission to the appropriate queue
- Worker execution at the correct location
- Result verification and recording

## Next Steps

- {doc}`pipeline` - Detailed look at each of the 7 pipeline stages
- {doc}`routing` - Deep dive into queue discovery and task routing
- {doc}`monitoring` - How the system tracks health and recovers from failures