Core Concepts#

Documentation verified. Last checked: 2025-10-16. Reviewer: Christof Buchbender

The Data Transfer System is built on several foundational concepts that work together to enable automated, distributed data management. This section explains each concept and how they relate.

Sites#

A Site represents a physical location where data can be stored and processed. Sites are geographically distributed across the CCAT collaboration.

Current Sites:

  • CCAT (Chile): Observatory site where data originates

  • Cologne (Germany): Primary development and long-term archive site

  • Optionally other sites, e.g. Cornell (USA): a US-based long-term archive and processing site

Database Model: ccat_ops_db.models.Site

Key Attributes:

  • name: Full site name (e.g., “CCAT Observatory”)

  • short_name: Used in queue names (e.g., “ccat”, “cologne”, “us”)

  • location: Geographic information for coordination

Purpose:

Sites group related storage locations and provide geographic context for routing decisions. When data needs to be replicated, the system ensures copies exist at multiple sites for redundancy.

Data Locations#

A DataLocation represents a specific storage location at a site. Each location has a type that defines its role in the data pipeline.

Database Model: ccat_ops_db.models.DataLocation

Location Types#

ccat_ops_db.models.LocationType defines four types:

SOURCE

Where data originates (e.g. telescope instrument computers)

  • Raw data files created here by instruments

  • Packaged and moved to buffers

  • Examples: primecam_raw_data, chai_raw_data

BUFFER

Intermediate staging areas for transfers

  • Aggregation point before site-to-site transfers

  • Temporary storage during pipeline processing

  • Each site typically has one or more buffers

  • Failover is supported by having multiple buffers with different priorities

  • Examples: output_buffer, input_buffer

LONG_TERM_ARCHIVE

Permanent data storage locations

  • Final destination for all data

  • Multiple copies across sites for redundancy

  • May use S3, tape, or high-capacity disk storage; currently only S3 is supported

  • Examples: long_term_archive

PROCESSING

Temporary locations for scientific data analysis

  • Data staged here from archives when needed

  • Cleaned up after processing completes

  • Examples: ramses_processing

Storage Technologies#

Each DataLocation uses a specific storage technology:

DiskDataLocation

ccat_ops_db.models.DiskDataLocation

  • Traditional filesystem storage (local or network-mounted)

  • Example: /data/ccat/buffer on Cologne servers

S3DataLocation

ccat_ops_db.models.S3DataLocation

  • Object storage (e.g. AWS S3 or compatible)

  • Example: CCAT Project on DataStorage.NRW

TapeDataLocation

ccat_ops_db.models.TapeDataLocation

  • Tape library systems (future capability)

  • Not currently implemented

Priority and Failover#

Multiple locations of the same type can exist at a site:

# Example: Multiple buffers at Cologne
cologne_buffer_primary   (priority=1, active=True)
cologne_buffer_secondary (priority=2, active=True)
cologne_buffer_backup    (priority=3, active=False)

The system uses:

  • Priority (lower number = higher priority): Determines which location to use first

  • Active flag: Allows temporarily disabling locations for maintenance

Implementation: ccat_data_transfer.queue_discovery.QueueDiscoveryService.get_primary_buffer_for_site()
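
A minimal sketch of that selection logic, assuming SQLAlchemy-style models with the attributes described above (the authoritative version is the implementation referenced here):

# Illustrative primary-buffer selection; model and field names follow the
# attributes described above, but the query shape is an assumption.
def get_primary_buffer_for_site(session, site):
    return (
        session.query(DataLocation)
        .filter(
            DataLocation.site_id == site.id,
            DataLocation.location_type == LocationType.BUFFER,
            DataLocation.active.is_(True),
        )
        .order_by(DataLocation.priority.asc())  # lower number = higher priority
        .first()  # None if no active buffer exists
    )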

Data Packages#

Raw data files are grouped into ccat_ops_db.models.RawDataPackage for efficient transfer and management.

RawDataFile#

ccat_ops_db.models.RawDataFile

Individual files created by instruments:

  • Original filename and size

  • Checksum for integrity verification

  • Relationship to observation (ExecutedObsUnit)

  • Relationship to instrument component (InstrumentModule)

RawDataPackage#

ccat_ops_db.models.RawDataPackage

Logical grouping of related raw data files:

  • Typically all files from one observation

  • Created at SOURCE locations by ccat_data_transfer.raw_data_package_manager

  • Archived as tar files for efficient storage and transfer

  • Tracked independently through pipeline stages

States: ccat_ops_db.models.Status

  • PENDING: Exists only at source, not yet packaged

  • SCHEDULED: Packaging scheduled, i.e. a Celery task has been queued

  • IN_PROGRESS: Packaging in progress, i.e. the Celery task is running

  • COMPLETED: Packaging completed, i.e. the Celery task finished successfully

  • FAILED: Packaging failed, i.e. the Celery task failed

DataTransferPackage#

ccat_ops_db.models.DataTransferPackage

Temporary container bundling multiple RawDataPackages for efficient transfer:

  • Aggregates packages up to a size limit (e.g., 100 GB)

  • Created at BUFFER locations by ccat_data_transfer.data_transfer_package_manager

  • Tar archive containing multiple RawDataPackage tar files

  • Deleted after successful unpacking at destination

Purpose: Optimize network efficiency by amortizing transfer overhead across many packages.
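
The bundling step can be pictured as greedy, size-limited grouping. The sketch below is illustrative rather than the actual data_transfer_package_manager logic; the pkg.size attribute and the fixed limit are assumptions:

SIZE_LIMIT = 100 * 1024**3  # 100 GB; configurable in practice

def bundle_packages(pending_packages):
    """Greedily group RawDataPackages into bundles below SIZE_LIMIT."""
    bundles, current, current_size = [], [], 0
    for pkg in pending_packages:  # assumes pkg.size is in bytes
        if current and current_size + pkg.size > SIZE_LIMIT:
            bundles.append(current)  # close the full bundle
            current, current_size = [], 0
        current.append(pkg)
        current_size += pkg.size
    if current:
        bundles.append(current)  # flush the last partial bundle
    return bundles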

Physical Copy Tracking#

ccat_ops_db.models.PhysicalCopy (and subclasses)

Tracks where each file/package physically exists:

  • Every file in the system has a PhysicalCopy record for each location where it exists

  • Status indicates current state (PRESENT, DELETED, etc.)

  • Enables complete audit trail of data movement

  • Used by deletion manager to determine cleanup eligibility

Example:

# RawDataPackage "obs_001" exists at three locations

PhysicalCopy(package=obs_001, location=ccat_buffer, status=DELETED)
PhysicalCopy(package=obs_001, location=cologne_lta, status=PRESENT)
PhysicalCopy(package=obs_001, location=cornell_lta, status=PRESENT)

Operations#

An Operation is a unit of work performed by the data transfer system. Operations are defined by ccat_data_transfer.operation_types.OperationType.

Operation Types#

Operation                         Description                            Primary Location
--------------------------------  -------------------------------------  ----------------
RAW_DATA_PACKAGE_CREATION         Package raw files into tar archives    SOURCE
DATA_TRANSFER_PACKAGE_CREATION    Bundle packages for transfer           BUFFER
DATA_TRANSFER                     Move packages between sites            BUFFER
DATA_TRANSFER_UNPACKING           Extract transferred archives           BUFFER
LONG_TERM_ARCHIVE_TRANSFER        Move to permanent storage              LTA
STAGING                           Copy to processing locations           PROCESSING
DELETION                          Remove temporary files                 ALL
MONITORING                        Disk usage and health checks           ALL

Operation-Location Matrix#

Not all operations apply to all location types. The system uses ccat_data_transfer.operation_types.QUEUE_OPERATIONS_BY_LOCATION_TYPE to determine which operations are valid for each location:

SOURCE locations:
• RAW_DATA_PACKAGE_CREATION
• DELETION
• MONITORING

BUFFER locations:
• DATA_TRANSFER_PACKAGE_CREATION
• DATA_TRANSFER
• DATA_TRANSFER_UNPACKING
• DELETION
• MONITORING

LONG_TERM_ARCHIVE locations:
• LONG_TERM_ARCHIVE_TRANSFER
• MONITORING

PROCESSING locations:
• STAGING
• DELETION
• MONITORING
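
A plausible shape for that constant, reconstructed from the lists above (illustrative; the authoritative definition lives in ccat_data_transfer.operation_types):

# Illustrative reconstruction of QUEUE_OPERATIONS_BY_LOCATION_TYPE.
QUEUE_OPERATIONS_BY_LOCATION_TYPE = {
    LocationType.SOURCE: [
        OperationType.RAW_DATA_PACKAGE_CREATION,
        OperationType.DELETION,
        OperationType.MONITORING,
    ],
    LocationType.BUFFER: [
        OperationType.DATA_TRANSFER_PACKAGE_CREATION,
        OperationType.DATA_TRANSFER,
        OperationType.DATA_TRANSFER_UNPACKING,
        OperationType.DELETION,
        OperationType.MONITORING,
    ],
    LocationType.LONG_TERM_ARCHIVE: [
        OperationType.LONG_TERM_ARCHIVE_TRANSFER,
        OperationType.MONITORING,
    ],
    LocationType.PROCESSING: [
        OperationType.STAGING,
        OperationType.DELETION,
        OperationType.MONITORING,
    ],
}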

Managers#

Managers are Python processes that orchestrate the data pipeline by:

  1. Scanning the database for work to be done

  2. Creating database records for new operations

  3. Submitting Celery tasks to appropriate queues

  4. Running in loops with configurable sleep intervals

Key Characteristic: Managers need database access but NOT direct data access. They can run anywhere (typically centrally in Cologne).
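
Schematically, every manager runs the same loop. The sketch below is illustrative; find_pending_work, create_operation_record, and submit_task are hypothetical stand-ins for manager-specific logic:

import time

def manager_loop(session_factory, sleep_interval=60):
    """Schematic manager main loop (illustrative, not the actual code)."""
    while True:
        with session_factory() as session:
            for item in find_pending_work(session):              # 1. scan database
                record = create_operation_record(session, item)  # 2. create DB record
                submit_task(record)                              # 3. Celery task to queue
            session.commit()
        time.sleep(sleep_interval)                               # 4. configurable sleep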

Manager Examples#

Raw Data Package Manager

ccat_data_transfer.raw_data_package_manager

  • Finds unpackaged RawDataFiles

  • Groups files by observation

  • Creates RawDataPackage records

  • Submits packaging tasks to SOURCE location queues

Transfer Manager

ccat_data_transfer.transfer_manager

  • Finds DataTransferPackages ready to transfer

  • Determines routes using round-robin

  • Creates DataTransfer records

  • Submits transfer tasks to BUFFER location queues

Deletion Manager

ccat_data_transfer.deletion_manager

  • Finds packages eligible for deletion

  • Checks retention policies

  • Ensures data is safely archived before deletion

  • Submits deletion tasks to appropriate queues

Health Checks#

All managers integrate with ccat_data_transfer.health_check.HealthCheck:

  • Register service startup

  • Send periodic heartbeats

  • Report when stopping

  • Together, these signals enable monitoring of service health
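
Usage looks roughly like this (method names here are hypothetical stand-ins; consult ccat_data_transfer.health_check.HealthCheck for the real interface):

# Hypothetical usage sketch; the real HealthCheck API may differ.
health = HealthCheck(service_name="raw_data_package_manager")
health.register()                # report service startup
try:
    while True:
        do_manager_work()        # stand-in for the manager's main loop
        health.heartbeat()       # periodic liveness signal
finally:
    health.report_stopped()      # report clean shutdown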

Celery Workers#

Celery Workers are distributed processes that execute actual work:

  1. Listen to queues for specific location/operation combinations

  2. Perform file operations (copy, checksum, delete, etc.)

  3. Update database with results

  4. Must run on machines with access to the data location

Key Characteristic: Workers need direct data access. They run on servers/computers where storage is mounted.

Worker Assignment#

Workers are assigned to queues by location:

# Cologne buffer worker handles all buffer operations
celery worker \
    -Q cologne_buffer_data_transfer_package_creation,cologne_buffer_data_transfer,cologne_buffer_data_transfer_unpacking,cologne_buffer_deletion

# CCAT telescope worker handles source operations
celery worker \
    -Q ccat_telescope_computer_raw_data_package_creation,ccat_telescope_computer_deletion

Task Implementation#

Workers execute tasks defined as Celery functions. Example from ccat_data_transfer.raw_data_package_manager:

from ccat_data_transfer.setup_celery_app import make_celery_task

@app.task(base=make_celery_task())  # app: the shared Celery application instance
def create_raw_data_package(package_id, verbose=False):
    """
    Create tar archive from raw data files.
    Updates database with checksum and status.
    """
    # Implementation details...

All tasks inherit from ccat_data_transfer.setup_celery_app.make_celery_task() which provides:

  • Automatic session management

  • Heartbeat tracking

  • Retry logic

  • Error handling
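
A rough sketch of what such a base class can look like (illustrative; the retry attributes below are standard Celery Task options, but the real make_celery_task() may be structured differently):

from celery import Task

def make_celery_task():
    """Illustrative stand-in for the real factory in setup_celery_app."""

    class ManagedTask(Task):
        autoretry_for = (Exception,)  # retry logic
        max_retries = 3
        retry_backoff = True          # exponential backoff between retries

        def __call__(self, *args, **kwargs):
            # Session management and heartbeat tracking would wrap the
            # task body here (error handling omitted in this sketch).
            return super().__call__(*args, **kwargs)

    return ManagedTask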

Queues#

Queues are named channels in Redis where tasks are placed for workers to consume. Queue names are automatically generated from the database.

Queue Naming Convention#

{site_short_name}_{data_location_name}_{operation_type}

Examples:
ccat_telescope_computer_raw_data_package_creation
cologne_buffer_data_transfer
cologne_lta_long_term_archive_transfer
ramses_processing_staging

This convention:

  • Makes queue purpose self-documenting

  • Enables automatic worker assignment

  • Prevents configuration drift

  • Simplifies debugging (queue name tells you exactly what it does)
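
In code, the convention is plain string composition (a sketch; the actual generation happens in the queue discovery service described below):

def queue_name(site_short_name, data_location_name, operation_type):
    """Compose a queue name following the convention above (illustrative)."""
    return f"{site_short_name}_{data_location_name}_{operation_type}"

queue_name("cologne", "buffer", "data_transfer")
# -> "cologne_buffer_data_transfer"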

Queue Discovery#

ccat_data_transfer.queue_discovery.QueueDiscoveryService

Automatically discovers queues from the database:

  1. Query all active DataLocation records

  2. For each location, determine applicable operations from QUEUE_OPERATIONS_BY_LOCATION_TYPE

  3. Generate queue names using the naming convention

  4. Configure Celery routing to map tasks to queues

This happens at:

  • Application startup

  • When new locations are added to the database

  • When the list-queues CLI command is run

Implementation: ccat_data_transfer.queue_discovery.QueueDiscoveryService.discover_all_queues()
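
A schematic version of that discovery pass (illustrative; it assumes the models and the QUEUE_OPERATIONS_BY_LOCATION_TYPE constant shown earlier):

def discover_all_queues(session):
    """Generate queue names for every active data location (sketch)."""
    queues = []
    active_locations = session.query(DataLocation).filter(
        DataLocation.active.is_(True)
    )
    for location in active_locations:
        for op in QUEUE_OPERATIONS_BY_LOCATION_TYPE[location.location_type]:
            queues.append(
                f"{location.site.short_name}_{location.name}_{op.name.lower()}"
            )
    return queues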

Routes and Routing#

Routes define how data flows between sites and locations.

Automatic Route Discovery#

The system discovers routes by analyzing site topology:

Primary Routes (SOURCE → LTA)

From any SOURCE site to all LTA sites

Secondary Routes (LTA → LTA)

Between all LTA sites
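
One way to derive both route sets from the site topology (a sketch; the source_sites and lta_sites inputs are assumptions):

from itertools import permutations

def discover_routes(source_sites, lta_sites):
    """Derive primary and secondary routes from site topology (sketch)."""
    primary = [(src, lta) for src in source_sites for lta in lta_sites]
    secondary = list(permutations(lta_sites, 2))  # every ordered LTA pair
    return primary, secondary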

Round-Robin State#

Round-robin distribution state is tracked in Redis:

redis_key = f"round_robin:{source_site.short_name}"
last_index = int(redis.get(redis_key) or -1)  # -1 when no state is stored yet
next_index = (last_index + 1) % num_lta_sites
redis.set(redis_key, next_index)

This ensures even distribution across LTA sites over time while surviving service restarts.

Custom Routes#

ccat_ops_db.models.DataTransferRoute

Manual route overrides for special cases:

  • DIRECT: Skip buffers, transfer directly to destination

  • RELAY: Route through intermediate site

  • CUSTOM: Specific location-to-location override

These are not currently used in production but are available for operational flexibility.

Data Flow Summary#

Putting it all together:

flowchart TD
    A[Instrument creates file] --> B[Manager detects file]
    B --> C[Manager creates package]
    C --> D[Worker transfer to BUFFER]
    D --> E[Manager creates transfer]
    E --> F[Manager determines route]
    F --> G[Worker transfers data]
    G --> H[Manager submits unpack]
    H --> I[Worker unpacks & verifies]
    I --> J[Manager submits archive]
    J --> K[Worker moves to LTA]
    K --> L[State: ARCHIVED]
    L --> M[Cleanup]

    classDef instrument fill:#fff4e6,stroke:#e65100,stroke-width:2px
    classDef manager fill:#e1f5ff,stroke:#01579b,stroke-width:2px
    classDef worker fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
    classDef archived fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px

    class A instrument
    class B,C,E,F,H,J manager
    class D,G,I,K worker
    class L archived
    class M manager
    

Each step involves:

  • Database state update

  • Task submission to appropriate queue

  • Worker execution at correct location

  • Result verification and recording

Next Steps#