import logging
import os
import json
import time
from typing import List, Optional, Any, Tuple, Dict
from sqlalchemy.orm import Session, aliased
from ccat_ops_db import models
from .config.config import ccat_data_transfer_settings
from .utils import (
get_redis_connection,
unique_id,
calculate_checksum,
create_archive,
generate_readable_filename,
)
from .setup_celery_app import app, make_celery_task
from .exceptions import (
DatabaseOperationError,
)
from .database import DatabaseConnection
from .logging_utils import get_structured_logger
from .queue_discovery import route_task_by_location
from .operation_types import OperationType
from .health_check import HealthCheck
# Set up global logger
logger = get_structured_logger(__name__)
redis_ = get_redis_connection()
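# The Redis connection is used both to publish overview events on the
# "transfer:overview" channel and to persist round-robin routing state.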
class DataTransferPackageOperations(make_celery_task()):
"""Base class for data transfer package operations with error handling."""
operation_type = "data_transfer_package"
max_retries = 3
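    # The hooks below (retry count, reset-on-failure, permanent failure) are expected
    # to be invoked by the Celery base task returned by make_celery_task(); this is
    # an assumption based on the method names, not verified against that helper.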
def get_retry_count(self, session, operation_id):
"""Get current retry count for a data transfer package operation."""
try:
package = session.query(models.DataTransferPackage).get(operation_id)
return package.retry_count if package else 0
except Exception:
return 0
def reset_state_on_failure(self, session, operation_id, exc):
"""Reset data transfer package state for retry."""
try:
package = session.query(models.DataTransferPackage).get(operation_id)
if package:
package.status = models.Status.PENDING
package.retry_count += 1
package.failure_error_message = None
# Reset associated raw data packages
for raw_package in package.raw_data_packages:
raw_package.state = models.PackageState.TRANSFERRING
session.commit()
redis_.publish(
"transfer:overview",
json.dumps({"type": "transfer_package_reset", "data": package.id}),
)
logger.info(
"Data transfer package reset for retry",
package_id=operation_id,
retry_count=package.retry_count,
)
except Exception as e:
logger.error(
"Failed to reset data transfer package state",
package_id=operation_id,
error=str(e),
)
def mark_permanent_failure(self, session, operation_id, exc):
"""Mark data transfer package as permanently failed."""
try:
package = session.query(models.DataTransferPackage).get(operation_id)
if package:
package.status = models.Status.FAILED
package.failure_error_message = str(exc)
# Mark associated raw data packages as failed
for raw_package in package.raw_data_packages:
raw_package.state = models.PackageState.FAILED
session.commit()
redis_.publish(
"transfer:overview",
json.dumps({"type": "transfer_package_failed", "data": package.id}),
)
logger.error(
"Data transfer package marked as permanently failed",
package_id=operation_id,
error=str(exc),
)
except Exception as e:
logger.error(
"Failed to mark data transfer package as failed",
package_id=operation_id,
error=str(e),
)
def get_operation_info(self, args, kwargs):
"""Get additional context for package tasks."""
if len(args) < 2:
return {}
return {
"package_id": str(args[0]),
"buffer_location_id": str(args[1]),
}
@app.task(
base=DataTransferPackageOperations,
name="ccat:data_transfer_package:create_package",
bind=True,
)
def create_data_transfer_package_task(
self,
data_transfer_package_id: int,
buffer_location_id: int,
session: Optional[Session] = None,
) -> bool:
"""
Create a data transfer package by assembling raw data packages from the buffer location.
Parameters
----------
self : celery.Task
The Celery task instance.
data_transfer_package_id : int
The ID of the DataTransferPackage to be processed.
buffer_location_id : int
The ID of the buffer DataLocation.
session : sqlalchemy.orm.Session, optional
A database session for use in testing environments.
Returns
-------
bool
True if the operation was successful.
"""
if session is None:
with self.session_scope() as session:
return _create_data_transfer_package_internal(
session, data_transfer_package_id, buffer_location_id
)
else:
return _create_data_transfer_package_internal(
session, data_transfer_package_id, buffer_location_id
)
def _create_data_transfer_package_internal(
session: Session, data_transfer_package_id: int, buffer_location_id: int
) -> bool:
"""
Internal function to create a data transfer package.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
data_transfer_package_id : int
The ID of the DataTransferPackage to be processed.
buffer_location_id : int
The ID of the buffer DataLocation.
Returns
-------
bool
True if the operation was successful.
"""
logger.info(
f"Creating data transfer package {data_transfer_package_id} from buffer {buffer_location_id}"
)
try:
# Retrieve package and buffer location from database
data_transfer_package = session.query(models.DataTransferPackage).get(
data_transfer_package_id
)
if not data_transfer_package:
logger.error(
"Data transfer package not found", package_id=data_transfer_package_id
)
return False
if data_transfer_package.status == models.Status.COMPLETED:
logger.info(
f"Data transfer package {data_transfer_package.file_name} already completed"
)
return True
data_transfer_package.status = models.Status.IN_PROGRESS
session.commit()
buffer_location = session.query(models.DataLocation).get(buffer_location_id)
if not buffer_location:
logger.error("Buffer location not found", location_id=buffer_location_id)
return False
# Get all raw data packages for this transfer package
raw_data_packages = data_transfer_package.raw_data_packages
if not raw_data_packages:
logger.warning(
"No raw data packages found for transfer package",
package_id=data_transfer_package_id,
)
return False
# Create archive from raw data packages
logger.info(f"Creating archive for {len(raw_data_packages)} raw data packages")
archive_success = _create_transfer_package_archive(
session, data_transfer_package, buffer_location, raw_data_packages
)
if archive_success:
# Update package status
data_transfer_package.status = models.Status.COMPLETED
# Create physical copy record at buffer location
physical_copy = models.DataTransferPackagePhysicalCopy(
data_transfer_package=data_transfer_package,
data_location=buffer_location,
checksum=data_transfer_package.checksum,
)
session.add(physical_copy)
session.commit()
redis_.publish(
"transfer:overview",
json.dumps(
{
"type": "transfer_package_created",
"data": data_transfer_package.id,
}
),
)
logger.info(
f"Created data transfer package {data_transfer_package.file_name} with {len(raw_data_packages)} packages"
)
return True
else:
logger.error(
"Failed to create package archive", package_id=data_transfer_package_id
)
return False
except Exception as e:
logger.error(f"Error creating data transfer package: {str(e)}")
session.rollback()
return False
def _create_transfer_package_archive(
session: Session,
data_transfer_package: models.DataTransferPackage,
buffer_location: models.DataLocation,
raw_data_packages: List[models.RawDataPackage],
) -> bool:
"""
Create an archive from raw data packages.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
data_transfer_package : models.DataTransferPackage
The data transfer package to create.
buffer_location : models.DataLocation
The buffer data location.
raw_data_packages : List[models.RawDataPackage]
List of raw data packages to include.
Returns
-------
bool
True if successful, False otherwise.
"""
try:
# Ensure the package directory exists
        # Build the archive path once and ensure its parent directory exists
        archive_path = os.path.join(
            buffer_location.path, data_transfer_package.relative_path
        )
        os.makedirs(os.path.dirname(archive_path), exist_ok=True)
# Use the existing create_archive function which handles different formats
create_archive(raw_data_packages, archive_path, buffer_location.path)
# Calculate total size and checksum
total_size = sum(package.size for package in raw_data_packages)
checksum = calculate_checksum(archive_path)
if checksum:
data_transfer_package.checksum = checksum
data_transfer_package.size = total_size
return True
else:
logger.error("Failed to calculate checksum for archive")
return False
except Exception as e:
logger.error(f"Error creating transfer package archive: {str(e)}")
return False
def get_unpackaged_raw_data_packages_in_buffers(
session: Session,
) -> Dict[models.DataLocation, List[models.RawDataPackage]]:
"""
Get all raw data packages in buffer locations that are not yet part of any data transfer package.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
Returns
-------
Dict[models.DataLocation, List[models.RawDataPackage]]
Dictionary mapping buffer locations to lists of unpackaged raw data packages.
"""
logger.info("Scanning all buffer locations for unpackaged raw data packages")
# Get all active buffer locations
buffer_locations = (
session.query(models.DataLocation)
.filter(
models.DataLocation.location_type == models.LocationType.BUFFER,
models.DataLocation.active == True, # noqa: E712
)
.all()
)
    logger.debug(f"Active buffer locations: {buffer_locations}")
unpackaged_by_buffer = {}
for buffer_location in buffer_locations:
# Find raw data packages in this buffer that are not in any data transfer package
# We check physical copies to see what's actually in this buffer
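        # A raw data package counts as "unpackaged" when its data_transfer_package_id
        # is NULL and a physical copy with status PRESENT exists at this buffer.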
logger.debug(
f"Checking buffer location: {buffer_location.name} (id: {buffer_location.id})"
)
# Now do the full query with all conditions
unpackaged_packages = (
session.query(models.RawDataPackage)
.join(models.RawDataPackagePhysicalCopy)
.filter(
models.RawDataPackagePhysicalCopy.data_location_id
== buffer_location.id,
models.RawDataPackage.data_transfer_package_id == None, # noqa: E711
models.RawDataPackagePhysicalCopy.status
== models.PhysicalCopyStatus.PRESENT,
# models.RawDataPackagePhysicalCopy.deletion_status
# == models.Status.PENDING,
)
.distinct()
.all()
)
logger.debug(f"Found {len(unpackaged_packages)} packages matching all criteria")
if unpackaged_packages:
unpackaged_by_buffer[buffer_location] = unpackaged_packages
logger.info(
f"Found {len(unpackaged_packages)} unpackaged raw data packages in buffer {buffer_location.name}"
)
total_packages = sum(len(packages) for packages in unpackaged_by_buffer.values())
logger.info(f"Total unpackaged raw data packages found: {total_packages}")
return unpackaged_by_buffer
def group_packages_for_transfer(
raw_data_packages: List[models.RawDataPackage],
max_package_size: Optional[int] = None,
    upper_percentage: float = 1.1,
    lower_percentage: float = 0.9,
) -> List[List[models.RawDataPackage]]:
"""
Group raw data packages into data transfer packages of a specified size range.
Parameters
----------
raw_data_packages : List[models.RawDataPackage]
List of raw data packages to be grouped.
max_package_size : int, optional
Maximum size of a data transfer package in bytes. If None, uses the value from settings.
upper_percentage : float, optional
Upper percentage of max_package_size to start a new package. Default is 1.10 (110%).
lower_percentage : float, optional
Lower percentage of max_package_size to consider a package ready. Default is 0.9 (90%).
Returns
-------
List[List[models.RawDataPackage]]
A list of lists, where each inner list represents a data transfer package
containing raw data packages that, when combined, are within the specified size range.
"""
logger.info("Grouping raw data packages into data transfer packages")
if max_package_size is None:
# Convert GB to bytes for internal use
max_package_size = int(
ccat_data_transfer_settings.MAXIMUM_DATA_TRANSFER_PACKAGE_SIZE_GB
* 1024
* 1024
* 1024
)
sorted_packages = sorted(raw_data_packages, key=lambda x: x.size, reverse=True)
data_transfer_package_list = []
current_package = []
current_size = 0
for package in sorted_packages:
if package.size >= max_package_size * lower_percentage:
# Large package goes alone
data_transfer_package_list.append([package])
logger.info(
f"Large raw data package {package.name} added as a separate transfer package"
)
elif (current_size + package.size) > max_package_size * upper_percentage:
# Would exceed upper limit, start new package
if current_package:
data_transfer_package_list.append(current_package)
logger.info(
"Data transfer package reached optimal size -> begin new package"
)
current_package, current_size = [], 0
current_package.append(package)
current_size += package.size
else:
# Add to current package
current_package.append(package)
current_size += package.size
# Log progress
if current_package:
package_ready_percentage = (current_size / max_package_size) * 100
logger.debug(
f"Package size status: {package_ready_percentage:.0f}% of max size"
)
    # Add the final group only if it meets the minimum size; undersized leftovers
    # remain unpackaged and are picked up in a later cycle
if current_package and current_size >= max_package_size * lower_percentage:
data_transfer_package_list.append(current_package)
logger.info(
f"Final package created with {len(current_package)} raw data packages"
)
logger.info(
f"Grouped {len(raw_data_packages)} packages into {len(data_transfer_package_list)} transfer packages"
)
return data_transfer_package_list
def discover_automatic_routes(
session: Session,
) -> List[Tuple[models.Site, models.Site]]:
"""
Discover automatic routes from all source sites to all LTA sites.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
Returns
-------
List[Tuple[models.Site, models.Site]]
List of (source_site, lta_site) tuples representing automatic routes.
"""
# Get all sites with source locations
source_sites = (
session.query(models.Site)
.join(models.DataLocation)
.filter(
models.DataLocation.location_type == models.LocationType.SOURCE,
models.DataLocation.active == True, # noqa: E712
)
.distinct()
.all()
)
# Get all sites with LTA locations
lta_sites = (
session.query(models.Site)
.join(models.DataLocation)
.filter(
models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
models.DataLocation.active == True, # noqa: E712
)
.distinct()
.all()
)
automatic_routes = []
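    # Every (source site, LTA site) pair becomes a candidate route; manual overrides in
    # DataTransferRoute are only reported by find_route_overrides(), not applied yet.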
for source_site in source_sites:
for lta_site in lta_sites:
if source_site.id != lta_site.id: # Don't route to self
automatic_routes.append((source_site, lta_site))
logger.info(f"Discovered {len(automatic_routes)} automatic routes")
return automatic_routes
def discover_secondary_routes(
session: Session,
) -> List[Tuple[models.Site, models.Site]]:
"""
Discover secondary routes between all LTA sites for data replication.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
Returns
-------
List[Tuple[models.Site, models.Site]]
List of (lta_site, lta_site) tuples representing secondary routes.
"""
# Get all sites with LTA locations
lta_sites = (
session.query(models.Site)
.join(models.DataLocation)
.filter(
models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
models.DataLocation.active == True, # noqa: E712
)
.distinct()
.all()
)
secondary_routes = []
for origin_lta in lta_sites:
for dest_lta in lta_sites:
if origin_lta.id != dest_lta.id: # Don't route to self
secondary_routes.append((origin_lta, dest_lta))
logger.info(f"Discovered {len(secondary_routes)} secondary routes")
return secondary_routes
def find_route_overrides(session: Session) -> List[models.DataTransferRoute]:
"""
Find manual route overrides defined in the DataTransferRoute table.
This is a placeholder function for future implementation of route overrides.
Currently just reports what overrides exist without acting on them.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
Returns
-------
List[models.DataTransferRoute]
List of manual route overrides (not yet implemented).
"""
# Get all manual route overrides
route_overrides = session.query(models.DataTransferRoute).all()
if route_overrides:
logger.info(
f"Found {len(route_overrides)} route overrides (not yet implemented)"
)
for route in route_overrides:
logger.debug(
f"Route override: {route.origin_site.short_name} -> {route.destination_site.short_name} "
f"(type: {route.route_type}, method: {route.transfer_method})"
)
else:
logger.debug("No route overrides found")
return route_overrides
def get_primary_buffer_for_site(
session: Session, site: models.Site
) -> Optional[models.DataLocation]:
"""
Get the primary (highest priority) active buffer for a site.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
site : models.Site
The site to get the buffer for.
Returns
-------
Optional[models.DataLocation]
The primary buffer location, or None if no active buffer exists.
"""
buffer_location = (
session.query(models.DataLocation)
.filter(
models.DataLocation.site_id == site.id,
models.DataLocation.location_type == models.LocationType.BUFFER,
models.DataLocation.active == True, # noqa: E712
)
.order_by(models.DataLocation.priority.asc())
.first()
)
if buffer_location:
logger.debug(
f"Primary buffer for site {site.short_name}: {buffer_location.name}"
)
else:
logger.warning(f"No active buffer found for site {site.short_name}")
return buffer_location
def get_next_lta_site_round_robin(
session: Session,
source_site: models.Site,
automatic_routes: List[Tuple[models.Site, models.Site]],
) -> Optional[models.Site]:
"""
Get the next LTA site for round-robin distribution from a source site.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
source_site : models.Site
The source site.
automatic_routes : List[Tuple[models.Site, models.Site]]
List of automatic routes.
Returns
-------
Optional[models.Site]
The next LTA site in round-robin order.
"""
# Get all LTA sites that this source site can route to
available_lta_sites = [
lta_site
for src_site, lta_site in automatic_routes
if src_site.id == source_site.id
]
if not available_lta_sites:
logger.warning(
f"No LTA sites available for source site {source_site.short_name}"
)
return None
# Use Redis to track round-robin state
redis_key = f"round_robin:source:{source_site.short_name}"
current_index = redis_.get(redis_key)
    if current_index is None:
        current_index = 0
    else:
        # Wrap the stored index in case the number of available LTA sites has changed
        current_index = int(current_index) % len(available_lta_sites)
# Get next LTA site
next_lta_site = available_lta_sites[current_index]
# Update round-robin index
next_index = (current_index + 1) % len(available_lta_sites)
redis_.set(redis_key, next_index)
logger.debug(
f"Round-robin: {source_site.short_name} -> {next_lta_site.short_name} (index {current_index})"
)
return next_lta_site
def create_primary_data_transfers(
session: Session,
data_transfer_package: models.DataTransferPackage,
source_buffer: models.DataLocation,
) -> None:
"""
Create primary data transfers for a completed DataTransferPackage using automatic route discovery.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
data_transfer_package : models.DataTransferPackage
The completed data transfer package.
source_buffer : models.DataLocation
The source buffer location.
"""
logger.info(
f"Creating primary data transfers for package {data_transfer_package.id}"
)
# Discover automatic routes
automatic_routes = discover_automatic_routes(session)
# Find which site this buffer belongs to
source_site = source_buffer.site
# Get the next LTA site using round-robin
destination_lta_site = get_next_lta_site_round_robin(
session, source_site, automatic_routes
)
if not destination_lta_site:
logger.error(
f"No destination LTA site available for source site {source_site.short_name}"
)
return
# Get the primary buffer for the destination LTA site
destination_buffer = get_primary_buffer_for_site(session, destination_lta_site)
if not destination_buffer:
logger.error(
f"No active buffer found for destination LTA site {destination_lta_site.short_name}"
)
return
# Create the primary data transfer
data_transfer = models.DataTransfer(
data_transfer_package=data_transfer_package,
origin_location=source_buffer,
destination_location=destination_buffer,
data_transfer_method="bbcp", # Default method, could be configurable
status=models.Status.PENDING,
)
session.add(data_transfer)
session.commit()
logger.info(
f"Created primary data transfer from site [{source_buffer.site.short_name}] buffer [{source_buffer.name}] to site [{destination_buffer.site.short_name}] buffer [{destination_buffer.name}] "
f"for package {data_transfer_package.id}"
)
def create_secondary_data_transfers(session: Session) -> None:
"""
Create secondary data transfers between LTA sites for completed DataTransferPackages.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
"""
logger.info("Creating secondary data transfers between LTA sites")
# Discover secondary routes
secondary_routes = discover_secondary_routes(session)
# Find completed packages that need secondary transfers
completed_packages = (
session.query(models.DataTransferPackage)
.filter_by(status=models.Status.COMPLETED)
.all()
)
new_transfers_count = 0
for package in completed_packages:
# Find which LTA sites already have this package
existing_locations = set()
for physical_copy in package.physical_copies:
if physical_copy.data_location.location_type == models.LocationType.BUFFER:
if physical_copy.data_location.site.id in [
lta[0].id for lta in secondary_routes
]:
existing_locations.add(physical_copy.data_location.site.id)
# Create transfers to LTA sites that don't have the package yet
for origin_lta, dest_lta in secondary_routes:
if (
origin_lta.id in existing_locations
and dest_lta.id not in existing_locations
):
# Check if transfer already exists
origin_location = aliased(models.DataLocation)
destination_location = aliased(models.DataLocation)
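                # Two DataLocation aliases are needed because DataTransfer joins the
                # table twice: once for the origin and once for the destination.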
existing_transfer = (
session.query(models.DataTransfer)
.join(
origin_location,
models.DataTransfer.origin_location_id == origin_location.id,
)
.join(
destination_location,
models.DataTransfer.destination_location_id
== destination_location.id,
)
.filter(
models.DataTransfer.data_transfer_package_id == package.id,
origin_location.site_id == origin_lta.id,
destination_location.site_id == dest_lta.id,
)
.first()
)
if not existing_transfer:
origin_buffer = get_primary_buffer_for_site(session, origin_lta)
dest_buffer = get_primary_buffer_for_site(session, dest_lta)
if origin_buffer and dest_buffer:
secondary_transfer = models.DataTransfer(
data_transfer_package=package,
origin_location=origin_buffer,
destination_location=dest_buffer,
data_transfer_method="bbcp",
status=models.Status.PENDING,
)
session.add(secondary_transfer)
new_transfers_count += 1
logger.debug(
f"Created secondary transfer from {origin_lta.short_name} to {dest_lta.short_name} "
f"for package {package.id}"
)
if new_transfers_count > 0:
session.commit()
logger.info(f"Created {new_transfers_count} secondary data transfers")
else:
logger.debug("No new secondary transfers needed")
def create_data_transfer_packages_for_buffer(
session: Session,
buffer_location: models.DataLocation,
raw_data_packages: List[models.RawDataPackage],
) -> None:
"""
Create data transfer packages for raw data packages in a buffer location.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
buffer_location : models.DataLocation
The buffer location.
raw_data_packages : List[models.RawDataPackage]
List of raw data packages to process.
"""
logger.info(
f"Creating data transfer packages for {len(raw_data_packages)} packages in buffer {buffer_location.name}"
)
# Group packages into appropriately sized transfer packages
transfer_package_groups = group_packages_for_transfer(raw_data_packages)
for group in transfer_package_groups:
_create_data_transfer_package_entry(session, group, buffer_location)
session.commit()
logger.info(
f"Created {len(transfer_package_groups)} data transfer packages for buffer {buffer_location.name}"
)
def _create_data_transfer_package_entry(
session: Session,
raw_data_packages: List[models.RawDataPackage],
buffer_location: models.DataLocation,
) -> models.DataTransferPackage:
"""
Create a DataTransferPackage database entry and schedule the assembly task.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
raw_data_packages : List[models.RawDataPackage]
List of raw data packages to include.
buffer_location : models.DataLocation
The buffer location.
Returns
-------
models.DataTransferPackage
The created data transfer package.
"""
logger.info(
f"Creating data transfer package for {len(raw_data_packages)} raw data packages"
)
# Generate unique package name and path
package_id = unique_id()
readable_name = generate_readable_filename(
raw_data_packages[0], # Use the first package for metadata
package_id[:8], # Use only first 8 chars of hash for brevity
file_type="transfer",
extension="tar", # Always use tar.gz extension for consistency
)
# Create DataTransferPackage entry
data_transfer_package = models.DataTransferPackage(
hash_id=package_id,
file_name=readable_name,
status=models.Status.PENDING,
retry_count=0,
origin_location=buffer_location,
size=sum(pkg.size for pkg in raw_data_packages),
checksum="", # Will be calculated during assembly
relative_path=f"data_transfer_packages/{readable_name}",
)
# Associate raw data packages with the transfer package
for raw_package in raw_data_packages:
raw_package.data_transfer_package = data_transfer_package
raw_package.state = models.PackageState.TRANSFERRING
session.add(data_transfer_package)
session.commit() # Get the ID
    # The Celery assembly task is scheduled later by schedule_data_transfer_package_creation()
logger.info(
"Created data transfer package",
package_id=data_transfer_package.id,
package_name=readable_name,
raw_package_count=len(raw_data_packages),
)
return data_transfer_package
def schedule_data_transfer_package_creation(session: Session) -> None:
"""
    Schedule data transfer package creation for all pending data transfer packages.
    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    """
pending_data_transfer_packages = (
session.query(models.DataTransferPackage)
.filter_by(status=models.Status.PENDING)
.all()
)
if not pending_data_transfer_packages:
logger.info("No pending data transfer packages found")
return
for package in pending_data_transfer_packages:
buffer_location = package.origin_location
queue_name = route_task_by_location(
OperationType.DATA_TRANSFER_PACKAGE_CREATION, buffer_location
)
create_data_transfer_package_task.apply_async(
args=[package.id, buffer_location.id], queue=queue_name
)
logger.info(
f"Scheduled data transfer package creation for package {package.id} and queue {queue_name}"
)
package.status = models.Status.SCHEDULED
session.commit()
logger.info(
f"Scheduled {len(pending_data_transfer_packages)} data transfer packages"
)
def create_data_transfer_packages(
    verbose: bool = False, session: Optional[Session] = None
) -> None:
"""
Scan all buffer locations and create data transfer packages for unpackaged raw data packages.
This function manages the process of creating data transfer packages by:
1. Finding all buffer locations with unpackaged raw data packages.
2. Grouping packages into appropriately sized transfer packages.
3. Creating DataTransferPackage entries in the database.
4. Creating primary and secondary data transfers using automatic route discovery.
5. Scheduling Celery tasks to handle the package assembly.
    Parameters
    ----------
    verbose : bool, optional
        If True, sets logging level to DEBUG. Defaults to False.
    session : sqlalchemy.orm.Session, optional
        Database session for testing. If None, a new session is created.
    Raises
    ------
    ConfigurationError
        If no active buffer is found for a site.
    DatabaseOperationError
        If there is an error during database operations.
"""
if session is None:
db = DatabaseConnection()
session, _ = db.get_connection()
if verbose:
logger.setLevel(logging.DEBUG)
try:
# Report any route overrides (placeholder for future implementation)
find_route_overrides(session)
# Get all unpackaged raw data packages by buffer location
unpackaged_by_buffer = get_unpackaged_raw_data_packages_in_buffers(session)
if unpackaged_by_buffer:
logger.info(
f"Processing {len(unpackaged_by_buffer)} buffer locations with unpackaged data"
)
# Create transfer packages for each buffer location
for buffer_location, raw_data_packages in unpackaged_by_buffer.items():
try:
create_data_transfer_packages_for_buffer(
session, buffer_location, raw_data_packages
)
except Exception as e:
logger.error(
f"Error processing buffer location {buffer_location.name}: {str(e)}"
)
session.rollback()
raise DatabaseOperationError(
f"Failed to create data transfer packages for {buffer_location.name}: {str(e)}"
) from e
schedule_data_transfer_package_creation(session)
else:
logger.info("No unpackaged raw data packages found in any buffer")
# Always create primary data transfers for completed packages (regardless of new packages)
_create_primary_transfers_for_completed_packages(session)
# Always create secondary data transfers between LTA sites (regardless of new packages)
create_secondary_data_transfers(session)
logger.info("Completed data transfer package creation and transfer scheduling")
except Exception as e:
logger.exception("An error occurred while creating data transfer packages")
raise RuntimeError("Failed to create data transfer packages") from e
def _create_primary_transfers_for_completed_packages(session: Session) -> None:
"""
Create primary data transfers for all completed DataTransferPackages that don't have transfers yet.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
"""
logger.info("Creating primary transfers for completed packages")
# Find completed packages without any transfers
completed_packages = (
session.query(models.DataTransferPackage)
.filter_by(status=models.Status.COMPLETED)
.filter(~models.DataTransferPackage.data_transfers.any())
.all()
)
if not completed_packages:
logger.debug("No completed packages without transfers found")
return
logger.info(
f"Found {len(completed_packages)} completed packages needing primary transfers"
)
for package in completed_packages:
# Find the source buffer location for this package
source_buffer = None
for physical_copy in package.physical_copies:
if physical_copy.data_location.location_type == models.LocationType.BUFFER:
# Check if this buffer belongs to a source site
                source_location = (
session.query(models.DataLocation)
.filter(
models.DataLocation.site_id
== physical_copy.data_location.site_id,
models.DataLocation.location_type == models.LocationType.SOURCE,
models.DataLocation.active == True, # noqa: E712
)
.first()
)
                if source_location:
source_buffer = physical_copy.data_location
break
if source_buffer:
try:
create_primary_data_transfers(session, package, source_buffer)
except Exception as e:
logger.error(
f"Error creating primary transfer for package {package.id}: {str(e)}"
)
# Continue with other packages
continue
else:
logger.warning(f"No source buffer found for package {package.id}")
def data_transfer_package_manager_service(verbose: bool = False) -> None:
"""
Main service function for the data transfer package manager.
This service continuously scans buffer locations for unpackaged raw data packages
and creates transfer packages with automatic route discovery.
    Parameters
    ----------
    verbose : bool, optional
        If True, sets logging level to DEBUG. Default is False.
"""
if verbose or ccat_data_transfer_settings.VERBOSE:
logger.setLevel(logging.DEBUG)
logger.debug("verbose_mode_enabled")
db = DatabaseConnection()
session, _ = db.get_connection()
# Initialize health check
health_check = HealthCheck(
service_type="data_transfer_package",
service_name="data_transfer_package_manager",
)
health_check.start()
try:
while True: # Main service loop
try:
logger.debug("Starting data transfer package creation cycle")
create_data_transfer_packages(verbose=verbose, session=session)
                # Sleep for the configured interval
                time.sleep(ccat_data_transfer_settings.PACKAGE_MANAGER_SLEEP_TIME)
except Exception as e:
logger.error("service_loop_error", error=str(e))
                time.sleep(10)  # Wait before retrying the service loop
finally:
health_check.stop()
session.close()
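# A minimal way to run the service from an entry point (illustrative sketch):
#
#     if __name__ == "__main__":
#         data_transfer_package_manager_service(verbose=True)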
# Legacy functions for compatibility (can be removed after full migration)
def create_primary_data_transfer_packages(
    verbose: bool = False, session: Optional[Session] = None
) -> None:
"""
Legacy wrapper function for backward compatibility.
This function redirects to the new create_data_transfer_packages function.
Can be removed once all callers are updated.
"""
logger.warning(
"Using legacy create_primary_data_transfer_packages function. Please update to use create_data_transfer_packages."
)
create_data_transfer_packages(verbose=verbose, session=session)
def _create_primary_data_transfer_packages_internal(session: Session) -> None:
"""
Legacy internal function for backward compatibility.
This function redirects to the new implementation.
Can be removed once all callers are updated.
"""
logger.warning(
"Using legacy _create_primary_data_transfer_packages_internal function. Please update to use create_data_transfer_packages."
)
create_data_transfer_packages(session=session)
# Additional utility functions for the new architecture
def get_sites_with_buffer_locations(session: Session) -> List[models.Site]:
"""
Get all sites that have active BUFFER data locations.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
Returns
-------
List[models.Site]
List of sites with buffer locations.
"""
sites = (
session.query(models.Site)
.join(models.DataLocation)
.filter(
models.DataLocation.location_type == models.LocationType.BUFFER,
models.DataLocation.active == True, # noqa: E712
)
.distinct()
.all()
)
logger.debug(f"Found {len(sites)} sites with active buffer locations")
return sites
def validate_site_configuration(session: Session) -> bool:
"""
Validate that all sites have proper configuration for data transfer.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
Returns
-------
bool
True if configuration is valid, False otherwise.
"""
logger.info("Validating site configuration for data transfer")
# Check that all source sites have buffers
source_sites = (
session.query(models.Site)
.join(models.DataLocation)
.filter(
models.DataLocation.location_type == models.LocationType.SOURCE,
models.DataLocation.active == True, # noqa: E712
)
.distinct()
.all()
)
for site in source_sites:
buffer = get_primary_buffer_for_site(session, site)
if not buffer:
logger.error(f"Source site {site.short_name} has no active buffer location")
return False
# Check that all LTA sites have buffers
lta_sites = (
session.query(models.Site)
.join(models.DataLocation)
.filter(
models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
models.DataLocation.active == True, # noqa: E712
)
.distinct()
.all()
)
for site in lta_sites:
buffer = get_primary_buffer_for_site(session, site)
if not buffer:
logger.error(f"LTA site {site.short_name} has no active buffer location")
return False
logger.info("Site configuration validation passed")
return True
def get_transfer_statistics(session: Session) -> Dict[str, Any]:
"""
Get statistics about the current transfer system state.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
Returns
-------
Dict[str, Any]
Dictionary containing transfer statistics.
"""
stats = {}
# Count unpackaged raw data packages
unpackaged_by_buffer = get_unpackaged_raw_data_packages_in_buffers(session)
stats["unpackaged_packages_by_buffer"] = {
loc.name: len(packages) for loc, packages in unpackaged_by_buffer.items()
}
stats["total_unpackaged_packages"] = sum(
len(packages) for packages in unpackaged_by_buffer.values()
)
# Count pending transfer packages
pending_packages = (
session.query(models.DataTransferPackage)
.filter_by(status=models.Status.PENDING)
.count()
)
stats["pending_transfer_packages"] = pending_packages
# Count pending transfers
pending_transfers = (
session.query(models.DataTransfer)
.filter_by(status=models.Status.PENDING)
.count()
)
stats["pending_transfers"] = pending_transfers
# Automatic routes
automatic_routes = discover_automatic_routes(session)
stats["automatic_routes_count"] = len(automatic_routes)
secondary_routes = discover_secondary_routes(session)
stats["secondary_routes_count"] = len(secondary_routes)
# Route overrides
route_overrides = find_route_overrides(session)
stats["route_overrides_count"] = len(route_overrides)
logger.debug(f"Transfer statistics: {stats}")
return stats