Routing & Queue Discovery

Documentation verified. Last checked: 2025-11-05. Reviewer: Christof Buchbender.

The Data Transfer System derives its Celery queues from the database and routes each task according to its operation and data location, so work reaches the right workers without manual configuration. This section explains how the system determines where each task executes.

Celery tasks can be routed to specific queues, and each Celery worker can be configured to consume from one or more queues. Because a worker needs physical access to the data it works on, every task must land in a queue consumed by a worker at the location that holds that data.

The Routing Problem

  • Multiple sites (Chile, Germany, USA) with different storage locations

  • Each location requires specific workers with data access

  • Operations vary by location type (SOURCE, BUFFER, LTA, PROCESSING)

  • New sites and locations added without code changes

  • Workers may come online/offline dynamically

Queue Naming Convention

Queue names follow a strict three-part convention:

{site_short_name}_{data_location_name}_{operation_type}

Components:
• site_short_name:    From Site.short_name (e.g., "ccat", "cologne", "us")
• data_location_name: From DataLocation.name (e.g., "buffer", "lta", "telescope_computer")
• operation_type:     From OperationType enum (e.g., "raw_data_package_creation")

Examples:

ccat_telescope_computer_raw_data_package_creation
└─┬┘ └──────────┬─────┘ └──────────┬────────────┘
Site    Location Name         Operation Type

cologne_buffer_data_transfer
cologne_buffer_data_transfer_unpacking
cologne_buffer_deletion
cologne_lta_long_term_archive_transfer
us_lta_long_term_archive_transfer
ramses_processing_staging
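Because site, location, and operation names can themselves contain underscores ("telescope_computer", "data_transfer_unpacking"), a queue name cannot be split back into its parts with a plain split("_"). The sketch below is not part of ccat_data_transfer: it builds names per the convention and, for debugging, parses them by matching the longest known operation suffix. It assumes the operation values listed (taken from the queue names on this page, with "monitoring" guessed) and that site short names contain no underscore.

```python
from typing import Iterable, Tuple

# Operation values seen in the examples on this page ("monitoring" is assumed).
KNOWN_OPERATIONS = (
    "raw_data_package_creation",
    "data_transfer_package_creation",
    "data_transfer_unpacking",
    "data_transfer",
    "deletion",
    "long_term_archive_transfer",
    "staging",
    "monitoring",
)


def build_queue_name(site_short_name: str, location_name: str, operation: str) -> str:
    """Join the three components with underscores."""
    return f"{site_short_name}_{location_name}_{operation}"


def parse_queue_name(
    queue: str, operations: Iterable[str] = KNOWN_OPERATIONS
) -> Tuple[str, str, str]:
    """Split a queue name back into (site, location, operation).

    Longer operation names are tried first so that, for example,
    "data_transfer_unpacking" wins over "data_transfer".
    """
    for op in sorted(operations, key=len, reverse=True):
        suffix = "_" + op
        if queue.endswith(suffix):
            prefix = queue[: -len(suffix)]
            # Assumption: the site short name contains no underscore.
            site, sep, location = prefix.partition("_")
            if sep and location:
                return site, location, op
    raise ValueError(f"Cannot parse queue name: {queue!r}")
```

The longest-suffix rule is what makes cologne_buffer_data_transfer and cologne_buffer_data_transfer_unpacking decode to different operations.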

Benefits:

  • Self-Documenting: Queue name tells you exactly what it does and where

  • No Configuration Drift: Queues generated from database, always in sync

  • Predictable: Easy to determine queue name for any location/operation

  • Debuggable: Logs show clear queue names

Queue Discovery Service

ccat_data_transfer.queue_discovery.QueueDiscoveryService

This service automatically discovers all required queues from the database.

Discovery Algorithm

Implementation: discover_all_queues()

    def discover_all_queues(self) -> List[str]:
        """Discover all required queues from database."""
        locations = (
            self.session.query(DataLocation)
            .filter(DataLocation.active == True)  # noqa: E712
            .all()
        )
        queues = []
        for location in locations:
            queue_base = f"{location.site.short_name}_{location.name}"
            # Operations valid for this location type; unknown types get none
            operations = QUEUE_OPERATIONS_BY_LOCATION_TYPE.get(
                location.location_type, []
            )

            for operation in operations:
                queues.append(f"{queue_base}_{operation.value}")

        return queues

Operation Matrix:

The mapping ccat_data_transfer.operation_types.QUEUE_OPERATIONS_BY_LOCATION_TYPE defines which operations apply to each location type:

QUEUE_OPERATIONS_BY_LOCATION_TYPE = {
    LocationType.SOURCE: [
        OperationType.RAW_DATA_PACKAGE_CREATION,
        OperationType.DELETION,
        # OperationType.ARCHIVE,
        OperationType.MONITORING,
    ],
    LocationType.BUFFER: [
        OperationType.DATA_TRANSFER_PACKAGE_CREATION,
        OperationType.DATA_TRANSFER_UNPACKING,
        OperationType.DATA_TRANSFER,
        OperationType.DELETION,
        # OperationType.ARCHIVE,
        OperationType.MONITORING,
    ],
    LocationType.LONG_TERM_ARCHIVE: [
        # OperationType.ARCHIVE,
        OperationType.LONG_TERM_ARCHIVE_TRANSFER,
        OperationType.MONITORING,
    ],
    LocationType.PROCESSING: [
        OperationType.STAGING,
        OperationType.DELETION,
        OperationType.MONITORING,
    ],
}

This ensures:

  • Only valid operations for each location type

  • Consistent across all sites

  • Easy to add new operation types
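To see the matrix in action without a database, the following sketch mirrors discover_all_queues() over plain dataclasses. Location, LocationType, and the string operation values are stand-ins for the ORM models and the OperationType enum (the "monitoring" value is assumed); the point is that inactive locations contribute no queues and each location type expands to a fixed list of operations.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List


class LocationType(Enum):
    SOURCE = "SOURCE"
    BUFFER = "BUFFER"
    LONG_TERM_ARCHIVE = "LONG_TERM_ARCHIVE"
    PROCESSING = "PROCESSING"


# Stand-in for QUEUE_OPERATIONS_BY_LOCATION_TYPE, using plain string values.
OPERATIONS_BY_LOCATION_TYPE: Dict[LocationType, List[str]] = {
    LocationType.SOURCE: ["raw_data_package_creation", "deletion", "monitoring"],
    LocationType.BUFFER: [
        "data_transfer_package_creation",
        "data_transfer_unpacking",
        "data_transfer",
        "deletion",
        "monitoring",
    ],
    LocationType.LONG_TERM_ARCHIVE: ["long_term_archive_transfer", "monitoring"],
    LocationType.PROCESSING: ["staging", "deletion", "monitoring"],
}


@dataclass
class Location:
    site_short_name: str
    name: str
    location_type: LocationType
    active: bool = True


def discover_queues(locations: List[Location]) -> List[str]:
    """In-memory analogue of discover_all_queues()."""
    queues = []
    for loc in locations:
        if not loc.active:
            continue  # inactive locations get no queues
        base = f"{loc.site_short_name}_{loc.name}"
        for op in OPERATIONS_BY_LOCATION_TYPE.get(loc.location_type, []):
            queues.append(f"{base}_{op}")
    return queues
```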

Celery Configuration

Queue discovery happens at application startup in ccat_data_transfer.setup_celery_app:

def configure_dynamic_queues(session: sessionmaker) -> None:
    """Configure dynamic queues from database and merge with static configuration."""
    logger = get_structured_logger(__name__)
    logger.info("Configuring dynamic Celery queues from database")

    # Discover queues from database
    discovery_service = QueueDiscoveryService(session)
    dynamic_queue_names = discovery_service.discover_all_queues()

    # Create dynamic queue objects
    dynamic_queues = []
    for queue_name in dynamic_queue_names:
        queue = Queue(
            queue_name,
            data_transfer_exchange,
            routing_key=queue_name,
            queue_arguments={"x-queue-type": "classic"},
        )
        dynamic_queues.append(queue)

    # Merge static and dynamic queues
    all_queues = list(STATIC_QUEUES) + dynamic_queues
    app.conf.task_queues = tuple(all_queues)

    # Create dynamic routes
    dynamic_routes = {}
    for queue_name in dynamic_queue_names:
        dynamic_routes[f"ccat:data_transfer:{queue_name}"] = {
            "queue": queue_name,
            "routing_key": queue_name,
        }

    # Merge static and dynamic routes
    all_routes = {**STATIC_ROUTES, **dynamic_routes}
    app.conf.task_routes = all_routes

    logger.info(f"Configured {len(all_queues)} queues ({len(dynamic_queues)} dynamic)")

This happens:

  • When workers start

  • When managers start

  • When the CLI command list-queues runs
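One consequence of {**STATIC_ROUTES, **dynamic_routes} is worth knowing when debugging: Python dict unpacking keeps the last value for a repeated key, so a dynamic route silently overrides a static route registered under the same task name. The sketch below reproduces only the route-building and merge step with plain dicts; the STATIC_ROUTES entry is hypothetical, not the real static configuration.

```python
from typing import Dict, List


def build_dynamic_routes(queue_names: List[str]) -> Dict[str, Dict[str, str]]:
    """Same shape as dynamic_routes in configure_dynamic_queues()."""
    return {
        f"ccat:data_transfer:{name}": {"queue": name, "routing_key": name}
        for name in queue_names
    }


# Hypothetical static route that collides with a dynamic one.
STATIC_ROUTES = {
    "ccat:data_transfer:cologne_buffer_deletion": {
        "queue": "static_queue",
        "routing_key": "static_queue",
    },
}

dynamic = build_dynamic_routes(["cologne_buffer_deletion", "cologne_lta_monitoring"])

# Later mappings win: the dynamic entry replaces the static one.
merged = {**STATIC_ROUTES, **dynamic}
```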

How Routing Integrates with Managers

The routing system is called by managers at key decision points in the data pipeline:

1. RawDataPackage Creation

When raw_data_package_manager_service() creates a package, it determines the queue:

# In raw_data_package_manager.py
source_location = package.source_location

queue = route_task_by_location(
    OperationType.RAW_DATA_PACKAGE_CREATION,
    source_location
)

create_raw_data_package.apply_async(
    args=[package.id],
    queue=queue
)

2. DataTransferPackage Assembly

When data_transfer_package_manager.py completes a package, it creates primary transfers:

# In data_transfer_package_manager.py
def handle_completed_package(session, package, source_buffer):
    # Discover routes and pick LTA site via round-robin
    create_primary_data_transfers(session, package, source_buffer)

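This discovers the SOURCE → LTA routes (discover_automatic_routes()) and picks the destination LTA site with get_next_lta_site_round_robin(); both are described under Site-to-Site Routing and Round-Robin Distribution below.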

3. Secondary Replication

The create_secondary_data_transfers() service periodically runs:

# Scheduled task in data-transfer-package-manager
create_secondary_data_transfers(session)

This:

  • Calls discover_secondary_routes() - Get all LTA↔LTA routes

  • Scans all completed packages

  • Creates transfers to LTA sites that don’t have the package yet

Key Insight:

Managers never hard-code queue names or routing decisions. They always:

  1. Query the database for current sites/locations

  2. Call routing functions to determine queues

  3. Submit tasks to dynamically determined queues

This separation allows the routing logic to evolve without changing manager code.

Task Routing

Once queues exist, tasks must be routed to the appropriate queue.

Route Determination

ccat_data_transfer.queue_discovery.route_task_by_location()

Given an operation and data location, determine the queue:

def route_task_by_location(
    operation_type: OperationType, data_location: DataLocation
) -> str:
    """Route task to appropriate queue based on operation and location."""
    queue_base = f"{data_location.site.short_name}_{data_location.name}"
    return f"{queue_base}_{operation_type.value}"

Usage in Managers:

# In raw_data_package_manager.py

# Find location where package will be created
source_location = package.source_location

# Determine queue
queue = route_task_by_location(
    OperationType.RAW_DATA_PACKAGE_CREATION,
    source_location
)

# Submit task
create_raw_data_package.apply_async(
    args=[package.id],
    queue=queue
)

This ensures:

  • Task sent to queue matching worker’s location

  • Worker has access to necessary storage

  • No hard-coded queue names in manager code
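route_task_by_location() builds a name for any operation/location pair, including pairs for which no queue was discovered (for example DELETION at a LONG_TERM_ARCHIVE location). Such a task would likely be published to a queue that no worker consumes and sit there unprocessed. A hypothetical guard, not part of ccat_data_transfer, could check the operation matrix first; this sketch uses plain strings and a reduced matrix in place of the real enums.

```python
from enum import Enum
from typing import Dict, List


class LocationType(Enum):
    SOURCE = "SOURCE"
    BUFFER = "BUFFER"
    LONG_TERM_ARCHIVE = "LONG_TERM_ARCHIVE"
    PROCESSING = "PROCESSING"


# Reduced stand-in for QUEUE_OPERATIONS_BY_LOCATION_TYPE (string values).
ALLOWED: Dict[LocationType, List[str]] = {
    LocationType.SOURCE: ["raw_data_package_creation", "deletion", "monitoring"],
    LocationType.LONG_TERM_ARCHIVE: ["long_term_archive_transfer", "monitoring"],
}


def route_checked(
    operation: str,
    site_short_name: str,
    location_name: str,
    location_type: LocationType,
) -> str:
    """Hypothetical guarded variant of route_task_by_location()."""
    if operation not in ALLOWED.get(location_type, []):
        # No queue is discovered for this pair, so no worker would consume it.
        raise ValueError(
            f"{operation!r} is not a valid operation for {location_type.name}"
        )
    return f"{site_short_name}_{location_name}_{operation}"
```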

Transfer Task Selection

Transfer operations are special: they involve two locations (source and destination). The system selects the appropriate transfer task based on the storage types of both.

ccat_data_transfer.queue_discovery.get_transfer_task()

def get_transfer_task(
    origin_location: DataLocation, dest_location: DataLocation
) -> str:
    """Select appropriate transfer task based on storage types."""
    if isinstance(origin_location, DiskDataLocation) and isinstance(
        dest_location, DiskDataLocation
    ):
        return "ccat:data_transfer:transfer_disk_to_disk"
    elif isinstance(origin_location, DiskDataLocation) and isinstance(
        dest_location, S3DataLocation
    ):
        return "ccat:data_transfer:transfer_disk_to_s3"
    elif isinstance(origin_location, S3DataLocation) and isinstance(
        dest_location, DiskDataLocation
    ):
        return "ccat:data_transfer:transfer_s3_to_disk"
    elif isinstance(origin_location, S3DataLocation) and isinstance(
        dest_location, S3DataLocation
    ):
        return "ccat:data_transfer:transfer_s3_to_s3"
    elif isinstance(origin_location, DiskDataLocation) and isinstance(
        dest_location, TapeDataLocation
    ):
        return "ccat:data_transfer:transfer_disk_to_tape"
    elif isinstance(origin_location, TapeDataLocation) and isinstance(
        dest_location, DiskDataLocation
    ):
        return "ccat:data_transfer:transfer_tape_to_disk"
    else:
        raise ValueError(
            f"Unsupported transfer combination: {origin_location.storage_type} -> {dest_location.storage_type}"
        )

Routing for Transfers:

Transfers route to the origin location’s queue:

# Transfer task executes at origin (push model)
queue = route_task_by_location(
    OperationType.DATA_TRANSFER,
    origin_location
)

This makes sense because:

  • Origin worker has source data to push

  • Origin controls transfer initiation

  • Destination worker handles unpacking separately
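Task selection and queue selection can be combined into one step. The sketch below expresses the same decision as get_transfer_task() as a lookup table keyed by storage-type pair and takes the queue from the origin location (push model). It is an alternative sketch, not the current implementation; the lowercase storage strings ("disk", "s3", "tape") are assumed stand-ins for DiskDataLocation, S3DataLocation, and TapeDataLocation. One behavioral difference: an exact-key lookup does not match subclasses the way isinstance() does.

```python
from typing import Dict, Tuple

# Supported (origin storage, destination storage) pairs, copied from the
# isinstance chain in get_transfer_task().
TRANSFER_TASKS: Dict[Tuple[str, str], str] = {
    ("disk", "disk"): "ccat:data_transfer:transfer_disk_to_disk",
    ("disk", "s3"): "ccat:data_transfer:transfer_disk_to_s3",
    ("s3", "disk"): "ccat:data_transfer:transfer_s3_to_disk",
    ("s3", "s3"): "ccat:data_transfer:transfer_s3_to_s3",
    ("disk", "tape"): "ccat:data_transfer:transfer_disk_to_tape",
    ("tape", "disk"): "ccat:data_transfer:transfer_tape_to_disk",
}


def plan_transfer(
    origin_site: str, origin_name: str, origin_storage: str, dest_storage: str
) -> Tuple[str, str]:
    """Return (task_name, queue) for a push-model transfer."""
    try:
        task = TRANSFER_TASKS[(origin_storage, dest_storage)]
    except KeyError:
        raise ValueError(
            f"Unsupported transfer combination: {origin_storage} -> {dest_storage}"
        ) from None
    # Push model: the task runs on the origin location's data_transfer queue.
    queue = f"{origin_site}_{origin_name}_data_transfer"
    return task, queue
```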

Worker Assignment

Workers must bind to queues for locations they can access.

Manual Queue Selection

When starting a worker, specify queues explicitly:

# The -Q value must be a single shell word: no spaces or line breaks after the commas
# (a trailing "\" followed by an indented line would split the list into separate words).

# Worker on Cologne buffer server
celery -A ccat_data_transfer.setup_celery_app worker \
    -Q cologne_buffer_data_transfer_package_creation,cologne_buffer_data_transfer,cologne_buffer_data_transfer_unpacking,cologne_buffer_deletion

# Worker on CCAT telescope computer
celery -A ccat_data_transfer.setup_celery_app worker \
    -Q ccat_telescope_computer_raw_data_package_creation,ccat_telescope_computer_deletion
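Long -Q lists are easy to get wrong by hand. A small, hypothetical helper (not part of ccat_data_transfer) can generate the value from the operation matrix instead and reject operations that are not valid for the location type. It uses string stand-ins for the location types and operation values, with "monitoring" assumed.

```python
from typing import Dict, List, Optional

# Stand-in for QUEUE_OPERATIONS_BY_LOCATION_TYPE, keyed by location type name.
OPERATIONS: Dict[str, List[str]] = {
    "SOURCE": ["raw_data_package_creation", "deletion", "monitoring"],
    "BUFFER": [
        "data_transfer_package_creation",
        "data_transfer_unpacking",
        "data_transfer",
        "deletion",
        "monitoring",
    ],
}


def worker_queue_arg(
    site: str,
    location: str,
    location_type: str,
    operations: Optional[List[str]] = None,
) -> str:
    """Build one comma-separated -Q value (no spaces) for a single location.

    With operations=None every operation valid for the location type is used;
    otherwise the requested subset is checked against the matrix.
    """
    valid = OPERATIONS.get(location_type, [])
    chosen = valid if operations is None else operations
    unknown = [op for op in chosen if op not in valid]
    if unknown:
        raise ValueError(f"Not valid for {location_type}: {unknown}")
    return ",".join(f"{site}_{location}_{op}" for op in chosen)
```

Because the result contains no whitespace, it survives shell word splitting and can be passed directly after -Q. The list-queues CLI command shows the same names from the live database.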

Helper CLI Commands

The system provides CLI commands to assist with worker assignment.

List All Queues:

ccat_data_transfer list-queues

List Queues for Specific Location:

ccat_data_transfer list-queues cologne_buffer

Site-to-Site Routing

For inter-site transfers, the system must determine which sites should exchange data.

Route Discovery

ccat_data_transfer.data_transfer_package_manager.discover_automatic_routes()

Discovers primary routes from SOURCE sites to LTA sites:

def discover_automatic_routes(
    session: Session,
) -> List[Tuple[models.Site, models.Site]]:
    """
    Discover automatic routes from all source sites to all LTA sites.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    List[Tuple[models.Site, models.Site]]
        List of (source_site, lta_site) tuples representing automatic routes.
    """
    # Get all sites with source locations
    source_sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.SOURCE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )

    # Get all sites with LTA locations
    lta_sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )

    automatic_routes = []
    for source_site in source_sites:
        for lta_site in lta_sites:
            if source_site.id != lta_site.id:  # Don't route to self
                automatic_routes.append((source_site, lta_site))

    logger.info(f"Discovered {len(automatic_routes)} automatic routes")
    return automatic_routes

Example:

Given:

  • SOURCE sites: CCAT (Chile)

  • LTA sites: Cologne (Germany), Cornell (USA)

Routes discovered:

  • CCAT → Cologne

  • CCAT → Cornell
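Stripped of the ORM queries, the route logic is a filtered cross product. The sketch below uses a hypothetical frozen Site dataclass in place of models.Site and takes the already-queried site lists as input. The second test case shows the self-route exclusion for a site that would host both SOURCE and LTA locations.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Site:
    id: int
    short_name: str


def automatic_routes(
    source_sites: List[Site], lta_sites: List[Site]
) -> List[Tuple[Site, Site]]:
    """Every (source, LTA) pair except routes from a site to itself."""
    return [
        (source, lta)
        for source in source_sites
        for lta in lta_sites
        if source.id != lta.id
    ]
```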

Secondary Routes

ccat_data_transfer.data_transfer_package_manager.discover_secondary_routes()

After primary distribution, packages replicate between LTA sites to ensure redundancy:

def discover_secondary_routes(
    session: Session,
) -> List[Tuple[models.Site, models.Site]]:
    """
    Discover secondary routes between all LTA sites for data replication.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    List[Tuple[models.Site, models.Site]]
        List of (lta_site, lta_site) tuples representing secondary routes.
    """
    # Get all sites with LTA locations
    lta_sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )

    secondary_routes = []
    for origin_lta in lta_sites:
        for dest_lta in lta_sites:
            if origin_lta.id != dest_lta.id:  # Don't route to self
                secondary_routes.append((origin_lta, dest_lta))

    logger.info(f"Discovered {len(secondary_routes)} secondary routes")
    return secondary_routes

Example:

Given LTA sites: Cologne, Cornell

Routes discovered:

  • Cologne → Cornell

  • Cornell → Cologne
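Because the LTA site list comes from a .distinct() query, the nested loop is equivalent to taking all ordered pairs of distinct sites, which the standard library provides as itertools.permutations(sites, 2). With n LTA sites this yields n(n-1) routes, so a third LTA site grows the list from 2 to 6. The sketch uses site short names as plain strings.

```python
from itertools import permutations
from typing import List, Tuple


def secondary_routes(lta_sites: List[str]) -> List[Tuple[str, str]]:
    """All ordered pairs of distinct LTA sites (both directions)."""
    return list(permutations(lta_sites, 2))
```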

How Secondary Transfers Work:

Unlike primary transfers (which use round-robin), secondary transfers are opportunistic:

  1. The create_secondary_data_transfers() function periodically scans all completed DataTransferPackages

  2. For each package, it checks which LTA sites already have it (by looking at physical copies)

  3. It creates transfers from sites that have the package to sites that don’t have it

  4. Over time, this ensures every LTA site gets every package without explicit scheduling

This approach is more resilient than round-robin because:

  • Works even if some LTA sites are offline temporarily

  • Self-heals if transfers fail: the next cycle will retry

  • No state to track (unlike round-robin index)

  • Naturally prioritizes packages that have the fewest copies

Example Flow:

T=0: Package arrives at Cologne (from CCAT via primary transfer)
T=1: create_secondary_data_transfers() runs
     → Sees Cologne has package, Cornell doesn't
     → Creates transfer: Cologne → Cornell
T=2: Transfer completes, both sites now have package
T=3: create_secondary_data_transfers() runs again
     → Both sites have it, no new transfers created
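The T=1 to T=3 flow above can be reproduced with an in-memory sketch of a single scan. It assumes the scan sees, per package, the set of sites holding a completed copy; the function name and data shapes are hypothetical. Unlike a real implementation, it does not check for transfers that are already in flight.

```python
from typing import Dict, List, Set, Tuple


def plan_secondary_transfers(
    copies: Dict[str, Set[str]], lta_sites: List[str]
) -> List[Tuple[str, str, str]]:
    """One scan: (package, origin_site, dest_site) for every missing LTA copy.

    The origin is any LTA site that already holds a copy; the alphabetically
    first one is chosen to keep the result deterministic.
    """
    transfers = []
    for package, holders in sorted(copies.items()):
        lta_holders = sorted(holders & set(lta_sites))
        if not lta_holders:
            continue  # not at any LTA site yet; primary transfer still pending
        for dest in lta_sites:
            if dest not in holders:
                transfers.append((package, lta_holders[0], dest))
    return transfers
```

Running the scan, applying its result, and running it again converges: the second scan returns nothing once every LTA site holds the package.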

Round-Robin Distribution

To balance load across LTA sites, the system distributes primary transfers using round-robin.

Note: Round-robin is only used for primary transfers (SOURCE → LTA). Secondary transfers (LTA → LTA) use opportunistic replication instead.

Algorithm

ccat_data_transfer.data_transfer_package_manager.get_next_lta_site_round_robin()

For a SOURCE site with multiple LTA destinations:

def get_next_lta_site_round_robin(
    session: Session,
    source_site: models.Site,
    automatic_routes: List[Tuple[models.Site, models.Site]],
) -> Optional[models.Site]:
    """
    Get the next LTA site for round-robin distribution from a source site.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    source_site : models.Site
        The source site.
    automatic_routes : List[Tuple[models.Site, models.Site]]
        List of automatic routes.

    Returns
    -------
    Optional[models.Site]
        The next LTA site in round-robin order.
    """
    # Get all LTA sites that this source site can route to
    available_lta_sites = [
        lta_site
        for src_site, lta_site in automatic_routes
        if src_site.id == source_site.id
    ]

    if not available_lta_sites:
        logger.warning(
            f"No LTA sites available for source site {source_site.short_name}"
        )
        return None

    # Use Redis to track round-robin state
    redis_key = f"round_robin:source:{source_site.short_name}"
    current_index = redis_.get(redis_key)

    if current_index is None:
        current_index = 0
    else:
        # Wrap the stored index in case the list of LTA sites has shrunk
        current_index = int(current_index) % len(available_lta_sites)

    # Get next LTA site
    next_lta_site = available_lta_sites[current_index]

    # Update round-robin index
    next_index = (current_index + 1) % len(available_lta_sites)
    redis_.set(redis_key, next_index)

    logger.debug(
        f"Round-robin: {source_site.short_name} -> {next_lta_site.short_name} (index {current_index})"
    )
    return next_lta_site

Example Sequence:

Given LTA sites: [Cologne, Cornell]

Transfer 1: CCAT → Cologne  (index 0)
Transfer 2: CCAT → Cornell  (index 1)
Transfer 3: CCAT → Cologne  (index 0)
Transfer 4: CCAT → Cornell  (index 1)
...

This ensures:

  • Even distribution over time

  • No single site overwhelmed

  • Fair to all LTA sites

  • State survives service restarts (provided Redis persistence is enabled)

Redis Key Format:

The round-robin state is stored in Redis with the key:

round_robin:source:{site_short_name}

Examples:
round_robin:source:ccat
round_robin:source:apex
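The implementation above reads the index with GET and writes it back with SET, so if more than one process ever called it at the same moment, both could read the same index and pick the same LTA site. An alternative, not what the current code does, is Redis INCR, which increments and returns a counter in one atomic step (redis-py exposes it as Redis.incr()). The sketch below shows that variant with an in-memory stand-in for Redis so it runs without a server, using the same key format. Note that the stored value then counts total picks rather than holding the next index, so redis-cli GET would show a steadily growing number.

```python
from typing import Dict, List, Optional


class InMemoryCounter:
    """Stand-in for the one Redis command used below."""

    def __init__(self) -> None:
        self._data: Dict[str, int] = {}

    def incr(self, key: str) -> int:
        # Mimics Redis INCR: missing keys start at 0, then add 1 and return.
        self._data[key] = self._data.get(key, 0) + 1
        return self._data[key]


def next_lta_site(
    counter: InMemoryCounter, source_short_name: str, lta_sites: List[str]
) -> Optional[str]:
    """Round-robin pick using a single atomic increment per call."""
    if not lta_sites:
        return None
    n = counter.incr(f"round_robin:source:{source_short_name}")
    # Modulo on every call keeps the index valid even if lta_sites shrinks.
    return lta_sites[(n - 1) % len(lta_sites)]
```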

Custom Routing (Not Implemented)

The ccat_ops_db.models.DataTransferRoute model exists in the database schema, but custom routing is not currently implemented in the transfer logic.

DataTransferRoute Model

ccat_ops_db.models.DataTransferRoute

The ccat_ops_db.models.DataTransferRoute model allows specifying custom routes, but the find_route_overrides() function currently only logs these records without acting on them:

def find_route_overrides(session: Session) -> List[models.DataTransferRoute]:
    """
    Find manual route overrides defined in the DataTransferRoute table.

    This is a placeholder function for future implementation of route overrides.
    Currently just reports what overrides exist without acting on them.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    List[models.DataTransferRoute]
        List of manual route overrides (not yet implemented).
    """
    # Get all manual route overrides
    route_overrides = session.query(models.DataTransferRoute).all()

    if route_overrides:
        logger.info(
            f"Found {len(route_overrides)} route overrides (not yet implemented)"
        )
        for route in route_overrides:
            logger.debug(
                f"Route override: {route.origin_site.short_name} -> {route.destination_site.short_name} "
                f"(type: {route.route_type}, method: {route.transfer_method})"
            )
    else:
        logger.debug("No route overrides found")

    return route_overrides

Intended Route Types (when implemented):

  • DIRECT: Skip intermediate steps, go directly

  • RELAY: Route through specific intermediate site

  • CUSTOM: Override normal location selection

Intended Use Cases (when implemented):

  • Testing specific routes

  • Temporary routing during maintenance

  • Optimizing for specific network paths

  • Manual intervention during issues

Current Workaround:

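To manually control routing today, you must:

  1. Temporarily deactivate the unwanted sites' LTA data locations in the database (set active=false on the data_location rows); route discovery only considers active locations

  2. Let automatic route discovery use the remaining sites

  3. Reactivate the locations when ready

This is a coarse workaround: deactivated locations also lose their queues at the next service restart and drop out of secondary replication. Proper custom routing should be implemented for production use.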

CLI Tools

List All Queues:

ccat_data_transfer list-queues

Shows all dynamically discovered queues based on active DataLocations.

List Queues for Specific Location:

ccat_data_transfer list-queues cologne_buffer

Shows only queues for the specified location.

List All Locations:

ccat_data_transfer list-locations

Output shows sites, locations, and their types:

**CCAT Observatory - ccat**
- telescope_computer (SOURCE)
- buffer (BUFFER)

**University of Cologne - cologne**
- buffer (BUFFER)
- lta (LONG_TERM_ARCHIVE)
- processing (PROCESSING)

**Cornell University - us**
- buffer (BUFFER)
- lta (LONG_TERM_ARCHIVE)

Note: There is no show-routes command. To see routing in action, check the logs from the data-transfer-package-manager or query the database directly (see Database Queries section below).

Database Queries

For deeper investigation:

-- Show all active locations (the inputs to queue discovery)
SELECT
    s.short_name as site,
    dl.name as location,
    dl.location_type,
    dl.priority,
    dl.active
FROM data_location dl
JOIN site s ON dl.site_id = s.id
WHERE dl.active = true
ORDER BY s.short_name, dl.location_type, dl.priority;

-- Show recent transfers and their routes
SELECT
    dt.id,
    origin.name as origin,
    dest.name as destination,
    dt.status,
    dt.start_time
FROM data_transfer dt
JOIN data_location origin ON dt.origin_location_id = origin.id
JOIN data_location dest ON dt.destination_location_id = dest.id
ORDER BY dt.start_time DESC
LIMIT 100;

-- Reproduce automatic route discovery: every (SOURCE site, LTA site) pair
-- (add AND source_site.short_name = 'ccat' to restrict to one source site)
SELECT DISTINCT
    source_site.short_name as source,
    lta_site.short_name as lta_destination
FROM site source_site
CROSS JOIN site lta_site
WHERE EXISTS (
    SELECT 1 FROM data_location dl1
    WHERE dl1.site_id = source_site.id
    AND dl1.location_type = 'SOURCE'
    AND dl1.active = true
)
AND EXISTS (
    SELECT 1 FROM data_location dl2
    WHERE dl2.site_id = lta_site.id
    AND dl2.location_type = 'LONG_TERM_ARCHIVE'
    AND dl2.active = true
)
AND source_site.id != lta_site.id;

Adding New Sites/Locations

Because routing is database-driven, adding a site requires no code changes.

Process

  1. Add Site to Database:

    new_site = Site(
        name="New Observatory",
        short_name="newobs",
        location="Antarctica"
    )
    session.add(new_site)
    
  2. Add Data Locations:

    source = DiskDataLocation(
        name="telescope_computer",
        site=new_site,
        location_type=LocationType.SOURCE,
        path="/data/raw",
        active=True,
        priority=1
    )
    
    buffer = DiskDataLocation(
        name="buffer",
        site=new_site,
        location_type=LocationType.BUFFER,
        path="/data/buffer",
        active=True,
        priority=1
    )
    
    session.add_all([source, buffer])
    session.commit()  # persist the site and locations before restarting services
    
  3. Restart Services:

    Queue discovery runs only at startup, so restart workers and managers to pick up the new locations

  4. Start Workers:

    # On new site's servers
    celery -A ccat_data_transfer.setup_celery_app worker \
        -Q newobs_telescope_computer_raw_data_package_creation,newobs_telescope_computer_deletion,newobs_buffer_data_transfer_package_creation,newobs_buffer_data_transfer
    
  5. Routes Automatically Created:

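    • discover_automatic_routes finds the new SOURCE→LTA routes

    • discover_secondary_routes adds routes in both directions between the new site and every existing LTA site (if an LTA location was added)

    • Round-robin includes the new site as a destination if it has an active LTA location; a new SOURCE site gets its own round_robin:source:newobs counter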

That’s it! No code changes, just configuration.

Troubleshooting Routing

Problem: Tasks not being processed

Checks:

  1. Is the location active? SELECT * FROM data_location WHERE name = '...'

  2. Does a worker consume this queue? celery -A ccat_data_transfer.setup_celery_app inspect active_queues

  3. Is the queue name correct? ccat_data_transfer list-queues

  4. Is the operation valid for this location type? Check QUEUE_OPERATIONS_BY_LOCATION_TYPE
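Checks 2 and 3 can be combined by comparing the discovered queue names with what workers report. The function below is a hypothetical helper that works on plain data, so it can be fed the output of Celery's app.control.inspect().active_queues() (a mapping from worker name to a list of queue descriptions, each with a "name" key, or None when no worker replies) together with the list from discover_all_queues().

```python
from typing import Dict, Iterable, List, Optional, Set


def unconsumed_queues(
    expected: Iterable[str],
    active_queues: Optional[Dict[str, List[dict]]],
) -> List[str]:
    """Return expected queues that no replying worker consumes, sorted."""
    consumed: Set[str] = set()
    # None means no worker replied, so nothing is consumed.
    for queues in (active_queues or {}).values():
        consumed.update(q["name"] for q in queues)
    return sorted(set(expected) - consumed)
```

Any name it returns is a queue where tasks can accumulate with no worker to process them.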

Problem: Tasks going to wrong queue

Checks:

  1. Verify queue name generation logic

  2. Check route_task_by_location call site

  3. Examine manager logs for routing decisions

  4. Ensure database has correct site/location names

Problem: Uneven distribution across LTA sites (primary transfers)

Checks:

  1. Verify round-robin state in Redis: redis-cli GET round_robin:source:ccat

  2. Check that all LTA sites are active

  3. Examine recent transfers: are they balanced?

  4. Reset round-robin if needed: redis-cli DEL round_robin:source:ccat

Problem: Packages not replicating to all LTA sites (secondary transfers)

Checks:

  1. Verify secondary routes are discovered: check the logs for the "Discovered N secondary routes" message from discover_secondary_routes

  2. Check if create_secondary_data_transfers() is running periodically

  3. Verify all LTA sites have active buffer locations

  4. Check for failed transfers: SELECT * FROM data_transfer WHERE status = 'FAILED'

  5. Look for packages stuck with physical copies at only one LTA site:

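    -- Find packages whose completed buffer copies exist at fewer LTA sites
    -- than there are active LTA sites. Only copies at sites that host an
    -- active LTA location are counted, so a copy still held at the source
    -- site's buffer does not mask a missing LTA copy.
    SELECT
        dtp.id,
        dtp.name,
        COUNT(DISTINCT dl.site_id) as lta_sites_with_copy
    FROM data_transfer_package dtp
    JOIN data_transfer_package_physical_copy pc ON pc.data_transfer_package_id = dtp.id
    JOIN data_location dl ON pc.data_location_id = dl.id
    WHERE dl.location_type = 'BUFFER'
    AND pc.status = 'COMPLETED'
    AND dl.site_id IN (
        SELECT site_id
        FROM data_location
        WHERE location_type = 'LONG_TERM_ARCHIVE'
        AND active = true
    )
    GROUP BY dtp.id, dtp.name
    HAVING COUNT(DISTINCT dl.site_id) < (
        SELECT COUNT(DISTINCT site_id)
        FROM data_location
        WHERE location_type = 'LONG_TERM_ARCHIVE'
        AND active = true
    );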
    

Next Steps