import os
import logging
import json
import sqlalchemy
from ccat_ops_db import models
from .database import DatabaseConnection
from sqlalchemy.orm import Session
from typing import Tuple, Optional
from .setup_celery_app import app, make_celery_task
from .utils import (
create_local_folder,
calculate_checksum,
unpack_local,
get_redis_connection,
)
from .logging_utils import get_structured_logger
from .exceptions import UnpackError, ChecksumVerificationError, ArchiveCorruptionError
from .queue_discovery import route_task_by_location
from .operation_types import OperationType
# Use only task loggers
logger = get_structured_logger(__name__)
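# Redis connection used to publish "transfer:overview" status events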
redis_ = get_redis_connection()
class UnpackTask(make_celery_task()):
"""Base class for unpacking tasks."""
def __init__(self):
super().__init__()
self.operation_type = "unpack"
self.max_retries = 3
def get_retry_count(self, session, data_transfer_id):
"""Get current retry count for this unpack operation."""
data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
if data_transfer and hasattr(data_transfer, "unpack_retry_count"):
return data_transfer.unpack_retry_count
raise ValueError("Data transfer not found or unpack retry count not available")
def reset_state_on_failure(self, session, data_transfer_id, exc):
"""Reset unpack state for retry."""
data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
if data_transfer:
data_transfer.unpack_status = models.Status.PENDING
for (
raw_data_package
) in data_transfer.data_transfer_package.raw_data_packages:
raw_data_package.state = models.PackageState.TRANSFERRING
data_transfer.unpack_failure_error_message = None
data_transfer.unpack_retry_count += 1
logger.info(
"Reset unpack for retry",
data_transfer_id=data_transfer_id,
unpack_retry_count=data_transfer.unpack_retry_count,
)
def mark_permanent_failure(self, session, data_transfer_id, exc):
"""Mark unpack as permanently failed."""
data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
if data_transfer:
data_transfer.unpack_status = models.Status.FAILED
for (
raw_data_package
) in data_transfer.data_transfer_package.raw_data_packages:
raw_data_package.state = models.PackageState.FAILED
data_transfer.unpack_failure_error_message = str(exc)
logger.info(
"Marked unpack as permanently failed",
data_transfer_id=data_transfer_id,
error=str(exc),
)
def get_operation_info(self, args, kwargs):
"""Get additional context for unpack tasks."""
        if not args:
return {}
with self.session_scope() as session:
try:
data_transfer = session.query(models.DataTransfer).get(args[0])
if data_transfer:
return {
"destination_location": data_transfer.destination_location.name,
"package_id": str(data_transfer.data_transfer_package_id),
}
except Exception as e:
logger.error(f"Error getting unpack info: {e}")
return {}
@app.task(
base=UnpackTask,
name="ccat:data_transfer:unpack_data_transfer_package",
bind=True,
)
def unpack_data_transfer_package(
self,
data_transfer_id: int,
    session: Optional[Session] = None,
) -> None:
"""
Unpack a data transfer package and verify its contents using dynamic queue routing.
Parameters
----------
self : celery.Task
The bound Celery task instance.
data_transfer_id : int
The ID of the data transfer to process.
session : Session, optional
An existing database session to use. If None, a new session will be created.
    Returns
    -------
    None
        Success is recorded on the data transfer itself; failures propagate as
        exceptions raised by ``_unpack_data_transfer_package_internal``.
"""
if session is None:
with self.session_scope() as session:
return _unpack_data_transfer_package_internal(session, data_transfer_id)
else:
return _unpack_data_transfer_package_internal(session, data_transfer_id)
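# How this task is typically dispatched (a minimal sketch; _schedule_unpack_task
# below does this for real, including status updates and Redis notifications):
#
#     queue_name = route_task_by_location(
#         OperationType.DATA_TRANSFER_UNPACKING, data_transfer.destination_location
#     )
#     unpack_data_transfer_package.apply_async(args=[data_transfer.id], queue=queue_name)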
def _cleanup_corrupted_transfer(
session: Session, data_transfer: models.DataTransfer
) -> None:
"""
Clean up a corrupted transfer by removing files and resetting database state.
This function:
1. Removes the corrupted archive file from destination
2. Schedules deletion of the source archive file on the primary archive
3. Deregisters the raw data packages from the data transfer package
4. Deletes the data transfer package from the database
5. Raises an ArchiveCorruptionError to trigger proper error handling
Raises
------
ArchiveCorruptionError
After cleanup is complete, to trigger proper error handling in the task system
"""
logger.info(
"Cleaning up corrupted transfer",
transfer_id=data_transfer.id,
package_id=data_transfer.data_transfer_package_id,
)
# Get the destination path using the new location system
destination_path = _get_location_path(
data_transfer.destination_location, data_transfer.data_transfer_package
)
# Find the physical copy for the destination location
dest_physical_copy = (
session.query(models.DataTransferPackagePhysicalCopy)
.filter_by(
data_transfer_package_id=data_transfer.data_transfer_package_id,
data_location_id=data_transfer.destination_location_id,
status=models.Status.COMPLETED,
)
.first()
)
if dest_physical_copy:
try:
# Mark for deletion and schedule task
dest_physical_copy.deletion_status = models.Status.SCHEDULED
session.add(dest_physical_copy)
session.commit()
redis_.publish(
"transfer:overview",
json.dumps(
{"type": "corrupted_transfer_cleanup", "data": data_transfer.id}
),
)
# Schedule deletion task using dynamic queue routing
from .deletion_manager import delete_physical_copy
queue_name = route_task_by_location(
OperationType.DELETION, data_transfer.destination_location
)
result = delete_physical_copy.apply_async(
args=[dest_physical_copy.id],
queue=queue_name,
)
# Wait for task completion with a timeout
result.get(timeout=300) # 5 minute timeout
logger.info(
"Successfully deleted corrupted archive from destination",
physical_copy_id=dest_physical_copy.id,
path=destination_path,
)
destination_deletion_successful = True
except Exception as e:
logger.error(
"Failed to delete corrupted archive from destination",
physical_copy_id=dest_physical_copy.id,
path=destination_path,
error=str(e),
)
destination_deletion_successful = False
else:
logger.warning(
"Could not find physical copy for destination location",
path=destination_path,
package_id=data_transfer.data_transfer_package_id,
location_id=data_transfer.destination_location_id,
)
destination_deletion_successful = False
    # Schedule deletion of the source archive file if this is a secondary transfer.
    # In the new architecture, a transfer is considered secondary when both the
    # origin and the destination locations belong to LTA sites.
lta_sites = (
session.query(models.Site)
.join(models.DataLocation)
.filter(models.DataLocation.buffer_type == models.BufferType.LTA)
.all()
)
lta_sites_ids = [site.id for site in lta_sites]
is_secondary_transfer = (
data_transfer.origin_location.site_id in lta_sites_ids
and data_transfer.destination_location.site_id in lta_sites_ids
)
if is_secondary_transfer:
source_path = _get_location_path(
data_transfer.origin_location, data_transfer.data_transfer_package
)
# Find or create the physical copy for the source location
source_physical_copy = (
session.query(models.DataTransferPackagePhysicalCopy)
.filter_by(
data_transfer_package_id=data_transfer.data_transfer_package_id,
data_location_id=data_transfer.origin_location_id,
status=models.Status.COMPLETED,
)
.first()
)
if source_physical_copy:
# Mark for deletion and schedule task
source_physical_copy.deletion_status = models.Status.SCHEDULED
session.add(source_physical_copy)
session.commit()
redis_.publish(
"transfer:overview",
json.dumps(
{"type": "corrupted_transfer_cleanup", "data": data_transfer.id}
),
)
# Schedule deletion task using dynamic queue routing
from .deletion_manager import delete_physical_copy
try:
queue_name = route_task_by_location(
OperationType.DELETION, data_transfer.origin_location
)
result = delete_physical_copy.apply_async(
args=[source_physical_copy.id],
queue=queue_name,
)
# Wait for task completion with a timeout
result.get(timeout=300) # 5 minute timeout
logger.info(
"Successfully deleted corrupted archive from source",
physical_copy_id=source_physical_copy.id,
path=source_path,
)
primary_deletion_successful = True
except Exception as e:
logger.error(
"Failed to delete corrupted archive from source",
physical_copy_id=source_physical_copy.id,
path=source_path,
error=str(e),
)
primary_deletion_successful = False
# Continue with cleanup even if deletion fails
# The deletion manager will retry the deletion later
else:
logger.warning(
"Could not find physical copy for source location",
path=source_path,
package_id=data_transfer.data_transfer_package_id,
location_id=data_transfer.origin_location_id,
)
primary_deletion_successful = False
else:
# This is a primary transfer, no source cleanup needed
primary_deletion_successful = True
# Deregister raw data packages from the data transfer package
package = data_transfer.data_transfer_package
raw_data_packages = package.raw_data_packages.copy()
for raw_package in raw_data_packages:
raw_package.data_transfer_package_id = None
session.add(raw_package)
# Delete the data transfer package
# we can only delete the package if both deletions were successful
# otherwise the automatic cleanup will retry the deletion later and needs the
# information from the package
if primary_deletion_successful and destination_deletion_successful:
session.delete(package)
else:
logger.info(
"Keeping data transfer package for retry",
transfer_id=data_transfer.id,
package_id=data_transfer.data_transfer_package_id,
primary_deletion_successful=primary_deletion_successful,
destination_deletion_successful=destination_deletion_successful,
)
# Commit the cleanup changes
session.commit()
redis_.publish(
"transfer:overview",
json.dumps({"type": "corrupted_transfer_cleanup", "data": data_transfer.id}),
)
logger.info(
"Completed cleanup of corrupted transfer",
transfer_id=data_transfer.id,
package_id=data_transfer.data_transfer_package_id,
)
# Raise the error to trigger proper error handling in the task system
raise ArchiveCorruptionError(
"Archive corruption detected - transfer package will be recreated",
archive_path=destination_path,
transfer_id=data_transfer.id,
)
def _unpack_data_transfer_package_internal(
session: Session, data_transfer_id: int
) -> None:
"""
Internal function to unpack a data transfer package and verify its contents.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
data_transfer_id : int
The ID of the data transfer to process.
Raises
------
FileNotFoundError
If the archive file or destination directory is not found.
UnpackError
If the unpacking process fails.
ChecksumVerificationError
If the checksum verification fails.
ArchiveCorruptionError
If the archive is corrupted or incomplete.
"""
data_transfer = _get_data_transfer(session, data_transfer_id)
if data_transfer is None:
raise ValueError(f"Data transfer not found: {data_transfer_id}")
archive_path, destination = _get_paths(data_transfer)
logger.info(f"Unpacking {archive_path} to {destination}")
try:
create_local_folder(destination)
except OSError as e:
        raise FileNotFoundError(f"Failed to create destination directory: {str(e)}") from e
    try:
        success, error = unpack_local(archive_path, destination)
        if not success:
            raise UnpackError(f"Failed to unpack archive: {error}")
    except ArchiveCorruptionError as e:
        logger.error(
            "Archive corruption detected", transfer_id=data_transfer_id, error=str(e)
        )
        _cleanup_corrupted_transfer(session, data_transfer)
        raise
    except UnpackError:
        # Re-raise unchanged so the original message is not wrapped a second time
        raise
    except Exception as e:
        raise UnpackError(f"Error during unpacking: {str(e)}") from e
    try:
        if not _verify_checksums(data_transfer, destination):
            raise ChecksumVerificationError("Checksum verification failed")
    except ChecksumVerificationError:
        raise
    except Exception as e:
        raise ChecksumVerificationError(
            f"Error during checksum verification: {str(e)}"
        ) from e
# If we get here, everything succeeded
_update_data_transfer_status(session, data_transfer, True)
def _get_data_transfer(
session: Session, data_transfer_id: int
) -> Optional[models.DataTransfer]:
"""Retrieve the DataTransfer object from the database."""
try:
return session.query(models.DataTransfer).filter_by(id=data_transfer_id).one()
except sqlalchemy.orm.exc.NoResultFound:
logger.error("data_transfer_not_found", data_transfer_id=data_transfer_id)
except sqlalchemy.orm.exc.MultipleResultsFound:
logger.error("multiple_data_transfers_found", data_transfer_id=data_transfer_id)
return None
def _get_paths(data_transfer: models.DataTransfer) -> Tuple[str, str]:
"""Get the archive path and destination for unpacking using the new location system."""
archive_path = _get_location_path(
data_transfer.destination_location, data_transfer.data_transfer_package
)
# Get the raw data path from the destination location
    if isinstance(data_transfer.destination_location, models.DiskDataLocation):
        # For disk locations, unpack directly into the location's base path
        destination = data_transfer.destination_location.path
else:
# For non-disk locations, we need to determine the appropriate path
# This might need to be configurable per location type
raise ValueError(
f"Unpacking not yet supported for storage type: {data_transfer.destination_location.storage_type}"
)
return archive_path, destination
def _get_location_path(
data_location: models.DataLocation,
data_transfer_package: models.DataTransferPackage,
) -> str:
"""
Get the full path for a data transfer package at a specific location.
Parameters
----------
data_location : models.DataLocation
The data location.
data_transfer_package : models.DataTransferPackage
The data transfer package.
Returns
-------
str
The full path to the package at this location.
"""
if isinstance(data_location, models.DiskDataLocation):
return os.path.join(data_location.path, data_transfer_package.relative_path)
elif isinstance(data_location, models.S3DataLocation):
return f"{data_location.prefix}{data_transfer_package.relative_path}"
elif isinstance(data_location, models.TapeDataLocation):
return os.path.join(
data_location.mount_path, data_transfer_package.relative_path
)
else:
raise ValueError(f"Unsupported storage type: {data_location.storage_type}")
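# Illustrative path shapes produced by _get_location_path (example values are hypothetical):
#   DiskDataLocation:  <path>/<relative_path>,   e.g. /buffer/packages/pkg_0001.tar
#   S3DataLocation:    <prefix><relative_path>,  e.g. packages/pkg_0001.tar
#   TapeDataLocation:  <mount_path>/<relative_path>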
def _verify_checksums(data_transfer: models.DataTransfer, destination: str) -> bool:
"""Verify the checksums of the unpacked files."""
for raw_data_package in data_transfer.data_transfer_package.raw_data_packages:
extracted_file_path = os.path.join(destination, raw_data_package.relative_path)
local_checksum = calculate_checksum(extracted_file_path)
if local_checksum is None:
logger.error("checksum_calculation_failed", file_path=extracted_file_path)
return False
if raw_data_package.checksum != local_checksum:
logger.error(
"checksum_verification_failed",
file_path=extracted_file_path,
expected=raw_data_package.checksum,
actual=local_checksum,
)
return False
logger.info(
"checksum_verified", file_path=extracted_file_path, checksum=local_checksum
)
return True
def _update_data_transfer_status(
session: Session,
data_transfer: models.DataTransfer,
success: bool,
) -> None:
"""Update the data transfer status in the database."""
try:
if success:
logger.info(
"verification_completed",
status="success",
data_transfer_id=data_transfer.id,
)
data_transfer.unpack_status = models.Status.COMPLETED
# Add physical copies for each raw data package
for (
raw_data_package
) in data_transfer.data_transfer_package.raw_data_packages:
                # Only disk destinations are currently supported for unpacking;
                # reject other storage types before recording any physical copy
                if not isinstance(
                    data_transfer.destination_location, models.DiskDataLocation
                ):
                    raise ValueError(
                        f"Unpacking not yet supported for storage type: {data_transfer.destination_location.storage_type}"
                    )
# Create the physical copy
physical_copy = models.RawDataPackagePhysicalCopy(
raw_data_package=raw_data_package,
data_location=data_transfer.destination_location,
checksum=raw_data_package.checksum,
)
# Add to the session explicitly
session.add(physical_copy)
# Also add to the relationship for consistency
raw_data_package.physical_copies.append(physical_copy)
logger.info(
"package_added_to_location",
data_transfer_id=data_transfer.id,
package_id=raw_data_package.id,
)
logger.info(
"packages_added_to_location",
data_transfer_id=data_transfer.id,
package_count=len(
data_transfer.data_transfer_package.raw_data_packages
),
)
# Commit the transaction
session.commit()
# Publish to Redis after successful commit
redis_.publish(
"transfer:overview",
json.dumps({"type": "unpack_completed", "data": data_transfer.id}),
)
logger.info(
"unpack_status_updated_successfully",
data_transfer_id=data_transfer.id,
status="COMPLETED",
)
else:
# Handle failure case - mark as failed
logger.error(
"verification_failed",
status="failed",
data_transfer_id=data_transfer.id,
)
data_transfer.unpack_status = models.Status.FAILED
session.commit()
# Publish to Redis after commit
redis_.publish(
"transfer:overview",
json.dumps({"type": "unpack_failed", "data": data_transfer.id}),
)
except Exception as e:
# Log the error and rollback the transaction
logger.error(
"failed_to_update_data_transfer_status",
data_transfer_id=data_transfer.id,
error=str(e),
success=success,
)
session.rollback()
raise
def _add_raw_data_packages_to_location(data_transfer: models.DataTransfer) -> None:
"""Add raw data packages to the destination location."""
for raw_data_package in data_transfer.data_transfer_package.raw_data_packages:
if raw_data_package not in data_transfer.destination_location.raw_data_packages:
data_transfer.destination_location.raw_data_packages.append(
raw_data_package
)
def _get_pending_unpackings(session: Session) -> list[models.DataTransfer]:
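    """Return completed data transfers whose unpack step is still pending."""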
return (
session.query(models.DataTransfer)
.filter(models.DataTransfer.status == models.Status.COMPLETED)
.filter(models.DataTransfer.unpack_status == models.Status.PENDING)
.all()
)
def _schedule_unpack_task(data_transfer: models.DataTransfer, session: Session) -> None:
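    """Mark the transfer as scheduled and dispatch the unpack task to the
    queue serving its destination location."""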
try:
data_transfer.unpack_status = models.Status.SCHEDULED
# Use dynamic queue routing based on destination location
queue_name = route_task_by_location(
OperationType.DATA_TRANSFER_UNPACKING, data_transfer.destination_location
)
logger.info(
"scheduling_unpack",
data_transfer_id=data_transfer.id,
location=data_transfer.destination_location.name,
queue=queue_name,
)
unpack_data_transfer_package.apply_async(
args=[data_transfer.id], queue=queue_name
)
session.commit()
redis_.publish(
"transfer:overview",
json.dumps({"type": "unpack_scheduled", "data": data_transfer.id}),
)
except Exception as e:
logger.error(
"scheduling_failed", data_transfer_id=data_transfer.id, error=str(e)
)
raise
def unpack_and_verify_files(
    verbose: bool = False, session: Optional[Session] = None
) -> None:
"""
Unpack transferred files and verify their xxHash checksums.
This function retrieves all completed data transfers that are pending unpacking,
and schedules Celery tasks to unpack and verify each package.
Parameters
----------
verbose : bool, optional
If True, sets logging level to DEBUG. Default is False.
session : Session, optional
An existing database session to use. If None, a new session will be created.
Returns
-------
None
Raises
------
SQLAlchemyError
If there's an issue with database operations.
"""
if verbose:
logger.setLevel(logging.DEBUG)
should_close_session = False
if session is None:
db = DatabaseConnection()
session, _ = db.get_connection()
should_close_session = True
try:
        pending_data_unpackings = _get_pending_unpackings(session)
if len(pending_data_unpackings) > 0:
logger.info("pending_unpacks_found", count=len(pending_data_unpackings))
else:
logger.debug("no_pending_unpacks", count=0)
for data_transfer in pending_data_unpackings:
try:
_schedule_unpack_task(data_transfer, session)
except Exception as e:
logger.error(
"Failed to schedule unpack task",
data_transfer_id=data_transfer.id,
error=str(e),
)
continue
finally:
if should_close_session:
session.close()
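# Illustrative usage of the entry point (a minimal sketch; how it is scheduled,
# e.g. from a periodic job or a CLI wrapper, is outside this module):
#
#     unpack_and_verify_files(verbose=True)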