import logging
import os
import shutil
import socket
import subprocess
import tarfile
import time
from typing import Dict, List, Optional, Tuple
from collections import defaultdict
from sqlalchemy.orm import Session
from ccat_ops_db import models
from .config.config import ccat_data_transfer_settings
from .database import DatabaseConnection
from .setup_celery_app import app, make_celery_task
from .utils import (
unique_id,
calculate_checksum,
)
from .exceptions import DatabaseOperationError, ConfigurationError
from .logging_utils import get_structured_logger
from .queue_discovery import route_task_by_location
from .health_check import HealthCheck
from .operation_types import OperationType
# Set up global logger
logger = get_structured_logger(__name__)
# Create enhanced task base class for raw data package operations
SQLAlchemyTask = make_celery_task()
class RawDataPackageOperations(SQLAlchemyTask):
"""Base class for raw data package operations with error handling."""
operation_type = "raw_data_package"
max_retries = 3
def get_retry_count(self, session, operation_id):
"""Get current retry count for a raw data package operation."""
try:
package = session.query(models.RawDataPackage).get(operation_id)
return package.retry_count if package else 0
except Exception:
return 0
def reset_state_on_failure(self, session, operation_id, exc):
"""Reset raw data package state for retry."""
try:
package = session.query(models.RawDataPackage).get(operation_id)
if package:
package.status = models.Status.PENDING
package.retry_count += 1
session.commit()
logger.info(
"Raw data package reset for retry",
package_id=operation_id,
retry_count=package.retry_count,
)
except Exception as e:
logger.error(
"Failed to reset raw data package state",
package_id=operation_id,
error=str(e),
)
def mark_permanent_failure(self, session, operation_id, exc):
"""Mark raw data package as permanently failed."""
try:
package = session.query(models.RawDataPackage).get(operation_id)
if package:
package.status = models.Status.FAILED
session.commit()
logger.error(
"Raw data package marked as permanently failed",
package_id=operation_id,
error=str(exc),
)
except Exception as e:
logger.error(
"Failed to mark raw data package as failed",
package_id=operation_id,
error=str(e),
)
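# A hedged sketch of how the hooks above are expected to be driven on failure.
# The exact call order depends on the task base produced by make_celery_task
# (not shown here), so treat this as illustrative only:
#
#     with task.session_scope() as session:
#         if task.get_retry_count(session, package_id) < task.max_retries:
#             task.reset_state_on_failure(session, package_id, exc)  # -> PENDING
#         else:
#             task.mark_permanent_failure(session, package_id, exc)  # -> FAILED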
@app.task(
base=RawDataPackageOperations,
name="ccat:raw_data_package:create_package",
bind=True,
)
def create_raw_data_package_task(
self,
raw_data_package_id: int,
source_location_id: int,
session: Optional[Session] = None,
) -> bool:
"""
Create a raw data package by assembling files from the source location.
Parameters
----------
self : celery.Task
The Celery task instance.
raw_data_package_id : int
The ID of the RawDataPackage to be processed.
source_location_id : int
The ID of the source DataLocation.
session : sqlalchemy.orm.Session, optional
A database session for use in testing environments.
Returns
-------
bool
True if the operation was successful.
"""
if session is None:
with self.session_scope() as session:
return _create_raw_data_package_internal(
session, raw_data_package_id, source_location_id
)
else:
return _create_raw_data_package_internal(
session, raw_data_package_id, source_location_id
)
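# Example: enqueueing this task on the queue selected for the source location,
# mirroring the scheduling call made in _create_raw_data_package_entry below:
#
#     queue_name = route_task_by_location(
#         OperationType.RAW_DATA_PACKAGE_CREATION, source_location
#     )
#     create_raw_data_package_task.apply_async(
#         args=[raw_data_package.id, source_location.id], queue=queue_name
#     )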
def _create_raw_data_package_internal(
session: Session, raw_data_package_id: int, source_location_id: int
) -> bool:
"""
Internal function to create a raw data package.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
raw_data_package_id : int
The ID of the RawDataPackage to be processed.
source_location_id : int
The ID of the source DataLocation.
Returns
-------
bool
True if the operation was successful.
"""
logger.info(
f"Creating raw data package {raw_data_package_id} from source {source_location_id}"
)
try:
# Retrieve package and source location from database
raw_data_package = session.query(models.RawDataPackage).get(raw_data_package_id)
if not raw_data_package:
logger.error("Raw data package not found", package_id=raw_data_package_id)
return False
if raw_data_package.status == models.Status.COMPLETED:
logger.info(f"Raw data package {raw_data_package.name} already completed")
return True
source_location = session.query(models.DataLocation).get(source_location_id)
if not source_location:
logger.error("Source location not found", location_id=source_location_id)
return False
# Get primary buffer for this site
primary_buffer = get_primary_buffer_for_site(session, source_location.site)
if not primary_buffer:
logger.error(
f"No active buffer found for site {source_location.site.short_name}"
)
return False
# Get all raw data files for this package
raw_data_files = raw_data_package.raw_data_files
if not raw_data_files:
logger.warning(
"No raw data files found for package", package_id=raw_data_package_id
)
return False
# Create archive from raw data files
logger.info(f"Creating archive for {len(raw_data_files)} files")
# Check if we should create the package in the buffer directly
if is_same_host(primary_buffer):
logger.debug("Creating package in buffer directly")
physical_raw_data_file_temp_copies = []
# iterate over all relevant raw data files for this package
for raw_data_file in raw_data_files:
                logger.debug(
                    "Copying raw data file to buffer",
                    file=raw_data_file.relative_path,
                    source=f"{source_location.name} ({source_location.user}@{source_location.host}:{source_location.path})",
                    buffer=f"{primary_buffer.name} ({primary_buffer.user}@{primary_buffer.host}:{primary_buffer.path})",
                )
source_path = os.path.join(
source_location.path, raw_data_file.relative_path
)
buffer_path = os.path.join(
primary_buffer.path, raw_data_file.relative_path
)
logger.debug(f"Source path: {source_path}")
logger.debug(f"Buffer path: {buffer_path}")
                if is_same_host(source_location):
                    # The buffer is already known to be local in this branch,
                    # so a local source means we can copy the file directly.
                    if source_path != buffer_path:
                        logger.debug(
                            "Copying raw data file to buffer directly",
                            file=raw_data_file.relative_path,
                        )
                        # make sure the directory exists
                        os.makedirs(os.path.dirname(buffer_path), exist_ok=True)
                        shutil.copy2(source_path, buffer_path)
                    else:
                        logger.debug(
                            "Raw data file already in buffer",
                            file=raw_data_file.relative_path,
                        )
                else:
                    os.makedirs(os.path.dirname(buffer_path), exist_ok=True)
                    # Pull the file from the remote source via scp; the
                    # destination is a plain local path because this worker
                    # runs on the buffer host.
                    scp_command = [
                        "scp",
                        f"{source_location.user}@{source_location.host}:{source_path}",
                        buffer_path,
                    ]
                    subprocess.run(
                        scp_command, check=True, capture_output=True, text=True
                    )
# create physical copy record at buffer location
physical_copy = models.RawDataFilePhysicalCopy(
raw_data_file=raw_data_file,
data_location=primary_buffer,
checksum=raw_data_file.checksum,
)
session.add(physical_copy)
physical_raw_data_file_temp_copies.append(physical_copy)
session.commit()
# Create package directly in buffer
archive_success = _create_package_archive(
session, raw_data_package, primary_buffer, raw_data_files
)
if not archive_success:
logger.error("Failed to create package in buffer")
return False
# Create physical copy record at buffer location
physical_copy = models.RawDataPackagePhysicalCopy(
raw_data_package=raw_data_package,
data_location=primary_buffer,
checksum=raw_data_package.checksum,
)
session.add(physical_copy)
            for physical_raw_data_file_temp_copy in physical_raw_data_file_temp_copies:
                # Remove the temporary per-file copies now that the archive
                # exists; we are on the buffer host, so a local remove suffices.
                os.remove(physical_raw_data_file_temp_copy.full_path)
                session.delete(physical_raw_data_file_temp_copy)
elif is_same_host(source_location):
# we are on the source location, we can create the package directly
archive_success = _create_package_archive(
session, raw_data_package, source_location, raw_data_files
)
if not archive_success:
logger.error("Failed to create package in source location")
return False
# Create physical copy record at source location
source_physical_copy = models.RawDataPackagePhysicalCopy(
raw_data_package=raw_data_package,
data_location=source_location,
checksum=raw_data_package.checksum,
)
session.add(source_physical_copy)
session.commit()
        # Now transfer the package to the buffer, ensuring the target
        # directory exists first (locally, or over SSH if the buffer is remote)
source_path = os.path.join(
source_location.path, raw_data_package.relative_path
)
buffer_path = os.path.join(
primary_buffer.path, raw_data_package.relative_path
)
if is_same_host(primary_buffer):
os.makedirs(os.path.dirname(buffer_path), exist_ok=True)
else:
success, output = execute_remote_command(
primary_buffer.host,
primary_buffer.user,
f"mkdir -p {os.path.dirname(buffer_path)}",
)
if not success:
logger.error(f"Failed to create buffer directory: {output}")
return False
        if is_same_host(primary_buffer):
            shutil.copy2(source_path, buffer_path)
        else:
            # Push the package from the local source to the remote buffer via
            # scp; the remote directory was already created above.
            scp_command = [
                "scp",
                "-r",
                source_path,
                f"{primary_buffer.user}@{primary_buffer.host}:{buffer_path}",
            ]
            subprocess.run(scp_command, check=True, capture_output=True, text=True)
# create physical copy record at buffer location
buffer_physical_copy = models.RawDataPackagePhysicalCopy(
raw_data_package=raw_data_package,
data_location=primary_buffer,
checksum=raw_data_package.checksum,
)
session.add(buffer_physical_copy)
# remove the package from the source location
os.remove(source_path)
session.delete(source_physical_copy)
else:
            logger.error(
                "Cannot create package: this worker is on neither the source "
                "host nor the buffer host. Remote-to-remote transfers are not "
                "supported; this Celery queue received a task it is not "
                "intended to handle."
            )
return False
# Update package status
raw_data_package.status = models.Status.COMPLETED
raw_data_package.state = models.PackageState.TRANSFERRING
session.commit()
logger.info(
f"Created and transferred raw data package {raw_data_package.name} with {len(raw_data_files)} files"
)
return True
except Exception as e:
logger.error(f"Error creating raw data package: {str(e)}")
session.rollback()
return False
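# Summary of the host-routing logic implemented above:
#
#     worker on buffer host -> stage per-file copies into the buffer, archive
#                              there, then drop the temporary file copies
#     worker on source host -> archive at the source, copy/scp the archive to
#                              the buffer, then remove the source-side archive
#     worker on neither     -> fail: remote-to-remote transfer is unsupported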
def is_same_host(data_location: models.DiskDataLocation) -> bool:
"""
Check if the current worker is on the same host as the DataLocation.
Parameters
----------
data_location : models.DiskDataLocation
The DataLocation to check against.
Returns
-------
bool
True if the worker is on the same host as the DataLocation.
"""
if data_location.host in ["localhost", "127.0.0.1"]:
return True
# Get the hostname of the current container
container_hostname = socket.gethostname()
logger.debug(f"Container hostname: {container_hostname}")
logger.debug(f"Data location host: {data_location.host}")
    # In development mode, treat every location as local
    if ccat_data_transfer_settings.DEVELOPMENT_MODE_LOCALHOST_ONLY:
        logger.debug("Development mode: treating all hosts as the same host")
        return True
return container_hostname == data_location.host
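# Example (hypothetical hostnames): on a worker container whose hostname is
# "site-buffer-01", and outside development mode:
#
#     is_same_host(loc_a)  # True  if loc_a.host is "localhost", "127.0.0.1",
#                          #       or "site-buffer-01"
#     is_same_host(loc_b)  # False if loc_b.host == "site-archive-01"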
def execute_remote_command(host: str, user: str, command: str) -> Tuple[bool, str]:
"""
Execute a command on a remote host via SSH.
Parameters
----------
host : str
The remote host to execute the command on.
user : str
The user to execute the command as.
command : str
The command to execute.
Returns
-------
Tuple[bool, str]
A tuple containing (success, output/error message)
"""
try:
ssh_command = ["ssh", f"{user}@{host}", command]
result = subprocess.run(ssh_command, capture_output=True, text=True, check=True)
return True, result.stdout
except subprocess.CalledProcessError as e:
return False, e.stderr
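# Example (hypothetical host and user): preparing a remote buffer directory,
# the same pattern used before the scp transfer further up in this module:
#
#     ok, output = execute_remote_command(
#         "buffer-host", "transfer", "mkdir -p /buffer/raw_data_packages"
#     )
#     if not ok:
#         logger.error(f"Failed to create buffer directory: {output}")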
def _create_package_archive(
session: Session,
raw_data_package: models.RawDataPackage,
source_location: models.DataLocation,
raw_data_files: List[models.RawDataFile],
) -> bool:
"""
Create a tar archive from raw data files.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
raw_data_package : models.RawDataPackage
The raw data package to create.
source_location : models.DataLocation
The source data location.
raw_data_files : List[models.RawDataFile]
List of raw data files to include.
Returns
-------
bool
True if successful, False otherwise.
"""
try:
# Ensure source_location is a DiskDataLocation
if not isinstance(source_location, models.DiskDataLocation):
raise ValueError("Source location must be a DiskDataLocation")
# Check if we're on the same host as the source location
if is_same_host(source_location):
# We can work directly with the files
return _create_package_archive_local(
session, raw_data_package, source_location, raw_data_files
)
else:
# We need to work remotely
return _create_package_archive_remote(
session, raw_data_package, source_location, raw_data_files
)
except Exception as e:
logger.error(f"Error creating package archive: {str(e)}")
return False
def _create_package_archive_local(
session: Session,
raw_data_package: models.RawDataPackage,
source_location: models.DiskDataLocation,
raw_data_files: List[models.RawDataFile],
) -> bool:
"""Create package archive when worker is on same host as source location."""
try:
# Ensure the package directory exists
package_dir = os.path.dirname(
os.path.join(source_location.path, raw_data_package.relative_path)
)
os.makedirs(package_dir, exist_ok=True)
archive_path = os.path.join(
source_location.path, raw_data_package.relative_path
)
total_size = 0
with tarfile.open(archive_path, "w:gz") as tar:
for raw_file in raw_data_files:
file_path = os.path.join(source_location.path, raw_file.relative_path)
if os.path.exists(file_path):
# Use the relative_path as the arcname to preserve the hierarchical structure
# This ensures the tar contains the same directory structure as the source
tar.add(file_path, arcname=raw_file.relative_path)
total_size += raw_file.size
logger.debug(f"Added file {raw_file.relative_path} to archive")
else:
logger.warning(f"Raw data file not found: {file_path}")
# Calculate checksum and update package
checksum = calculate_checksum(archive_path)
if checksum:
raw_data_package.checksum = checksum
raw_data_package.size = total_size
return True
else:
logger.error("Failed to calculate checksum for archive")
return False
except Exception as e:
logger.error(f"Error creating local package archive: {str(e)}")
return False
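# Example (hypothetical paths): with source_location.path == "/data/source"
# and a file at relative_path "obs_001/cam1/scan.fits", the arcname handling
# above stores the member as "obs_001/cam1/scan.fits", so extraction
# reproduces the source hierarchy:
#
#     with tarfile.open(archive_path, "r:gz") as tar:
#         tar.extractall(path="/data/restore")  # -> /data/restore/obs_001/cam1/...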
def _create_package_archive_remote(
session: Session,
raw_data_package: models.RawDataPackage,
source_location: models.DiskDataLocation,
raw_data_files: List[models.RawDataFile],
) -> bool:
"""Create package archive when worker is on different host than source location."""
try:
        # The archive is built on the remote host, so no local staging
        # directory is needed.
        archive_path = os.path.join(
            source_location.path, raw_data_package.relative_path
        )
        # Run tar from inside the source directory so the archive preserves
        # the same relative directory structure as the source.
        tar_command = f"cd {source_location.path} && tar czf {archive_path} "
        tar_command += " ".join(f'"{f.relative_path}"' for f in raw_data_files)
        # Execute the command remotely
        success, output = execute_remote_command(
            source_location.host, source_location.user, tar_command
        )
        if not success:
            logger.error(f"Failed to create remote archive: {output}")
            return False
        # Calculate checksum remotely
        checksum_command = f"sha256sum {archive_path} | cut -d' ' -f1"
        success, checksum = execute_remote_command(
            source_location.host, source_location.user, checksum_command
        )
        if not success:
            logger.error(f"Failed to calculate remote checksum: {checksum}")
            return False
        # Calculate total size remotely (GNU coreutils du)
        size_command = f"du -b {archive_path} | cut -f1"
        success, size_str = execute_remote_command(
            source_location.host, source_location.user, size_command
        )
        if not success:
            logger.error(f"Failed to get remote file size: {size_str}")
            return False
        # Update package with remote information
        raw_data_package.checksum = checksum.strip()
        raw_data_package.size = int(size_str.strip())
        return True
except Exception as e:
logger.error(f"Error creating remote package archive: {str(e)}")
return False
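# Example (hypothetical values): for source_location.path == "/data/source"
# and two files, the remote command assembled above is a single shell line:
#
#     cd /data/source && tar czf /data/source/raw_data_packages/pkg.tar.gz "obs_001/cam1/a.fits" "obs_001/cam1/b.fits"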
def get_unpackaged_raw_data_files(
session: Session, source_location: models.DataLocation
) -> List[models.RawDataFile]:
"""
Retrieve all raw data files from a source location that are not yet assigned to a package.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
source_location : models.DataLocation
The source data location to scan.
Returns
-------
List[models.RawDataFile]
List of unpackaged raw data files.
"""
logger.info(f"Scanning source location: {source_location.name}")
# Get all raw data files from this source location that are not yet packaged
unpackaged_files = (
session.query(models.RawDataFile)
.filter(
models.RawDataFile.source_location_id == source_location.id,
models.RawDataFile.raw_data_package_id.is_(None),
)
.all()
)
logger.info(
f"Found {len(unpackaged_files)} unpackaged files in {source_location.name}"
)
return unpackaged_files
def group_files_by_execution_and_module(
raw_data_files: List[models.RawDataFile],
) -> Dict[Tuple[int, int], List[models.RawDataFile]]:
"""
Group raw data files by ExecutedObsUnit and InstrumentModule.
Parameters
----------
raw_data_files : List[models.RawDataFile]
List of raw data files to group.
Returns
-------
Dict[Tuple[int, int], List[models.RawDataFile]]
Dictionary mapping (executed_obs_unit_id, instrument_module_id) to list of files.
"""
grouped_files = defaultdict(list)
for raw_file in raw_data_files:
key = (raw_file.executed_obs_unit_id, raw_file.instrument_module_id)
grouped_files[key].append(raw_file)
logger.info(
f"Grouped {len(raw_data_files)} files into {len(grouped_files)} packages"
)
return dict(grouped_files)
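# Example: three files spanning two (executed_obs_unit_id, instrument_module_id)
# pairs produce two package groups:
#
#     # f1, f2 -> (10, 1); f3 -> (10, 2)
#     groups = group_files_by_execution_and_module([f1, f2, f3])
#     # {(10, 1): [f1, f2], (10, 2): [f3]}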
def get_primary_buffer_for_site(
session: Session, site: models.Site
) -> Optional[models.DataLocation]:
"""
Get the primary (highest priority) active buffer for a site.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
site : models.Site
The site to get the buffer for.
Returns
-------
Optional[models.DataLocation]
The primary buffer location, or None if no active buffer exists.
"""
buffer_location = (
session.query(models.DataLocation)
.filter(
models.DataLocation.site_id == site.id,
models.DataLocation.location_type == models.LocationType.BUFFER,
models.DataLocation.active == True, # noqa: E712
)
.order_by(models.DataLocation.priority.asc())
.first()
)
if buffer_location:
logger.debug(
f"Primary buffer for site {site.short_name}: {buffer_location.name}"
)
else:
logger.warning(f"No active buffer found for site {site.short_name}")
return buffer_location
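# Example (hypothetical rows): with two active BUFFER locations for a site,
# "fast-buffer" (priority 1) and "slow-buffer" (priority 2), the ascending
# order_by above makes the lowest priority value the primary:
#
#     buffer = get_primary_buffer_for_site(session, site)
#     assert buffer.name == "fast-buffer"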
def create_raw_data_packages_for_location(
session: Session, source_location: models.DataLocation
) -> None:
"""
Create raw data packages for unpackaged files in a source location.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
source_location : models.DataLocation
The source location to process.
"""
logger.info(f"Processing source location: {source_location.name}")
# Get unpackaged files
unpackaged_files = get_unpackaged_raw_data_files(session, source_location)
if not unpackaged_files:
logger.debug(f"No unpackaged files found in {source_location.name}")
return
# Group files by execution and module
grouped_files = group_files_by_execution_and_module(unpackaged_files)
# Get primary buffer for this site
primary_buffer = get_primary_buffer_for_site(session, source_location.site)
if not primary_buffer:
raise ConfigurationError(
f"No active buffer found for site {source_location.site.short_name}"
)
# Create packages for each group
for (executed_obs_unit_id, instrument_module_id), files in grouped_files.items():
_create_raw_data_package_entry(
session,
files,
executed_obs_unit_id,
instrument_module_id,
source_location,
primary_buffer,
)
session.commit()
logger.info(
f"Created {len(grouped_files)} raw data packages for {source_location.name}"
)
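# Example: processing one source location; a missing buffer surfaces as a
# ConfigurationError rather than a silent skip:
#
#     try:
#         create_raw_data_packages_for_location(session, source_location)
#     except ConfigurationError as exc:
#         logger.error(str(exc))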
def _create_raw_data_package_entry(
session: Session,
files: List[models.RawDataFile],
executed_obs_unit_id: int,
instrument_module_id: int,
source_location: models.DataLocation,
target_buffer: models.DataLocation,
) -> models.RawDataPackage:
"""
Create a RawDataPackage database entry and schedule the assembly task.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
files : List[models.RawDataFile]
List of files to include in the package.
executed_obs_unit_id : int
The executed observation unit ID.
instrument_module_id : int
The instrument module ID.
source_location : models.DataLocation
The source data location.
target_buffer : models.DataLocation
The target buffer location.
Returns
-------
models.RawDataPackage
The created raw data package.
"""
logger.info(f"Creating raw data package for {len(files)} files")
# Get executed obs unit and instrument module for naming
executed_obs_unit = session.query(models.ExecutedObsUnit).get(executed_obs_unit_id)
instrument_module = session.query(models.InstrumentModule).get(instrument_module_id)
if not executed_obs_unit or not instrument_module:
raise ValueError("Invalid executed_obs_unit_id or instrument_module_id")
# Generate unique package name and path
package_id = unique_id()
package_name = (
f"{executed_obs_unit.obs_unit.name}_{instrument_module.name}_{package_id}"
)
relative_path = f"raw_data_packages/{package_name}.tar.gz"
# Create RawDataPackage entry
raw_data_package = models.RawDataPackage(
name=package_name,
relative_path=relative_path,
executed_obs_unit_id=executed_obs_unit_id,
instrument_module_id=instrument_module_id,
obs_unit_id=executed_obs_unit.obs_unit_id,
status=models.Status.PENDING,
state=models.PackageState.WAITING,
size=sum(f.size for f in files),
checksum="", # Will be calculated during assembly
)
# Associate files with the package
for file in files:
file.raw_data_package = raw_data_package
session.add(raw_data_package)
session.flush() # Get the ID
# Schedule Celery task for package assembly
queue_name = route_task_by_location(
OperationType.RAW_DATA_PACKAGE_CREATION, source_location
)
create_raw_data_package_task.apply_async(
args=[raw_data_package.id, source_location.id], queue=queue_name
)
logger.info(
"Scheduled raw data package creation",
package_id=raw_data_package.id,
package_name=package_name,
file_count=len(files),
queue=queue_name,
)
return raw_data_package
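# Example (hypothetical names): for obs unit "OBS42", module "CAM1", and a
# unique_id() of "a1b2c3", the entry created above gets:
#
#     name          == "OBS42_CAM1_a1b2c3"
#     relative_path == "raw_data_packages/OBS42_CAM1_a1b2c3.tar.gz"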
def get_sites_with_source_locations(session: Session) -> List[models.Site]:
"""
Get all sites that have active SOURCE data locations.
Parameters
----------
session : sqlalchemy.orm.Session
The database session.
Returns
-------
List[models.Site]
List of sites with source locations.
"""
    # Query distinct sites that have at least one active SOURCE location
sites = (
session.query(models.Site)
.join(models.DataLocation)
.filter(
models.DataLocation.location_type == models.LocationType.SOURCE,
models.DataLocation.active == True, # noqa: E712
)
.distinct()
.all()
)
logger.debug(f"Found {len(sites)} sites with active source locations")
return sites
def create_raw_data_packages(
    verbose: bool = False, session: Optional[Session] = None
) -> None:
"""
Scan all source locations and create raw data packages for unpackaged files.
This function manages the process of creating raw data packages by:
1. Finding all sites with active SOURCE data locations.
2. For each source location, finding unpackaged raw data files.
3. Grouping files by ExecutedObsUnit and InstrumentModule.
4. Creating RawDataPackage entries in the database.
5. Scheduling Celery tasks to handle the package assembly.
    Parameters
    ----------
    verbose : bool, optional
        If True, sets the logging level to DEBUG. Defaults to False.
    session : sqlalchemy.orm.Session, optional
        Database session for testing. If None, a new session is created.
    Raises
    ------
    ConfigurationError
        If no active buffer is found for a site.
    DatabaseOperationError
        If an error occurs during database operations.
"""
if session is None:
db = DatabaseConnection()
session, _ = db.get_connection()
if verbose:
logger.setLevel(logging.DEBUG)
try:
# Get all sites with source locations
source_sites = get_sites_with_source_locations(session)
if not source_sites:
logger.info("No sites with source locations found")
return
logger.info(f"Processing {len(source_sites)} sites with source locations")
for site in source_sites:
logger.info(f"Processing site: {site.name}")
# Get all active source locations for this site
source_locations = (
session.query(models.DataLocation)
.filter(
models.DataLocation.site_id == site.id,
models.DataLocation.location_type == models.LocationType.SOURCE,
models.DataLocation.active == True, # noqa: E712
)
.all()
)
for source_location in source_locations:
try:
create_raw_data_packages_for_location(session, source_location)
except Exception as e:
logger.error(
f"Error processing source location {source_location.name}: {str(e)}"
)
session.rollback()
raise DatabaseOperationError(
f"Failed to create raw data packages for {source_location.name}: {str(e)}"
) from e
logger.info("Completed raw data package creation for all sites")
except Exception as e:
logger.exception("An error occurred while creating raw data packages")
raise RuntimeError("Failed to create raw data packages") from e
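# Example: running a single packaging pass with an injected session, as a test
# might (omit session to let the function open its own connection):
#
#     db = DatabaseConnection()
#     session, _ = db.get_connection()
#     create_raw_data_packages(verbose=True, session=session)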
def raw_data_package_manager_service(verbose: bool = False) -> None:
"""
Main service function for the raw data package manager.
This service continuously scans source locations for new raw data files
and creates packages for transfer to buffer locations.
    Parameters
    ----------
    verbose : bool, optional
        If True, sets the logging level to DEBUG. Defaults to False.
"""
if verbose or ccat_data_transfer_settings.VERBOSE:
logger.setLevel(logging.DEBUG)
logger.debug("verbose_mode_enabled")
db = DatabaseConnection()
session, _ = db.get_connection()
# Initialize health check
health_check = HealthCheck(
service_type="raw_data_package",
service_name="raw_data_package_manager",
)
health_check.start()
try:
while True: # Main service loop
try:
logger.debug("Starting raw data package creation cycle")
create_raw_data_packages(verbose=verbose, session=session)
                # Sleep for the configured interval
                time.sleep(
                    ccat_data_transfer_settings.RAW_DATA_PACKAGE_MANAGER_SLEEP_TIME
                )
except Exception as e:
logger.error("service_loop_error", error=str(e))
                time.sleep(10)  # Wait before retrying the loop
finally:
health_check.stop()
session.close()
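# Example: a minimal entry-point sketch for running the service; the actual
# console-script wiring is assumed to live elsewhere:
#
#     if __name__ == "__main__":
#         raw_data_package_manager_service(verbose=True)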