Philosophy & Design Principles
==============================

.. verified:: 2025-10-16
   :reviewer: Christof Buchbender

The Data Transfer System embodies several key design principles that drive its
architecture and implementation. Understanding these principles helps explain
why the system works the way it does.

Core Philosophy
---------------

**Distributed by Design**

The system operates across multiple sites spanning continents. Rather than
centralizing control, each site operates semi-autonomously with coordination
through shared state (database and Redis). This approach:

* Enables flexible deployment topologies

**Data Safety First**

Data integrity is non-negotiable. The system employs defense-in-depth:

* Data integrity checks at every stage of the pipeline
* Physical copy tracking for audit trails
* Never delete source data until it is verified in the long-term archive

**Eventual Consistency**

Operations may complete at different times across sites, but the system
converges to a consistent state:

* Database records reflect intent and desired state
* Celery tasks perform the actual work asynchronously
* Retry and recovery mechanisms ensure completion

**Declarative Configuration**

The system configuration is data-driven rather than hard-coded:

* Sites and storage locations are defined in the database
* Queue routing is automatically discovered from the data models
* Transfer routes are computed from the site topology
* New sites/locations are added without code changes

Design Patterns
---------------

Manager/Worker Duality
~~~~~~~~~~~~~~~~~~~~~~

Every pipeline stage follows the same pattern:

.. mermaid::

   graph TD
       Manager["Manager<br/>(Python Process)"]
       Workers["Celery Workers<br/>(Distributed Processes)"]
       Manager -->|Submit tasks to queues| Workers

       style Manager fill:#e1f5ff,stroke:#01579b,stroke-width:2px
       style Workers fill:#f3e5f5,stroke:#4a148c,stroke-width:2px

**Manager responsibilities:**

* Scans the database for work to be done
* Prepares operations (creates DB records, validates)
* Submits Celery tasks to the appropriate queues
* Runs anywhere with database access

**Worker responsibilities:**

* Listen to the queues for their location/operation type
* Perform the actual work (copy files, verify checksums, etc.)
* Update the database with results
* Must run on machines with data access

The following sequence diagram shows the detailed communication flow:

.. mermaid::

   sequenceDiagram
       participant M as Manager (Python Process)
       participant Q as Task Queues
       participant W as Celery Workers
       participant D as Database
       participant F as File System

       Note over M: Scans database for work to be done
       M->>D: Query for pending operations
       D-->>M: Return work items
       Note over M: Prepares operations (creates DB records, validates)
       M->>D: Create operation records
       M->>D: Validate requirements
       Note over M: Submits Celery tasks to appropriate queues
       M->>Q: Submit tasks to queues
       Note over W: Listen to queues for their location/operation type
       Q->>W: Deliver task
       Note over W: Fetch work data from database
       W->>D: Query operation details
       D-->>W: Return operation data
       Note over W: Perform actual work (copy files, verify checksums, etc.)
       W->>F: Access file system
       W->>F: Work on files, transfer between locations, verify checksums
       Note over W: Update database with results
       W->>D: Update operation status
       W->>D: Record completion/results

**Example**: :py:mod:`ccat_data_transfer.raw_data_package_manager`

* **Manager** (:py:func:`~ccat_data_transfer.raw_data_package_manager.raw_data_package_manager_service`): Finds unpackaged files, creates :py:class:`~ccat_ops_db.models.RawDataPackage` records, submits tasks
* **Worker** (:py:func:`~ccat_data_transfer.raw_data_package_manager.create_raw_data_packages`): Executes tar creation, checksum calculation, file movement

This separation allows:

* Managers to run centrally (e.g., in Cologne)
* Workers to run where the data physically exists (telescope computers, processing nodes)
* Horizontal scaling of workers without changing managers
* Managers to continue scheduling even if workers are temporarily offline
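To make the duality concrete, the sketch below shows the two halves with plain
Celery. All names here (task, queue, helper logic) are illustrative
placeholders, not the actual :py:mod:`ccat_data_transfer` API:

.. code-block:: python

   from celery import Celery

   app = Celery("sketch", broker="redis://localhost:6379/0")


   @app.task(name="sketch.create_raw_data_package")
   def create_raw_data_package(package_id: int) -> None:
       """Worker half: runs on a machine with physical access to the files."""
       # A real worker would load the DB record, tar the files, compute
       # checksums, and write the results back to the database.
       print(f"packaging {package_id}")


   def manager_cycle(pending_package_ids: list[int]) -> None:
       """Manager half: needs only database access, so it can run anywhere."""
       for package_id in pending_package_ids:
           # Submit to a location-specific queue; only workers bound to that
           # queue (i.e. with access to that storage) will pick it up.
           create_raw_data_package.apply_async(
               args=[package_id],
               queue="ccat_telescope_computer_raw_data_package_creation",
           )


   if __name__ == "__main__":
       manager_cycle([1, 2, 3])  # in production the IDs come from a DB scan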
(Python Process)"] Workers["Celery Workers
Location-Based Task Routing
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Tasks are routed to workers based on data location rather than operation type.
A worker for ``cologne_buffer``, for example, handles all operations for that
buffer (transfers, deletions, packaging) rather than specializing by operation
type.

**Rationale:**

* Workers need physical access to storage, so location is the critical constraint

**Implementation**: :py:func:`ccat_data_transfer.queue_discovery.route_task_by_location`

Polymorphic Transfer Support
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The system supports multiple storage backends (disk, S3, tape) and
automatically selects the appropriate transfer method:

.. code-block:: text

   # From ccat_data_transfer.queue_discovery.get_transfer_task
   Disk → Disk: BBCP (high-performance parallel transfer)
   Disk → S3:   S3 upload
   S3 → Disk:   S3 download
   Disk → Tape: (not implemented yet, but extensible for future tape systems)

Each transfer method is a separate Celery task, selected based on the source
and destination storage types. This polymorphism enables:

* Adding new storage technologies without changing calling code
* Optimizing the transfer strategy per technology combination
* Testing with fast local copies (``cp``) before deploying BBCP
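A sketch of how such a selection can be expressed as a dispatch table keyed on
the storage-type pair; the task names and type strings below are illustrative
assumptions, not the actual signature of
:py:func:`~ccat_data_transfer.queue_discovery.get_transfer_task`:

.. code-block:: python

   # (source, destination) storage types -> Celery task name.
   # The task names and type strings are illustrative assumptions.
   TRANSFER_TASKS = {
       ("disk", "disk"): "transfer_with_bbcp",  # high-performance parallel copy
       ("disk", "s3"): "s3_upload",
       ("s3", "disk"): "s3_download",
   }


   def select_transfer_task(source_type: str, destination_type: str) -> str:
       """Pick the transfer task for a storage-type combination."""
       try:
           return TRANSFER_TASKS[(source_type, destination_type)]
       except KeyError:
           # e.g. ("disk", "tape") until a tape backend is implemented
           raise NotImplementedError(
               f"no transfer method for {source_type} -> {destination_type}"
           ) from None


   print(select_transfer_task("disk", "s3"))  # -> s3_upload

A future tape backend then becomes one new table entry plus one new task, with
no change to the calling code.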
Circuit Breaker Pattern
~~~~~~~~~~~~~~~~~~~~~~~

To prevent cascading failures and resource exhaustion, operations that
repeatedly fail trigger circuit breakers. Pseudocode:

.. code-block:: python

   # Illustrative helper names; the real logic lives in the custom task classes.
   if consecutive_failures > threshold:
       mark_permanent_failure(operation)  # stop scheduling this operation
       pause_retry_attempts()             # break the retry loop
       notify_administrators()            # surface the failure to humans

Implementation in custom task classes (see
:py:func:`~ccat_data_transfer.setup_celery_app.make_celery_task`).

This prevents:

* Infinite retry loops consuming resources
* Queue backlogs from permanently failed items
* Silent failures that go unnoticed

Architectural Decisions
-----------------------

Why Celery?
~~~~~~~~~~~

**Decision**: Use `Celery <https://docs.celeryq.dev/>`_ for distributed task
execution

**Rationale**:

* Mature distributed task queue
* Built-in retry logic and error handling
* Redis backend provides fast coordination
* Supports complex routing and prioritization
* Large Python ecosystem integration

**Trade-offs**:

* Additional infrastructure dependency (Redis)
* Learning curve for the team
* Debugging distributed tasks is harder than sequential code

**Alternatives considered**: Apache Airflow (too heavy), Kubernetes Jobs (no
built-in retry logic, and Kubernetes is not used in production)

Why Manager/Worker Split?
~~~~~~~~~~~~~~~~~~~~~~~~~

**Decision**: Separate scheduling (managers) from execution (workers)

**Rationale**:

* Scheduling requires database access but not data access
* Execution requires data access
* Managers can run centrally for simplified deployment
* Workers scale independently based on data location workload
* Decouples business logic (what to do) from execution (how to do it)

**Trade-offs**:

* More moving parts to monitor
* State coordination through the database may add latency (not yet noticed in
  production)
* Code is split across manager/worker modules

**Alternatives considered**: Combined process (simpler but less scalable),
separate services (more overhead)

Why Database-First State Management?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

**Decision**: Use OpsDB (CCAT Operations Database) with
`PostgreSQL <https://www.postgresql.org/>`_ through
`SQLAlchemy <https://www.sqlalchemy.org/>`_ as the source of truth for system
state

**Rationale**:

* ACID transactions ensure consistency
* The relational model fits the data relationships naturally
* Enables complex queries for routing and discovery
* Survives Redis restarts (caches are ephemeral)
* Provides an audit trail and historical record

**Trade-offs**:

* The database can become a bottleneck
* Needs connection pooling and optimization
* Cannot achieve sub-millisecond latency like Redis would

**Alternatives considered**: Redis-first (loses data on restart; the
relational model is better suited for our data)

Why Round-Robin Distribution?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

**Decision**: Distribute transfers across LTA sites using round-robin. This is
built in for future growth: we start with only one LTA site in Cologne but
might add more in the future.

**Rationale**:

* Allows data to be sent out from the observatory only once and then
  synchronized over cheaper links between downstream archives
* Stateful (Redis tracks the last used site) but recoverable, as sketched
  below

**Trade-offs**:

* May send data to slower sites
* Requires Redis coordination
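A minimal sketch of such a stateful but recoverable rotation, assuming a plain
``redis-py`` client and a hypothetical counter key (the production code keeps
its own state key and site list):

.. code-block:: python

   import redis


   def next_lta_site(client: redis.Redis, sites: list[str]) -> str:
       """Pick the next LTA site, keeping the rotation state in Redis."""
       # INCR is atomic, so concurrent managers never reuse the same slot,
       # and the counter survives process restarts.
       counter = client.incr("lta:round_robin_counter")  # hypothetical key name
       return sites[counter % len(sites)]


   client = redis.Redis(host="localhost", port=6379)
   print(next_lta_site(client, ["cologne_lta"]))  # single-site case today

Because the counter lives in Redis rather than in process memory, a restarted
manager resumes the rotation where it left off.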
Performance Considerations
--------------------------

**Parallel Processing**

* BBCP uses multiple TCP streams for high-throughput transfers
* Celery workers process multiple tasks concurrently

**Checksum Trade-offs**

* xxHash64: fast for large files; used during transfers for data transfer and
  data verification
* SHA256: cryptographic security; used for long-term archive verification

Scalability Strategy
--------------------

**Horizontal Scaling**

* Add Celery workers to increase throughput
* No coordination is needed between workers processing different queues

**Future Growth**

The design accommodates:

* Additional sites (new rows in the ``Site`` table)
* New storage technologies (implement a transfer task, add it to the routing)
* Increased data rates (add workers, optimize transfer protocols)
* New pipeline stages (implement a manager + worker, define the operations)

Next Steps
----------

Now that you understand the philosophy, explore:

* :doc:`concepts` - Key concepts and data models
* :doc:`pipeline` - How the 7 stages work together
* :doc:`routing` - Queue discovery and task routing in detail