from typing import List, Optional
import datetime
import logging
import subprocess
from sqlalchemy.orm import Session
from sqlalchemy import and_
import os
import shutil
import tempfile
from .database import DatabaseConnection
from .setup_celery_app import app, make_celery_task
from .decorators import track_metrics
from .utils import get_s3_client, get_s3_key_for_package
from ccat_ops_db import models
from .logging_utils import get_structured_logger
from .queue_discovery import route_task_by_location
from .operation_types import OperationType
logger = get_structured_logger(__name__)
class StagingTask(make_celery_task()):
"""Base class for staging tasks with common functionality."""
def __init__(self):
super().__init__()
self.max_retries = 3
def get_retry_count(self, session, staging_job_id):
"""Get the current retry count for a staging job."""
staging_job = session.query(models.StagingJob).get(staging_job_id)
return staging_job.retry_count if staging_job else 0
def reset_state_on_failure(self, session, staging_job_id, exc):
"""Reset the state of a staging job on failure."""
staging_job = session.query(models.StagingJob).get(staging_job_id)
if staging_job:
staging_job.status = models.Status.PENDING
staging_job.failure_error_message = str(exc)
if hasattr(staging_job, "retry_count"):
staging_job.retry_count += 1
session.commit()
def mark_permanent_failure(self, session, staging_job_id, exc):
"""Mark a staging job as permanently failed."""
staging_job = session.query(models.StagingJob).get(staging_job_id)
if staging_job:
staging_job.status = models.Status.FAILED
staging_job.failure_error_message = str(exc)
if hasattr(staging_job, "retry_count"):
staging_job.retry_count += 1
session.commit()
def get_operation_info(self, args, kwargs):
"""Get information about the operation for logging."""
staging_job_id = args[0] if args else kwargs.get("staging_job_id")
return {
"staging_job_id": staging_job_id,
"operation_type": "staging",
}
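# The task below is registered as "ccat:data_transfer:staging" and is normally
# dispatched by _process_staging_job(), which selects a queue via
# route_task_by_location(). A minimal dispatch sketch (the queue name here is
# illustrative, not taken from the code):
#
#     stage_data_task.apply_async(args=[staging_job.id], queue="staging-default")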
@app.task(
base=StagingTask,
name="ccat:data_transfer:staging",
bind=True,
)
def stage_data_task(
self,
staging_job_id: int,
session: Optional[Session] = None,
) -> None:
"""Celery task to stage data using dynamic queue routing."""
if session is None:
with self.session_scope() as session:
return _stage_data_internal(session, staging_job_id)
return _stage_data_internal(session, staging_job_id)
@track_metrics(
operation_type="staging",
additional_tags={
"transfer_method": "s3",
},
)
def _stage_data_internal(session: Session, staging_job_id: int) -> None:
"""Internal function to handle the staging process for all packages in a job."""
staging_job = _get_staging_job(session, staging_job_id)
if not staging_job:
logger.error(f"Staging job {staging_job_id} not found")
return
try:
start_time = datetime.datetime.now()
_log_staging_start(staging_job)
# Add comprehensive debug logging for staging job configuration
logger.debug("Staging job configuration:")
logger.debug(f" - Job ID: {staging_job.id}")
logger.debug(f" - Status: {staging_job.status}")
logger.debug(f" - Origin location ID: {staging_job.origin_data_location_id}")
logger.debug(
f" - Origin location: {staging_job.origin_data_location.name} (type: {type(staging_job.origin_data_location).__name__})"
)
logger.debug(
f" - Destination location ID: {staging_job.destination_data_location_id}"
)
logger.debug(
f" - Destination location: {staging_job.destination_data_location.name} (type: {type(staging_job.destination_data_location).__name__})"
)
logger.debug(f" - Number of packages: {len(staging_job.raw_data_packages)}")
# Log details about each package
for i, package in enumerate(staging_job.raw_data_packages):
logger.debug(
f" - Package {i+1}: ID={package.id}, relative_path='{package.relative_path}'"
)
logger.debug(f" Number of raw data files: {len(package.raw_data_files)}")
for j, file in enumerate(package.raw_data_files):
logger.debug(
f" - File {j+1}: ID={file.id}, relative_path='{file.relative_path}', size={file.size}"
)
# Track success/failure for each package
package_results = {}
# Process each package in the job
for raw_data_package in staging_job.raw_data_packages:
try:
# Skip if already staged
if _check_existing_copies(session, staging_job, raw_data_package.id):
logger.info(
f"Skipping package {raw_data_package.id} - already exists at destination"
)
package_results[raw_data_package.id] = True
continue
# Get the physical copy from the origin location
logger.debug(
f"Looking for physical copy in origin location {staging_job.origin_data_location_id} for package {raw_data_package.id}"
)
logger.debug(
f"Origin location details: {staging_job.origin_data_location.name} (type: {type(staging_job.origin_data_location).__name__})"
)
physical_copy = _get_physical_copy(
session,
staging_job.origin_data_location_id,
raw_data_package.id,
)
if not physical_copy:
raise ValueError(
f"No physical copy found in origin location {staging_job.origin_data_location_id} "
f"for package {raw_data_package.id}"
)
logger.debug(
f"Found physical copy: ID={physical_copy.id}, Status={physical_copy.status}"
)
if hasattr(physical_copy, "full_path"):
logger.debug(f"Physical copy full path: {physical_copy.full_path}")
# Construct the final destination path for unpacked files
final_destination_path = _construct_destination_path(
staging_job.destination_data_location, raw_data_package
)
logger.debug(
f"Constructed final destination path: {final_destination_path}"
)
# For staging, we need to download to a temporary location first
# then unpack to the final destination
# Extract just the filename from the relative path to avoid double raw_data_packages
package_filename = os.path.basename(raw_data_package.relative_path)
temp_package_path = os.path.join(
staging_job.destination_data_location.path,
"raw_data_packages",
package_filename,
)
logger.debug(f"Temporary package path: {temp_package_path}")
# Execute the appropriate copy operation to temporary location
result = _execute_polymorphic_copy(physical_copy, temp_package_path)
if not result:
raise ValueError(f"Failed to stage package {raw_data_package.id}")
# Unpack if needed
if temp_package_path.endswith(".tar.gz"):
_unpack_file(temp_package_path, final_destination_path)
# Verify files - use the final destination path where files were unpacked
_check_raw_data_files(raw_data_package, final_destination_path)
# Create physical copy record for RawDataFiles (these stay in processing)
if result:
_create_raw_data_file_physical_copies(
session,
staging_job,
raw_data_package,
final_destination_path,
True,
)
# Mark RawDataPackage as STAGED and delete the physical package file
if result:
_mark_package_as_staged_and_cleanup(
session, staging_job, raw_data_package, final_destination_path
)
package_results[raw_data_package.id] = True
logger.info(f"Successfully staged package {raw_data_package.id}")
except Exception as e:
logger.error(f"Error staging package {raw_data_package.id}: {str(e)}")
package_results[raw_data_package.id] = False
# Continue with next package instead of failing the entire job
continue
end_time = datetime.datetime.now()
# Update overall job status
all_success = all(package_results.values())
if all_success:
staging_job.status = models.Status.COMPLETED
else:
staging_job.status = models.Status.FAILED
failed_packages = [
pkg_id for pkg_id, success in package_results.items() if not success
]
staging_job.failure_error_message = (
f"Failed to stage packages: {failed_packages}"
)
staging_job.start_time = start_time
staging_job.end_time = end_time
session.commit()
except Exception as e:
logger.error(f"Error in staging job {staging_job_id}: {str(e)}")
staging_job.status = models.Status.FAILED
staging_job.failure_error_message = str(e)
session.commit()
raise
def _get_physical_copy(
session: Session, data_location_id: int, raw_data_package_id: int
) -> Optional[models.PhysicalCopy]:
"""Get the physical copy for a raw data package in a specific data location."""
logger.debug(
f"Looking for physical copy: data_location_id={data_location_id}, raw_data_package_id={raw_data_package_id}"
)
# First, let's see what physical copies exist for this package
all_copies = (
session.query(models.RawDataPackagePhysicalCopy)
.filter(
models.RawDataPackagePhysicalCopy.raw_data_package_id == raw_data_package_id
)
.all()
)
logger.debug(
f"Found {len(all_copies)} total physical copies for package {raw_data_package_id}:"
)
for copy in all_copies:
logger.debug(
f" - Copy ID: {copy.id}, Location ID: {copy.data_location_id}, Status: {copy.status}"
)
if hasattr(copy, "full_path"):
logger.debug(f" Full path: {copy.full_path}")
# Now look for the specific one we need
physical_copy = (
session.query(models.RawDataPackagePhysicalCopy)
.filter(
and_(
models.RawDataPackagePhysicalCopy.data_location_id == data_location_id,
models.RawDataPackagePhysicalCopy.raw_data_package_id
== raw_data_package_id,
models.RawDataPackagePhysicalCopy.status
== models.PhysicalCopyStatus.PRESENT,
)
)
.first()
)
if physical_copy:
logger.debug(
f"Found matching physical copy: ID={physical_copy.id}, Status={physical_copy.status}"
)
if hasattr(physical_copy, "full_path"):
logger.debug(f"Full path: {physical_copy.full_path}")
else:
logger.debug(
f"No matching physical copy found for location {data_location_id} and package {raw_data_package_id}"
)
return physical_copy
def _construct_destination_path(
destination_location: models.DataLocation, raw_data_package: models.RawDataPackage
) -> str:
"""Construct the destination path for staged data."""
logger.debug("Constructing destination path:")
logger.debug(
f" - Destination location type: {type(destination_location).__name__}"
)
logger.debug(f" - Destination location name: {destination_location.name}")
logger.debug(
f" - Raw data package relative path: {raw_data_package.relative_path}"
)
if isinstance(destination_location, models.DiskDataLocation):
# For staging, we want to unpack directly into the destination location root
# This will recreate the hierarchical structure from the source location
# The tar file contains the full relative paths (e.g., CHAI/LFA/filename.fits)
# so unpacking into the root will create the correct hierarchy
path = destination_location.path
logger.debug(f" - Destination location path: {destination_location.path}")
logger.debug(f" - Final constructed path: {path}")
return path
else:
logger.debug(
f" - Unsupported destination location type: {type(destination_location)}"
)
raise ValueError(
f"Unsupported destination location type: {type(destination_location)}"
)
def _execute_polymorphic_copy(
physical_copy: models.RawDataPackagePhysicalCopy,
destination_path: str,
) -> bool:
"""Execute the copy operation based on the storage types of source and destination."""
try:
# Get the source location from the physical copy
source_location = physical_copy.data_location
# For now, we'll assume the destination is a disk location for staging
# In the future, this could be extended to support staging to other storage types
if isinstance(source_location, models.S3DataLocation):
return _execute_s3_download(physical_copy, destination_path)
# elif isinstance(source_location, models.DiskDataLocation):
# # Check if this is a local or remote copy
# if source_location.host and source_location.host != "localhost":
# return _execute_remote_copy(physical_copy, destination_path)
# else:
# return _execute_local_copy(physical_copy, destination_path)
# elif isinstance(source_location, models.TapeDataLocation):
# return _execute_tape_to_disk_copy(physical_copy, destination_path)
else:
raise ValueError(
f"Unsupported source location type: {type(source_location)}"
)
except Exception as e:
logger.error(f"Error executing polymorphic copy: {str(e)}")
raise
def _execute_tape_to_disk_copy(
physical_copy: models.RawDataPackagePhysicalCopy,
destination_path: str,
) -> bool:
"""Execute copy from tape to disk."""
try:
# This is a placeholder for tape operations
# In a real implementation, this would use tape library commands
logger.warning("Tape to disk copy not yet implemented")
raise NotImplementedError("Tape to disk copy not yet implemented")
except Exception as e:
logger.error(f"Tape to disk copy failed: {str(e)}")
raise
def _execute_remote_copy(
physical_copy: models.RawDataPackagePhysicalCopy,
destination_path: str,
) -> bool:
"""Execute the remote copy using the physical copy's full path."""
try:
source_location = physical_copy.data_location
if not isinstance(source_location, models.DiskDataLocation):
raise ValueError(f"Expected DiskDataLocation, got {type(source_location)}")
remote_host = source_location.host
remote_user = source_location.user or "ccat"
remote_path = f"{remote_user}@{remote_host}:{physical_copy.full_path}"
# Create destination directory if it doesn't exist
os.makedirs(os.path.dirname(destination_path), exist_ok=True)
subprocess.run(["scp", remote_path, destination_path], check=True)
return True
except Exception as e:
logger.error(f"Remote copy failed: {str(e)}")
raise
def _execute_local_copy(
physical_copy: models.RawDataPackagePhysicalCopy,
destination_path: str,
) -> bool:
"""Execute the local copy using the physical copy's full path."""
try:
# Create destination directory if it doesn't exist
os.makedirs(os.path.dirname(destination_path), exist_ok=True)
shutil.copy(physical_copy.full_path, destination_path)
return True
except Exception as e:
logger.error(f"Local copy failed: {str(e)}")
raise
def _execute_s3_download(
physical_copy: models.RawDataPackagePhysicalCopy,
destination_path: str,
) -> bool:
"""Execute the S3 download using the physical copy's location information.
Routes to either boto3 or Coscine download based on configuration.
"""
from .config.config import ccat_data_transfer_settings
if ccat_data_transfer_settings.s3_method == "boto3":
return _execute_boto3_s3_download(physical_copy, destination_path)
elif ccat_data_transfer_settings.s3_method == "coscine":
return _execute_coscine_s3_download(physical_copy, destination_path)
else:
raise ValueError(
f"Unsupported s3_method: {ccat_data_transfer_settings.s3_method}"
)
def _execute_boto3_s3_download(
physical_copy: models.RawDataPackagePhysicalCopy,
destination_path: str,
) -> bool:
"""Execute the S3 download using boto3."""
try:
source_location = physical_copy.data_location
if not isinstance(source_location, models.S3DataLocation):
raise ValueError(f"Expected S3DataLocation, got {type(source_location)}")
s3_client = get_s3_client()
# Get bucket name and object key from the S3 location
bucket_name = source_location.bucket_name
# Use the shared function to construct the S3 key consistently
object_key = get_s3_key_for_package(
source_location, physical_copy.raw_data_package
)
logger.debug(
"s3_download_details",
bucket_name=bucket_name,
object_key=object_key,
destination_path=destination_path,
)
# Create the destination directory if it doesn't exist
os.makedirs(os.path.dirname(destination_path), exist_ok=True)
# Download the file
s3_client.download_file(bucket_name, object_key, destination_path)
return True
except Exception as e:
logger.error(f"S3 download failed: {str(e)}")
raise
def _execute_coscine_s3_download(
physical_copy: models.RawDataPackagePhysicalCopy,
destination_path: str,
) -> bool:
"""Execute the S3 download using Coscine API.
Parameters
----------
physical_copy : models.RawDataPackagePhysicalCopy
The physical copy record containing source location information
destination_path : str
Local path where the file should be downloaded
Returns
-------
bool
True if download was successful
Raises
------
ValueError
If Coscine configuration is not set or download fails
"""
import coscine
from .config.config import ccat_data_transfer_settings
try:
# Check if COSCINE is configured
if (
ccat_data_transfer_settings.coscine_api_token == "none"
or ccat_data_transfer_settings.coscine_project == "none"
or ccat_data_transfer_settings.coscine_resource == "none"
):
            raise ValueError(
                "COSCINE configuration is not set. Please configure COSCINE_API_TOKEN, "
                "COSCINE_PROJECT, and COSCINE_RESOURCE in settings.toml. Current values: "
                f"COSCINE_API_TOKEN: {ccat_data_transfer_settings.coscine_api_token}, "
                f"COSCINE_PROJECT: {ccat_data_transfer_settings.coscine_project}, "
                f"COSCINE_RESOURCE: {ccat_data_transfer_settings.coscine_resource}"
            )
source_location = physical_copy.data_location
if not isinstance(source_location, models.S3DataLocation):
raise ValueError(f"Expected S3DataLocation, got {type(source_location)}")
# Construct the object key (same as upload)
object_key = get_s3_key_for_package(
source_location, physical_copy.raw_data_package
)
# Replace colons in the key (same as upload does)
object_key = object_key.replace(":", "_")
logger.debug(
"coscine_download_details",
project=ccat_data_transfer_settings.coscine_project,
resource=ccat_data_transfer_settings.coscine_resource,
object_key=object_key,
destination_path=destination_path,
)
# Initialize Coscine client
client = coscine.ApiClient(ccat_data_transfer_settings.coscine_api_token)
project = client.project(ccat_data_transfer_settings.coscine_project)
resource = project.resource(ccat_data_transfer_settings.coscine_resource)
# Create the destination directory if it doesn't exist
os.makedirs(os.path.dirname(destination_path), exist_ok=True)
# Download the specific file from the resource
# Note: Depending on the Coscine API version, this might need adjustment
# If there's a file-specific download method, use that instead
try:
# Try to download a specific file if the API supports it
file_obj = resource.file(object_key)
file_obj.download(destination_path)
except AttributeError:
# Fallback: Download entire resource to temp location and extract the file
with tempfile.TemporaryDirectory() as temp_dir:
resource.download(path=temp_dir)
# Find and copy the specific file to destination
source_file = os.path.join(temp_dir, object_key)
if not os.path.exists(source_file):
raise ValueError(
f"File {object_key} not found in downloaded resource"
)
shutil.copy(source_file, destination_path)
logger.info(
"coscine_download_successful",
object_key=object_key,
destination=destination_path,
)
return True
except Exception as e:
logger.error(
"coscine_download_failed",
error=str(e),
destination_path=destination_path,
)
raise ValueError(f"COSCINE S3 download failed: {str(e)}")
def _check_raw_data_files(
raw_data_package: models.RawDataPackage, destination_path: str
) -> None:
"""Check the completeness of the raw data files.
This function checks if all files from the raw data package exist in the destination
directory by comparing their full relative paths. The files should be unpacked with
their complete hierarchical structure (e.g., CHAI/LFA/filename.fits).
Parameters
----------
raw_data_package : models.RawDataPackage
The raw data package containing the list of expected files
destination_path : str
The root directory where files should be found
Raises
------
ValueError
If any expected files are missing or if there's an error scanning the directory
"""
try:
# Get the expected file paths from the raw data package
expected_files = {
file.relative_path for file in raw_data_package.raw_data_files
}
if not expected_files:
raise ValueError("No files listed in raw data package")
# For staging, the destination_path is the root directory where files are unpacked
# Files should be found at their full relative paths (e.g., CHAI/LFA/filename.fits)
search_path = destination_path
# Check for missing files by testing each expected path
missing_files = []
for expected_path in expected_files:
full_path = os.path.join(search_path, expected_path)
if not os.path.exists(full_path):
missing_files.append(expected_path)
else:
logger.debug(f"Found file: {expected_path}")
if missing_files:
raise ValueError(
f"Missing files in destination directory: {sorted(missing_files)}"
)
# Log success with some statistics
logger.info(
f"Successfully verified {len(expected_files)} files in {search_path}. "
f"Found all expected files with their hierarchical structure."
)
    except ValueError:
        raise
    except Exception as e:
        raise ValueError(f"Error checking raw data files: {str(e)}") from e
def _unpack_file(file_path: str, destination_dir: str) -> None:
"""Unpack the file directly into the destination directory, preserving the hierarchical structure.
Parameters
----------
file_path : str
Path to the tar.gz file to unpack
destination_dir : str
Directory to unpack the files into
Raises
------
ValueError
If unpacking fails or if the file is not a tar.gz
"""
if not file_path.endswith(".tar.gz"):
raise ValueError(f"Expected .tar.gz file, got: {file_path}")
try:
# Create the destination directory if it doesn't exist
os.makedirs(destination_dir, exist_ok=True)
logger.debug(f"Unpacking {file_path} directly into {destination_dir}")
# First, let's see what's in the tar.gz file to understand the structure
list_result = subprocess.run(
["tar", "-tzf", file_path],
capture_output=True,
text=True,
check=True,
)
tar_contents = list_result.stdout.strip().split("\n")
logger.debug(f"Tar contents: {tar_contents}")
if not tar_contents or not tar_contents[0]:
raise ValueError(f"Tar file {file_path} appears to be empty")
# Extract the tar file directly into the destination directory
# The tar file contains the full relative paths (e.g., CHAI/LFA/filename.fits)
# so unpacking will recreate the correct hierarchical structure
        subprocess.run(
            [
                "tar",
                "-xzf",
                file_path,
                "-C",
                destination_dir,
                "--overwrite",
            ],
            capture_output=True,
            text=True,
            check=True,
        )
logger.info(f"Successfully unpacked {file_path} into {destination_dir}")
except subprocess.CalledProcessError as e:
raise ValueError(f"Failed to unpack {file_path}: {e.stderr}")
except Exception as e:
raise ValueError(f"Error unpacking {file_path}: {str(e)}")
def _get_staging_job(
session: Session, staging_job_id: int
) -> Optional[models.StagingJob]:
"""Get a staging job by ID."""
return session.query(models.StagingJob).get(staging_job_id)
def _check_package_staged(
session: Session, staging_job: models.StagingJob, raw_data_package_id: int
) -> bool:
"""Check if a specific package has been staged."""
return (
session.query(models.RawDataPackagePhysicalCopy)
.filter(
and_(
models.RawDataPackagePhysicalCopy.raw_data_package_id
== raw_data_package_id,
models.RawDataPackagePhysicalCopy.data_location_id
== staging_job.destination_data_location_id,
models.RawDataPackagePhysicalCopy.status
== models.PhysicalCopyStatus.PRESENT,
)
)
.first()
is not None
)
def _create_physical_copy(
session: Session,
staging_job: models.StagingJob,
raw_data_package: models.RawDataPackage,
path: str,
success: bool,
) -> None:
"""Create a physical copy record for the staged data."""
if not success:
return
physical_copy = models.RawDataPackagePhysicalCopy(
raw_data_package_id=raw_data_package.id,
data_location_id=staging_job.destination_data_location_id,
        status=models.PhysicalCopyStatus.PRESENT,
created_at=datetime.datetime.now(datetime.timezone.utc),
)
session.add(physical_copy)
session.commit()
def _create_raw_data_file_physical_copies(
session: Session,
staging_job: models.StagingJob,
raw_data_package: models.RawDataPackage,
destination_path: str,
success: bool,
) -> None:
"""Create physical copy records for all RawDataFiles in the package.
After unpacking a RawDataPackage, we create physical copy records for each
RawDataFile so they can be tracked and deleted when no longer needed.
"""
if not success:
return
# Get the destination directory (where files were unpacked)
# For staging, the destination_path is already the directory where files are unpacked
_ = destination_path
# Create physical copies for each RawDataFile
for raw_data_file in raw_data_package.raw_data_files:
# Check if physical copy already exists
existing_copy = (
session.query(models.RawDataFilePhysicalCopy)
.filter(
and_(
models.RawDataFilePhysicalCopy.raw_data_file_id == raw_data_file.id,
models.RawDataFilePhysicalCopy.data_location_id
== staging_job.destination_data_location_id,
models.RawDataFilePhysicalCopy.status
== models.PhysicalCopyStatus.PRESENT,
)
)
.first()
)
if not existing_copy:
physical_copy = models.RawDataFilePhysicalCopy(
raw_data_file_id=raw_data_file.id,
data_location_id=staging_job.destination_data_location_id,
status=models.PhysicalCopyStatus.PRESENT,
created_at=datetime.datetime.now(datetime.timezone.utc),
)
session.add(physical_copy)
session.commit()
logger.info(
f"Created physical copies for {len(raw_data_package.raw_data_files)} RawDataFiles"
)
def _mark_package_as_staged_and_cleanup(
session: Session,
staging_job: models.StagingJob,
raw_data_package: models.RawDataPackage,
destination_path: str,
) -> None:
"""Mark RawDataPackage as STAGED and delete the physical package file.
After unpacking and creating RawDataFile physical copies, we mark the package
as STAGED and remove the physical package file to save space.
"""
# Find or create the RawDataPackage physical copy record
package_physical_copy = (
session.query(models.RawDataPackagePhysicalCopy)
.filter(
and_(
models.RawDataPackagePhysicalCopy.raw_data_package_id
== raw_data_package.id,
models.RawDataPackagePhysicalCopy.data_location_id
== staging_job.destination_data_location_id,
)
)
.first()
)
if not package_physical_copy:
# Create new record if it doesn't exist
package_physical_copy = models.RawDataPackagePhysicalCopy(
raw_data_package_id=raw_data_package.id,
data_location_id=staging_job.destination_data_location_id,
status=models.PhysicalCopyStatus.STAGED,
created_at=datetime.datetime.now(datetime.timezone.utc),
)
session.add(package_physical_copy)
else:
# Update existing record to STAGED
package_physical_copy.status = models.PhysicalCopyStatus.STAGED
# Delete the physical package file
# For staging, the package file is stored in a temporary location
# We need to find where the original package file was downloaded
package_file_path = None
# Look for the package file in the destination location's raw_data_packages directory
if isinstance(staging_job.destination_data_location, models.DiskDataLocation):
# Use just the filename to match the temporary path construction
package_filename = os.path.basename(raw_data_package.relative_path)
package_file_path = os.path.join(
staging_job.destination_data_location.path,
"raw_data_packages",
package_filename,
)
if package_file_path and os.path.exists(package_file_path):
try:
os.remove(package_file_path)
logger.info(f"Deleted physical package file: {package_file_path}")
except OSError as e:
logger.warning(
f"Failed to delete physical package file {package_file_path}: {str(e)}"
)
else:
logger.debug(
f"Package file not found at expected location: {package_file_path}"
)
session.commit()
logger.info(f"Marked RawDataPackage {raw_data_package.id} as STAGED")
def _log_staging_start(staging_job: models.StagingJob) -> None:
"""Log the start of a staging operation."""
logger.info(
f"Starting staging job {staging_job.id} "
f"from {staging_job.origin_data_location.name} to {staging_job.destination_data_location.name}"
)
def _get_pending_staging_jobs(session: Session) -> List[models.StagingJob]:
"""Get all pending staging jobs."""
return (
session.query(models.StagingJob)
.filter(models.StagingJob.status == models.Status.PENDING)
.all()
)
def _check_existing_copies(
session: Session, staging_job: models.StagingJob, raw_data_package_id: int
) -> bool:
"""Check if a specific package already exists at the destination.
For RawDataPackages, we check if they are STAGED (unpacked and physical file removed)
rather than PRESENT, since we unpack packages and remove the physical file after staging.
"""
logger.debug(
f"Checking existing copies for package {raw_data_package_id} at destination location {staging_job.destination_data_location_id}"
)
# Check if the package is already staged (STAGED status means unpacked and ready)
package_staged = (
session.query(models.RawDataPackagePhysicalCopy)
.filter(
and_(
models.RawDataPackagePhysicalCopy.raw_data_package_id
== raw_data_package_id,
models.RawDataPackagePhysicalCopy.data_location_id
== staging_job.destination_data_location_id,
models.RawDataPackagePhysicalCopy.status
== models.PhysicalCopyStatus.STAGED,
)
)
.first()
is not None
)
if package_staged:
logger.debug(f"Package {raw_data_package_id} already staged at destination")
return True
    # Also check if the package is currently PRESENT (staged but not yet marked as STAGED)
    physical_copy = (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(
            and_(
                models.RawDataPackagePhysicalCopy.raw_data_package_id
                == raw_data_package_id,
                models.RawDataPackagePhysicalCopy.data_location_id
                == staging_job.destination_data_location_id,
                models.RawDataPackagePhysicalCopy.status
                == models.PhysicalCopyStatus.PRESENT,
            )
        )
        .first()
    )
    if physical_copy is None:
        logger.debug(f"Package {raw_data_package_id} does not exist at destination")
        return False
    # Check that the file really exists at the destination and is not empty
    if (
        os.path.exists(physical_copy.full_path)
        and os.path.getsize(physical_copy.full_path) > 0
    ):
        logger.debug(f"Package {raw_data_package_id} is present at destination")
        return True
    logger.debug(
        f"Package {raw_data_package_id} is recorded as present but the file at the "
        f"destination is missing or empty; marking the database entry as DELETED"
    )
    # Fix the database entry - mark the copy as DELETED since the file is gone
    physical_copy.status = models.PhysicalCopyStatus.DELETED
    session.commit()
    return False
def _process_staging_job(staging_job: models.StagingJob, session: Session) -> None:
"""Process a single staging job using dynamic queue routing."""
logger = get_structured_logger(__name__)
# Use dynamic queue routing based on destination location
queue_name = route_task_by_location(
OperationType.STAGING, staging_job.destination_data_location
)
# Apply the task using the unified staging function
stage_data_task.apply_async(
args=[staging_job.id],
queue=queue_name,
)
staging_job.status = models.Status.SCHEDULED
session.commit()
logger.debug(
"staging_job_scheduled",
staging_job_id=staging_job.id,
destination=staging_job.destination_data_location.name,
queue=queue_name,
)
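# process_staging_jobs() is the scheduling entry point: it is expected to be
# called periodically (for example by a cron job or beat schedule; an assumption,
# the caller is not shown here). It finds all PENDING StagingJobs and dispatches
# one stage_data_task per job via _process_staging_job().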
def process_staging_jobs(
    verbose: bool = False, session: Optional[Session] = None
) -> None:
"""Main function to process all pending staging jobs."""
if verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
if session is None:
logger.info("No session provided, creating new one")
db = DatabaseConnection()
session, _ = db.get_connection()
should_close_session = True
else:
should_close_session = False
try:
pending_jobs = _get_pending_staging_jobs(session)
logger.info(f"Found {len(pending_jobs)} pending staging jobs")
for job in pending_jobs:
logger.info(f"Processing staging job {job.id}")
logger.info(f"Staging job status: {job.status}")
logger.info(f"Staging job retry count: {job.retry_count}")
logger.info(
f"Staging job failure error message: {job.failure_error_message}"
)
logger.info(f"Staging job origin location: {job.origin_data_location.name}")
logger.info(
f"Staging job destination location: {job.destination_data_location.name}"
)
logger.info(f"Staging job raw data packages: {job.raw_data_packages}")
try:
# Process the staging job
_process_staging_job(job, session)
logger.info(f"Staging job {job.id} scheduled")
except Exception as e:
logger.error(f"Error processing staging job {job.id}: {str(e)}")
job.status = models.Status.FAILED
job.failure_error_message = str(e)
session.commit()
finally:
if should_close_session:
session.close()
def get_processing_locations_for_site(
session: Session, site: models.Site
) -> List[models.DataLocation]:
"""Get all processing locations for a specific site."""
return (
session.query(models.DataLocation)
.filter(
models.DataLocation.site_id == site.id,
models.DataLocation.location_type == models.LocationType.PROCESSING,
models.DataLocation.active == True, # noqa: E712
)
.all()
)
def get_sites_with_processing_locations(session: Session) -> List[models.Site]:
"""Get all sites that have processing locations."""
return (
session.query(models.Site)
.join(models.DataLocation)
.filter(
models.DataLocation.location_type == models.LocationType.PROCESSING,
models.DataLocation.active == True, # noqa: E712
)
.distinct()
.all()
)