Philosophy & Design Principles#

Documentation verified. Last checked: 2025-10-16. Reviewer: Christof Buchbender

The Data Transfer System embodies several key design principles that drive its architecture and implementation. Understanding these principles helps explain why the system works the way it does.

Core Philosophy#

Distributed by Design

The system operates across multiple sites spanning continents. Rather than centralizing control, each site operates semi-autonomously with coordination through shared state (database and Redis). This approach:

  • Enables flexible deployment topologies

Data Safety First

Data integrity is non-negotiable. The system employs defense-in-depth:

  • Data Integrity Checks at every stage of the pipeline

  • Physical copy tracking for audit trails

  • Never delete source data until verified in long-term archive

Eventual Consistency

Operations may complete at different times across sites, but the system converges to a consistent state:

  • Database records reflect intent and desired state

  • Celery tasks perform actual work asynchronously

  • Retry and recovery mechanisms ensure completion

Declarative Configuration

The system configuration is data-driven rather than hard-coded:

  • Sites and storage locations defined in the database

  • Queue routing automatically discovered from data models

  • Transfer routes computed from site topology

  • New sites/locations added without code changes
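
As an illustration, the sketch below shows what such database-driven configuration could look like, assuming hypothetical SQLAlchemy models; these are not the actual OpsDB tables.

# Illustrative sketch: hypothetical SQLAlchemy models, not the actual OpsDB schema.
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class Site(Base):
    __tablename__ = "site"
    id = Column(Integer, primary_key=True)
    short_name = Column(String, unique=True, nullable=False)  # e.g. "ccat", "cologne"

class DataLocation(Base):
    __tablename__ = "data_location"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)            # e.g. "telescope_computer", "buffer"
    location_type = Column(String, nullable=False)   # e.g. "disk", "s3", "tape"
    site_id = Column(Integer, ForeignKey("site.id"))
    site = relationship("Site")

# Adding a site or storage location is a database insert, not a code change.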

Design Patterns#

Manager/Worker Duality#

Every pipeline stage follows the same pattern:

        graph TD
    Manager["Manager<br/>(Python Process)"]
    Workers["Celery Workers<br/>(Distributed Processes)"]

    Manager -->|Submit tasks to queues| Workers

    style Manager fill:#e1f5ff,stroke:#01579b,stroke-width:2px
    style Workers fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
    

Manager responsibilities:

  • Scans database for work to be done

  • Prepares operations (creates DB records, validates)

  • Submits Celery tasks to appropriate queues

  • Runs anywhere with database access

Worker responsibilities:

  • Listen to queues for their location/operation type

  • Perform actual work (copy files, verify checksums, etc.)

  • Update database with results

  • Must run on machines with data access

The following sequence diagram shows the detailed communication flow:

        sequenceDiagram
    participant M as Manager (Python Process)
    participant Q as Task Queues
    participant W as Celery Workers
    participant D as Database
    participant F as File System

    Note over M: Scans database for work to be done
    M->>D: Query for pending operations
    D-->>M: Return work items

    Note over M: Prepares operations (creates DB records, validates)
    M->>D: Create operation records
    M->>D: Validate requirements

    Note over M: Submits Celery tasks to appropriate queues
    M->>Q: Submit tasks to queues

    Note over W: Listen to queues for their location/operation type
    Q->>W: Deliver task

    Note over W: Fetch work data from database
    W->>D: Query operation details
    D-->>W: Return operation data

    Note over W: Perform actual work (copy files, verify checksums, etc.)
    W->>F: Access file system
    W->>F: Work on files, transfer between locations, verify checksums, etc.

    Note over W: Update database with results
    W->>D: Update operation status
    W->>D: Record completion/results
    

Example: ccat_data_transfer.raw_data_package_manager

This separation allows:

  • Managers to run centrally (e.g., in Cologne)

  • Workers to run where data physically exists (telescope computers, processing nodes)

  • Horizontal scaling of workers without changing managers

  • Managers to continue scheduling even if workers are temporarily offline
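
Stripped to its essentials, the pattern is a manager that enqueues and a worker task that executes. The sketch below is illustrative only; the helper functions and task names are hypothetical, not the real ccat_data_transfer API.

# Illustrative manager/worker sketch; helpers and task names are hypothetical.
from celery import Celery

app = Celery("transfer", broker="redis://localhost:6379/0")

def run_manager_cycle(session):
    """Manager: scan the database, prepare operations, submit tasks (needs DB access only)."""
    for pending in find_pending_operations(session):         # hypothetical query helper
        record = create_operation_record(session, pending)   # hypothetical DB record creation
        app.send_task(
            "worker.execute_operation",
            args=[record.id],
            queue=queue_name_for(record),  # e.g. "cologne_buffer_data_transfer"
        )

@app.task(name="worker.execute_operation", bind=True)
def execute_operation(self, operation_id):
    """Worker: fetch operation details, do the file work, write results back (needs data access)."""
    details = load_operation(operation_id)   # hypothetical DB lookup
    result = copy_and_verify(details)        # hypothetical file-level work
    store_result(operation_id, result)       # hypothetical status update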

Automatic Queue Discovery#

Rather than hard-coding queue names, the system generates them from database models:

Queue Name = {site_short_name}_{data_location_name}_{operation_type}

Examples:
• ccat_telescope_computer_raw_data_package_creation
• cologne_buffer_data_transfer
• cologne_lta_long_term_archive_transfer

Implementation: ccat_data_transfer.queue_discovery.QueueDiscoveryService

This approach:

  • Eliminates configuration drift between database and workers

  • Makes queue structure self-documenting

  • Enables dynamic worker assignment

  • Simplifies addition of new sites/locations

How it works:

  1. discover_all_queues(): Queries the database for active DataLocation records

  2. For each location, determines applicable operations based on QUEUE_OPERATIONS_BY_LOCATION_TYPE

  3. Generates queue names and configures Celery routing

  4. Workers bind to queues matching their accessible storage
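
A condensed sketch of the idea follows; the operation mapping and the helper's shape are assumptions, not the actual QueueDiscoveryService code.

# Sketch of queue-name generation; the mapping contents are assumed for illustration.
QUEUE_OPERATIONS_BY_LOCATION_TYPE = {
    "disk": ["raw_data_package_creation", "data_transfer"],
    "s3": ["data_transfer", "long_term_archive_transfer"],
}

def discover_all_queues(locations):
    """Build queue names from active DataLocation records queried from the database."""
    queues = []
    for loc in locations:  # each record carries its site and storage type
        for operation in QUEUE_OPERATIONS_BY_LOCATION_TYPE.get(loc.location_type, []):
            queues.append(f"{loc.site.short_name}_{loc.name}_{operation}")
    return queues

# e.g. ["ccat_telescope_computer_raw_data_package_creation", "cologne_buffer_data_transfer", ...]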

Location-Based Task Routing#

Tasks are routed to workers based on data location rather than operation type. A worker for, e.g., cologne_buffer handles all operations for that buffer (transfers, deletions, packaging) rather than specializing in a single operation type.

Rationale:

  • Workers need physical access to storage - location is the critical constraint

Implementation: ccat_data_transfer.queue_discovery.route_task_by_location()
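
Celery lets a router function decide the queue per task. The sketch below shows the location-based idea; the argument carrying the location and the exact queue naming are assumptions, not the real route_task_by_location() code.

# Sketch of a Celery router that routes by data location; not the real implementation.
def route_task_by_location(name, args, kwargs, options, task=None, **kw):
    """Return routing options that send the task to its data location's queue."""
    location = kwargs.get("data_location")   # assumed task argument, e.g. "cologne_buffer"
    operation = name.rsplit(".", 1)[-1]      # e.g. "data_transfer"
    if location:
        return {"queue": f"{location}_{operation}"}
    return None  # fall back to Celery's default routing

# Registered via: app.conf.task_routes = (route_task_by_location,)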

Polymorphic Transfer Support#

The system supports multiple storage backends (disk, S3, tape) and automatically selects appropriate transfer methods:

# From ccat_data_transfer.queue_discovery.get_transfer_task

Disk → Disk:  BBCP (high-performance parallel transfer)
Disk → S3:    S3 upload
S3 → Disk:    S3 download
Disk → Tape:  (not implemented yet, but extensible for future tape systems)

Each transfer method is a separate Celery task, selected based on source/destination storage types. This polymorphism enables:

  • Adding new storage technologies without changing calling code

  • Optimizing transfer strategy per technology combination

  • Testing with fast local copies (cp) before deploying BBCP
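
Such a selection can be expressed as a dispatch table keyed on the storage-type pair; the sketch below is illustrative, with task names assumed, and is not the real get_transfer_task.

# Illustrative dispatch table; not the actual queue_discovery.get_transfer_task.
TRANSFER_TASKS = {
    ("disk", "disk"): "tasks.bbcp_transfer",  # high-performance parallel copy
    ("disk", "s3"): "tasks.s3_upload",
    ("s3", "disk"): "tasks.s3_download",
    # ("disk", "tape"): not implemented yet
}

def get_transfer_task(source_type: str, destination_type: str) -> str:
    """Pick the Celery task name for a source/destination storage-type pair."""
    try:
        return TRANSFER_TASKS[(source_type, destination_type)]
    except KeyError:
        raise NotImplementedError(f"No transfer method for {source_type} -> {destination_type}")

Supporting a new storage technology then means writing one transfer task and adding one table entry; calling code keeps asking for a task by (source, destination) pair.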

Circuit Breaker Pattern#

To prevent cascading failures and resource exhaustion, operations that repeatedly fail trigger a circuit breaker. Pseudocode:

if consecutive_failures > threshold:
    mark_permanent_failure(operation)
    pause_retry_attempts()
    notify_administrators()

Implementation in custom task classes (see make_celery_task())

This prevents:

  • Infinite retry loops consuming resources

  • Queue backlogs from permanently failed items

  • Silent failures that go unnoticed
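
One way to realize this is a custom Celery task base class whose failure hook counts consecutive failures. The sketch below is illustrative, with a hypothetical threshold and helpers; it is not the actual make_celery_task() code.

# Illustrative circuit-breaker base class; threshold and helpers are hypothetical.
from celery import Task

FAILURE_THRESHOLD = 5  # assumed value

class CircuitBreakerTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        operation_id = args[0] if args else None
        failures = increment_failure_count(operation_id)  # hypothetical DB counter
        if failures > FAILURE_THRESHOLD:
            mark_permanent_failure(operation_id)       # managers stop rescheduling it
            notify_administrators(operation_id, exc)   # hypothetical alerting hook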

Architectural Decisions#

Why Celery?#

Decision: Use Celery for distributed task execution

Rationale:

  • Mature distributed task queue

  • Built-in retry logic and error handling

  • Redis backend provides fast coordination

  • Supports complex routing and prioritization

  • Large Python ecosystem integration

Trade-offs:

  • Additional infrastructure dependency (Redis)

  • Learning curve for team

  • Debugging distributed tasks is harder than sequential code

Alternatives considered: Apache Airflow (too heavy), Kubernetes Jobs (no built-in retry logic; Kubernetes is not used in production)

Why Manager/Worker Split?#

Decision: Separate scheduling (managers) from execution (workers)

Rationale:

  • Scheduling requires database access but not data access

  • Execution requires data access

  • Managers can run centrally for simplified deployment

  • Workers scale independently based on data location workload

  • Decouples business logic (what to do) from execution (how to do it)

Trade-offs:

  • More moving parts to monitor

  • State coordination through the database may add latency (not yet observed in production)

  • Code split across manager/worker modules

Alternatives considered: Combined process (simpler but less scalable), separate services (more overhead)

Why Database-First State Management?#

Decision: Use OpsDB (CCAT Operations Database) with PostgreSQL through SQLAlchemy as source of truth for system state

Rationale:

  • ACID transactions ensure consistency

  • Relational model fits data relationships naturally

  • Enables complex queries for routing and discovery

  • Survives Redis restarts (caches are ephemeral)

  • Provides audit trail and historical record

Trade-offs:

  • The database can become a bottleneck

  • Need connection pooling and optimization

  • Cannot achieve sub-millisecond latency like Redis would

Alternatives considered: Redis-first (loses data on restart; the relational model is better suited to our data)

Why Round-Robin Distribution?#

Decision: Distribute transfers across LTA sites using round-robin. This is built in for future growth: we start with a single LTA site in Cologne but may add more in the future.

Rationale:

  • Allows data to be sent from the observatory only once and then synchronized over cheaper links between downstream archives

  • Stateful (Redis tracks last used site) but recoverable

Trade-offs:

  • May send data to slower sites

  • Requires Redis coordination
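
A sketch of the round-robin selection, with Redis holding the rotation pointer so it is shared across processes and survives restarts; the key name and client setup are assumptions.

# Sketch of round-robin LTA site selection; key name and connection details are assumed.
import redis

r = redis.Redis(host="localhost", port=6379, db=0)

def next_lta_site(lta_sites):
    """Rotate through LTA sites; the counter lives in Redis, so it is shared and recoverable."""
    index = r.incr("lta_round_robin_counter")  # atomic increment across processes
    return lta_sites[index % len(lta_sites)]

# With a single site today this always returns the Cologne LTA;
# additional sites simply join the rotation.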

Performance Considerations#

Parallel Processing

  • BBCP uses multiple TCP streams for high-throughput transfers

  • Celery workers process multiple tasks concurrently

Checksum Trade-offs

  • xxHash64: Fast for large files; used for data transfer and data verification

  • SHA256: Cryptographic security, used for long-term archive verification
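
Both digests can be computed in a single pass over the file. A minimal sketch (the xxhash package is third-party, hashlib is standard library, and the chunk size is arbitrary):

# One-pass computation of both checksums; chunk size chosen arbitrarily.
import hashlib
import xxhash

def file_digests(path, chunk_size=8 * 1024 * 1024):
    """Stream a file once and return its (xxh64, sha256) hex digests."""
    fast = xxhash.xxh64()      # fast, non-cryptographic: transfer verification
    strong = hashlib.sha256()  # cryptographic: long-term archive verification
    with open(path, "rb") as fh:
        while chunk := fh.read(chunk_size):
            fast.update(chunk)
            strong.update(chunk)
    return fast.hexdigest(), strong.hexdigest()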

Scalability Strategy#

Horizontal Scaling

  • Add Celery workers to increase throughput

  • No coordination needed between workers processing different queues

Future Growth

The design accommodates:

  • Additional sites (new rows in Site table)

  • New storage technologies (implement transfer task, add to routing)

  • Increased data rates (add workers, optimize transfer protocols)

  • New pipeline stages (implement manager + worker, define operations)

Next Steps#

Now that you understand the philosophy, explore: