Source code for ccat_data_transfer.archive_manager

import datetime
import os
import subprocess
import hashlib
import base64
import json
from typing import Tuple, Optional
import time
import tempfile
import coscine

from sqlalchemy.orm import Session

from ccat_ops_db import models

from .database import DatabaseConnection
from .setup_celery_app import app, make_celery_task
from .utils import get_s3_client, get_redis_connection, get_s3_key_for_package
from .logging_utils import get_structured_logger
from .exceptions import ScheduleError
from .metrics import HousekeepingMetrics
from .queue_discovery import route_task_by_location
from .operation_types import OperationType

# Use only task loggers
logger = get_structured_logger(__name__)

redis_ = get_redis_connection()
# s3_client = boto3.client(
#     "s3",
#     endpoint_url=ccat_data_transfer_settings.s3_endpoint_url,
#     aws_access_key_id=ccat_data_transfer_settings.s3_access_key_id,
#     aws_secret_access_key=ccat_data_transfer_settings.s3_secret_access_key,
#     region_name=ccat_data_transfer_settings.s3_region_name,
#     config=boto3.session.Config(signature_version="s3v4"),
# )


class LongTermArchiveTask(make_celery_task()):
    """Base class for long term archive tasks."""

    def __init__(self):
        super().__init__()
        self.operation_type = "long_term_archive"

    def get_retry_count(self, session, operation_id):
        """Get current retry count for this operation."""
        long_term_archive_transfer = session.query(models.LongTermArchiveTransfer).get(
            operation_id
        )
        if long_term_archive_transfer:
            return long_term_archive_transfer.attempt_count
        return 0

    def reset_state_on_failure(self, session, long_term_archive_transfer_id, exc):
        """Reset long term archive transfer state for retry."""
        long_term_archive_transfer = session.query(models.LongTermArchiveTransfer).get(
            long_term_archive_transfer_id
        )
        if long_term_archive_transfer:
            long_term_archive_transfer.status = models.Status.PENDING
            long_term_archive_transfer.raw_data_package.state = (
                models.PackageState.TRANSFERRING
            )
            long_term_archive_transfer.failure_error_message = None
            long_term_archive_transfer.attempt_count += 1
            logger.info(
                "Reset long term archive transfer for retry",
                long_term_archive_transfer_id=long_term_archive_transfer_id,
                attempt_count=long_term_archive_transfer.attempt_count,
            )
            _add_transfer_log(
                session,
                long_term_archive_transfer,
                f"Transfer failed, scheduling retry: {str(exc)}",
            )
            session.commit()
            redis_.publish(
                "transfer:overview",
                json.dumps(
                    {
                        "type": "long_term_archive_transfer_reset",
                        "data": long_term_archive_transfer_id,
                    }
                ),
            )

    def mark_permanent_failure(self, session, long_term_archive_transfer_id, exc):
        """Mark long term archive transfer as permanently failed."""
        long_term_archive_transfer = session.query(models.LongTermArchiveTransfer).get(
            long_term_archive_transfer_id
        )
        if long_term_archive_transfer:
            long_term_archive_transfer.status = models.Status.FAILED
            long_term_archive_transfer.raw_data_package.state = (
                models.PackageState.FAILED
            )
            long_term_archive_transfer.failure_error_message = str(exc)
            logger.info(
                "Marked long term archive transfer as permanently failed",
                long_term_archive_transfer_id=long_term_archive_transfer_id,
            )
            _add_transfer_log(
                session,
                long_term_archive_transfer,
                f"Transfer permanently failed after {long_term_archive_transfer.attempt_count} attempts: {str(exc)}",
            )
            session.commit()
            redis_.publish(
                "transfer:overview",
                json.dumps(
                    {
                        "type": "long_term_archive_transfer_failed",
                        "data": long_term_archive_transfer_id,
                    }
                ),
            )

    def get_operation_info(self, args, kwargs):
        """Get additional context for long term archive tasks."""
        if not args or len(args) == 0:
            return {}
        with self.session_scope() as session:
            try:
                long_term_archive_transfer = session.query(
                    models.LongTermArchiveTransfer
                ).get(args[0])
                if long_term_archive_transfer:
                    return {
                        "transfer_id": str(long_term_archive_transfer.id),
                        "site_name": (
                            long_term_archive_transfer.site.short_name
                            if long_term_archive_transfer.site
                            else None
                        ),
                        "package_id": str(
                            long_term_archive_transfer.raw_data_package_id
                        ),
                        "attempt_count": long_term_archive_transfer.attempt_count,
                        "status": (
                            long_term_archive_transfer.status.value
                            if long_term_archive_transfer.status
                            else None
                        ),
                    }
            except Exception as e:
                logger.error(f"Error getting long term archive transfer info: {e}")
            return {}
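

# The hooks above publish JSON events such as
# {"type": "long_term_archive_transfer_reset", "data": <transfer id>} on the
# "transfer:overview" Redis channel. A minimal monitoring sketch, assuming
# get_redis_connection() returns a redis-py client (which the pubsub-style usage in
# this module suggests, but does not guarantee):
#
#     pubsub = get_redis_connection().pubsub()
#     pubsub.subscribe("transfer:overview")
#     for message in pubsub.listen():
#         if message["type"] == "message":
#             event = json.loads(message["data"])
#             print(event["type"], event.get("data"))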
@app.task(
    base=LongTermArchiveTask,
    name="ccat:data_transfer:long_term_archive",
    bind=True,
)
def send_data_to_long_term_archive(
    self, long_term_archive_transfer_id: int, session: Session = None
) -> None:
    """
    Transfers raw data package to the long term archive using dynamic queue routing.

    Parameters
    ----------
    self : celery.Task
        The Celery task instance.
    long_term_archive_transfer_id : int
        The ID of the LongTermArchiveTransfer object in the database.

    Returns
    -------
    None

    Notes
    -----
    - Fetches the LongTermArchiveTransfer object from the database.
    - Uses dynamic queue routing based on the destination location.
    - Executes the transfer command to move the data.
    - Updates the LongTermArchiveTransfer status and logs in the database.
    """
    logger.info(
        "send_data_to_long_term_archive",
        long_term_archive_transfer_id=long_term_archive_transfer_id,
    )
    if session is None:
        with self.session_scope() as session:
            return _send_data_to_long_term_archive_internal(
                session, long_term_archive_transfer_id
            )
    else:
        return _send_data_to_long_term_archive_internal(
            session, long_term_archive_transfer_id
        )


def _send_data_to_long_term_archive_internal(
    session: Session, long_term_archive_transfer_id: int
) -> None:
    """Send data to long term archive using the new DataLocation system."""
    start_time = datetime.datetime.now()
    long_term_archive_transfer = _get_long_term_archive_transfer(
        session, long_term_archive_transfer_id
    )
    source_url, destination_url, destination_bucket = _construct_transfer_urls(
        long_term_archive_transfer
    )
    _execute_transfer(
        session,
        source_url,
        long_term_archive_transfer.raw_data_package_id,
        destination_url,
        destination_bucket,
        long_term_archive_transfer,
    )
    end_time = datetime.datetime.now()
    duration = end_time - start_time
    _mark_transfer_successful(
        session,
        long_term_archive_transfer,
        duration,
        destination_url,
    )
    logger.info(
        "long_term_archive_transfer_completed",
        long_term_archive_transfer_id=long_term_archive_transfer_id,
        duration=str(duration),
    )
    redis_.publish(
        "transfer:overview",
        json.dumps(
            {
                "type": "long_term_archive_transfer_completed",
                "data": long_term_archive_transfer_id,
            }
        ),
    )


def _get_long_term_archive_transfer(
    session: Session, long_term_archive_transfer_id: int
) -> models.LongTermArchiveTransfer:
    """Retrieve the LongTermArchiveTransfer object from the database."""
    try:
        long_term_archive_transfer = session.get(
            models.LongTermArchiveTransfer, long_term_archive_transfer_id
        )
        if not long_term_archive_transfer:
            raise ValueError(
                f"Long term archive transfer {long_term_archive_transfer_id} not found"
            )
        logger.info(
            "long_term_archive_transfer_found",
            long_term_archive_transfer_id=long_term_archive_transfer_id,
        )
        return long_term_archive_transfer
    except Exception as e:
        logger.error(
            "long_term_archive_transfer_not_found",
            long_term_archive_transfer_id=long_term_archive_transfer_id,
            error=str(e),
        )
        raise


def _validate_long_term_archive_path(
    long_term_archive_transfer: models.LongTermArchiveTransfer,
) -> bool:
    """Validate the long term archive raw data package path."""
    if (
        not long_term_archive_transfer.data_archive.long_term_archive_raw_data_package_path
    ):
        logger.error(
            "missing_long_term_archive_path",
            long_term_archive_transfer_id=long_term_archive_transfer.id,
            archive_name=long_term_archive_transfer.data_archive.name,
        )
        return False
    return True


def _construct_transfer_urls(
    long_term_archive_transfer: models.LongTermArchiveTransfer,
) -> Tuple[str, str, str]:
    """Construct the source and destination URLs using the new DataLocation system."""
    # Get the source and destination locations
    source_location = long_term_archive_transfer.origin_data_location
    destination_location = long_term_archive_transfer.destination_data_location

    if not source_location or not destination_location:
        raise ValueError(
            "Source or destination location not set for long term archive transfer"
        )

    # Get the raw data package
    raw_data_package = long_term_archive_transfer.raw_data_package

    # Construct source path based on source location type
    if isinstance(source_location, models.DiskDataLocation):
        source_url = os.path.join(source_location.path, raw_data_package.relative_path)
    elif isinstance(source_location, models.S3DataLocation):
        # For source S3 locations, use the location's bucket name
        source_url = (
            f"s3://{source_location.bucket_name}/{raw_data_package.relative_path}"
        )
    else:
        raise ValueError(
            f"Unsupported source location type: {source_location.storage_type}"
        )

    # Construct destination path based on destination location type
    if isinstance(destination_location, models.DiskDataLocation):
        destination_url = os.path.join(
            destination_location.path, raw_data_package.relative_path
        )
        destination_bucket = None
    elif isinstance(destination_location, models.S3DataLocation):
        # Use the shared function to construct the S3 key consistently
        destination_url = get_s3_key_for_package(destination_location, raw_data_package)
        destination_bucket = destination_location.bucket_name
    else:
        raise ValueError(
            f"Unsupported destination location type: {destination_location.storage_type}"
        )

    logger.debug(
        "transfer_paths_new",
        source_path=source_url,
        destination_path=destination_url,
        source_location_type=source_location.storage_type.value,
        destination_location_type=destination_location.storage_type.value,
    )
    return source_url, destination_url, destination_bucket


def _get_s3_metadata(
    session: Session,
    raw_data_package: models.RawDataPackage,
    file_size: int,
) -> dict:
    """Generate comprehensive metadata for S3 upload from database models.

    This function extracts metadata from the database models and constructs
    a dictionary suitable for S3 object metadata. It follows a hierarchical
    structure for different types of metadata (obs_, file_, archive_, etc.).

    Database Relationships:
    - RawDataPackage -> ObsUnit -> Source (FixedSource or other)
    - RawDataPackage -> ObsUnit -> ExecutedObsUnit
    - RawDataPackage -> ObsUnit -> PrimaryInstrumentModuleConfiguration
      -> InstrumentModule -> Instrument
    - RawDataPackage -> ObsUnit -> ObservingProgram
    - RawDataPackage -> ObsUnit -> SubObservingProgram

    Parameters
    ----------
    session : Session
        SQLAlchemy database session
    raw_data_package : models.RawDataPackage
        The raw data package being uploaded
    file_size : int
        Size of the file in bytes

    Returns
    -------
    dict
        Dictionary of metadata key-value pairs for S3 upload
    """
    # Get observation information
    obs_unit = raw_data_package.obs_unit
    executed_obs_unit = (
        obs_unit.executed_obs_units[0] if obs_unit.executed_obs_units else None
    )

    # Get source information
    source = obs_unit.source
    if isinstance(source, models.FixedSource):
        ra_deg = source.ra_deg
        dec_deg = source.dec_deg
        target_name = source.name
    else:
        ra_deg = None
        dec_deg = None
        target_name = source.name

    # Get instrument information
    instrument = (
        obs_unit.primary_instrument_module_configuration.instrument_module.instrument
    )

    # Get observing program information
    program = obs_unit.observing_program
    subprogram = obs_unit.sub_observing_program

    # Generate a timestamp for the dataset ID if executed_obs_unit is None
    timestamp = (
        executed_obs_unit.start_time.strftime("%Y%m%d")
        if executed_obs_unit and executed_obs_unit.start_time
        else datetime.datetime.now().strftime("%Y%m%d")
    )

    # Log debug information
    logger.debug(
        "metadata_generation",
        raw_data_package_id=raw_data_package.id,
        obs_unit_id=obs_unit.id,
        executed_obs_unit_id=executed_obs_unit.id if executed_obs_unit else None,
        source_type=source.__class__.__name__,
        instrument_name=instrument.name,
        program_id=program.short_name,
    )

    # Construct metadata
    metadata = {
        "obs_dataset_id": f"{instrument.name}-{timestamp}-{obs_unit.id}",
        "obs_telescope": instrument.telescope.name,
        "obs_instrument": instrument.name,
        "obs_date_obs": (
            executed_obs_unit.start_time.isoformat()
            if executed_obs_unit and executed_obs_unit.start_time
            else None
        ),
        # Target information
        "obs_target_name": target_name,
        "obs_ra_deg": str(ra_deg) if ra_deg is not None else None,
        "obs_dec_deg": str(dec_deg) if dec_deg is not None else None,
        # Program tracking
        "obs_program_id": program.short_name,
        "obs_subprogram_id": subprogram.short_name if subprogram else None,
        # Database linkage
        "db_obs_unit_id": str(obs_unit.id),
        "db_executed_obs_unit_id": (
            str(executed_obs_unit.id) if executed_obs_unit else None
        ),
        # Data quality and status
        "file_xxhash64": raw_data_package.checksum,
    }

    # Remove None values
    metadata = {k: v for k, v in metadata.items() if v is not None}

    # Log final metadata keys
    logger.debug(
        "metadata_generated",
        metadata_keys=list(metadata.keys()),
        metadata_count=len(metadata),
    )
    return metadata


def _upload_extended_metadata(
    s3_client,
    destination_bucket: str,
    destination_url: str,
    metadata: dict,
) -> bool:
    """Upload extended metadata to S3 as a separate JSON file.

    Parameters
    ----------
    s3_client : boto3.client
        S3 client instance
    destination_bucket : str
        S3 bucket name
    destination_url : str
        Base destination path in S3
    metadata : dict
        Extended metadata to upload

    Returns
    -------
    bool
        True if upload was successful, False otherwise
    """
    try:
        # save to tmp file and then read and upload
        with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as tmp_file:
            tmp_file.write(json.dumps(metadata, indent=2).encode("utf-8"))
            tmp_file_path = tmp_file.name

        # Read the file content for upload
        with open(tmp_file_path, "rb") as f:
            file_content = f.read()

        # Calculate checksum from the content
        metadata_checksum = base64.b64encode(
            hashlib.sha256(file_content).digest()
        ).decode("utf-8")

        # Construct metadata file path
        metadata_path = f"{os.path.splitext(destination_url)[0]}_metadata.json"

        # Upload metadata file with content and checksum
        s3_client.put_object(
            Bucket=destination_bucket,
            Key=metadata_path,
            Body=file_content,
            ContentType="application/json",
            ChecksumSHA256=metadata_checksum,
        )

        # Clean up temporary file
        os.unlink(tmp_file_path)

        logger.info(
            "extended_metadata_upload_successful",
            metadata_path=f"s3://{destination_bucket}/{metadata_path}",
        )
        return True
    except Exception as e:
        logger.error(
            "extended_metadata_upload_failed",
            error=str(e),
            metadata_path=f"s3://{destination_bucket}/{metadata_path}",
        )
        # Clean up temporary file on error
        try:
            if "tmp_file_path" in locals():
                os.unlink(tmp_file_path)
        except Exception:
            pass
        return False


def _get_raw_data_package_id_from_db(session: Session, filename: str) -> int:
    """Get raw data package ID from database using filename.

    Parameters
    ----------
    session : Session
        SQLAlchemy database session
    filename : str
        The filename to look up

    Returns
    -------
    int
        The raw data package ID

    Raises
    ------
    ValueError
        If the raw data package cannot be found
    """
    try:
        # Get just the filename without the path
        basename = os.path.basename(filename)

        # Log the lookup attempt
        logger.debug(
            "looking_up_raw_data_package", filename=basename, full_path=filename
        )

        # Try to find the raw data package by filename
        raw_data_package = (
            session.query(models.RawDataPackage)
            .filter(models.RawDataPackage.name == basename)
            .first()
        )
        if not raw_data_package:
            # Try to find by relative path
            raw_data_package = (
                session.query(models.RawDataPackage)
                .filter(models.RawDataPackage.relative_path.like(f"%{basename}"))
                .first()
            )
        if not raw_data_package:
            # Log the failed lookup
            logger.error(
                "raw_data_package_not_found", filename=basename, full_path=filename
            )
            raise ValueError(f"No raw data package found for filename: {basename}")

        # Log successful lookup
        logger.debug(
            "raw_data_package_found", package_id=raw_data_package.id, filename=basename
        )
        return raw_data_package.id
    except Exception as e:
        logger.error(
            "error_looking_up_raw_data_package",
            error=str(e),
            filename=os.path.basename(filename),
            full_path=filename,
        )
        raise ValueError(f"Error looking up raw data package: {str(e)}")


def _execute_transfer(
    session: Session,
    source_url: str,
    raw_data_package_id: int,
    destination_url: str,
    destination_bucket: str,
    long_term_archive_transfer: models.LongTermArchiveTransfer,
) -> None:
    """Execute the transfer between different storage types using the new DataLocation system."""
    try:
        start_time = time.time()

        # Get the source and destination locations
        source_location = long_term_archive_transfer.origin_data_location
        destination_location = long_term_archive_transfer.destination_data_location

        # Determine transfer method based on location types
        if isinstance(source_location, models.DiskDataLocation) and isinstance(
            destination_location, models.DiskDataLocation
        ):
            # Disk to disk transfer
            _execute_disk_to_disk_transfer(source_url, destination_url)
        elif isinstance(source_location, models.DiskDataLocation) and isinstance(
            destination_location, models.S3DataLocation
        ):
            logger.debug(
                " ".join(
                    [
                        source_url,
                        str(raw_data_package_id),
                        destination_url,
                        destination_bucket,
                    ]
                )
            )
            # Disk to S3 transfer
            _execute_disk_to_s3_transfer(
                session,
                source_url,
                raw_data_package_id,
                destination_url,
                destination_bucket,
                destination_location,
                source_location.site.short_name if source_location.site else None,
            )
        elif isinstance(source_location, models.S3DataLocation) and isinstance(
            destination_location, models.DiskDataLocation
        ):
            # S3 to disk transfer
            _execute_s3_to_disk_transfer(source_url, destination_url)
        elif isinstance(source_location, models.S3DataLocation) and isinstance(
            destination_location, models.S3DataLocation
        ):
            # S3 to S3 transfer
            # _execute_s3_to_s3_transfer(source_url, destination_url, destination_bucket)
            raise ValueError("S3 to S3 transfer is not supported yet")
        else:
            raise ValueError(
                f"Unsupported transfer combination: {source_location.storage_type} to {destination_location.storage_type}"
            )

        # Calculate transfer metrics
        end_time = time.time()
        duration = end_time - start_time

        # Get file size for metrics
        file_size = 0
        if isinstance(source_location, models.DiskDataLocation) and os.path.exists(
            source_url
        ):
            file_size = os.path.getsize(source_url)

        # Send metrics to InfluxDB
        if file_size > 0:
            transfer_rate = (file_size / duration) / (1024 * 1024)  # MB/s
            metrics = HousekeepingMetrics()
            try:
                metrics.send_transfer_metrics(
                    operation="long_term_archive_transfer",
                    source_path=source_url,
                    destination_path=destination_url,
                    file_size=file_size,
                    duration=duration,
                    success=True,
                    error_message=None,
                    additional_fields={
                        "transfer_rate_mbps": transfer_rate,
                    },
                    additional_tags={
                        "source_location": source_location.name,
                        "destination_location": destination_location.name,
                        "transfer_id": str(long_term_archive_transfer.id),
                        "transfer_method": f"{source_location.storage_type.value}_to_{destination_location.storage_type.value}",
                    },
                )
            except Exception as e:
                logger.error(
                    "metrics_send_failed",
                    error=e,
                    transfer_id=long_term_archive_transfer.id,
                )
            finally:
                metrics.close()
    except Exception as e:
        logger.error(
            "transfer_execution_failed",
            error=str(e),
            source_url=source_url,
            destination_url=destination_url,
        )
        raise


def _execute_disk_to_disk_transfer(source_url: str, destination_url: str) -> None:
    """Execute disk to disk transfer using cp command."""
    try:
        # Create destination directory if it doesn't exist
        dest_dir = os.path.dirname(destination_url)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir, exist_ok=True)

        # Execute copy command
        cp_command = ["cp", source_url, destination_url]
        logger.info("executing_disk_to_disk_transfer", command=" ".join(cp_command))
        result = subprocess.run(
            cp_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if result.returncode == 0:
            logger.info(
                "disk_to_disk_transfer_successful",
                source=source_url,
                destination=destination_url,
            )
        else:
            logger.error(
                "disk_to_disk_transfer_failed",
                source=source_url,
                destination=destination_url,
                error=result.stderr,
            )
    except Exception as e:
        logger.error(
            "disk_to_disk_transfer_exception",
            error=str(e),
            source=source_url,
            destination=destination_url,
        )
        raise


def _execute_disk_to_s3_transfer(
    session: Session,
    source_url: str,
    raw_data_package_id: int,
    destination_url: str,
    destination_bucket: str,
    destination_location: Optional[models.S3DataLocation] = None,
    site_name: Optional[str] = None,
) -> None:
    """Execute disk to S3 transfer using the existing S3 upload logic."""
    from .config.config import ccat_data_transfer_settings

    if ccat_data_transfer_settings.s3_method == "boto3":
        _execute_s3_upload(
            session,
            source_url,
            raw_data_package_id,
            destination_url,
            destination_bucket,
            None,  # No source_archive in new system
            destination_location,
            site_name,
        )
    elif ccat_data_transfer_settings.s3_method == "coscine":
        _execute_coscine_s3_upload(
            session,
            source_url,
            raw_data_package_id,
            destination_url,
            destination_bucket,
            destination_location,
            site_name,
        )


def _execute_s3_to_disk_transfer(source_url: str, destination_url: str) -> None:
    """Execute S3 to disk transfer using aws s3 cp command."""
    try:
        # Create destination directory if it doesn't exist
        dest_dir = os.path.dirname(destination_url)
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir, exist_ok=True)

        # Execute S3 download command
        s3_command = ["aws", "s3", "cp", source_url, destination_url]
        logger.info("executing_s3_to_disk_transfer", command=" ".join(s3_command))
        result = subprocess.run(
            s3_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if result.returncode == 0:
            logger.info(
                "s3_to_disk_transfer_successful",
                source=source_url,
                destination=destination_url,
            )
        else:
            logger.error(
                "s3_to_disk_transfer_failed",
                source=source_url,
                destination=destination_url,
                error=result.stderr,
            )
    except Exception as e:
        logger.error(
            "s3_to_disk_transfer_exception",
            error=str(e),
            source=source_url,
            destination=destination_url,
        )
        raise


# def _execute_s3_to_s3_transfer(
#     source_url: str, destination_url: str, destination_bucket: str
# ) -> None:
#     """Execute S3 to S3 transfer using aws s3 cp command."""
#     try:
#         # Execute S3 copy command
#         s3_command = ["aws", "s3", "cp", source_url, destination_url]
#         logger.info("executing_s3_to_s3_transfer", command=" ".join(s3_command))
#         result = subprocess.run(
#             s3_command,
#             stdout=subprocess.PIPE,
#             stderr=subprocess.PIPE,
#             text=True,
#         )
#         if result.returncode == 0:
#             logger.info(
#                 "s3_to_s3_transfer_successful",
#                 source=source_url,
#                 destination=destination_url,
#             )
#         else:
#             logger.error(
#                 "s3_to_s3_transfer_failed",
#                 source=source_url,
#                 destination=destination_url,
#                 error=result.stderr,
#             )
#     except Exception as e:
#         logger.error(
#             "s3_to_s3_transfer_exception",
#             error=str(e),
#             source=source_url,
#             destination=destination_url,
#         )
#         raise


def _execute_s3_upload(
    session: Session,
    source_url: str,
    raw_data_package_id: int,
    destination_url: str,
    destination_bucket: str,
    source_archive: models.DataArchive,
    destination_location: Optional[models.S3DataLocation] = None,
    site_name: Optional[str] = None,
) -> None:
    """Execute the command to upload files to S3 with SHA-256 checksum verification and comprehensive metadata.

    This function uploads a file to S3 with SHA-256 checksum verification to ensure
    data integrity during transfer. It also includes comprehensive metadata from the
    database to ensure proper data discovery and provenance tracking. Additionally,
    it generates and uploads an extended metadata file in JSON format.

    Parameters
    ----------
    source_url : str
        Path to the source file to upload
    destination_url : str
        Destination path in S3
    destination_bucket : str
        S3 bucket name
    destination_location : Optional[models.S3DataLocation]
        Destination S3 location for specific configuration
    site_name : Optional[str]
        Name of the site for credential loading

    Raises
    ------
    RuntimeError
        If the S3 upload operation fails
    """
    start_time = time.time()

    # Read file data and calculate the SHA-256 checksum
    with open(source_url, "rb") as f:
        file_data = f.read()
    sha256_hash = hashlib.sha256(file_data).digest()
    sha256_b64 = base64.b64encode(sha256_hash).decode("utf-8")

    # Get file size
    file_size = os.path.getsize(source_url)

    # Get S3 client with location-specific configuration
    s3_client = get_s3_client(destination_location, site_name)

    # Query the database for package information
    raw_data_package = session.query(models.RawDataPackage).get(raw_data_package_id)
    if not raw_data_package:
        raise ValueError(f"Raw data package {raw_data_package_id} not found")

    # Get metadata from database
    metadata = _get_s3_metadata(session, raw_data_package, file_size)

    # Upload with SHA-256 checksum verification and metadata
    s3_client.put_object(
        Bucket=destination_bucket,
        Key=destination_url,
        Body=file_data,
        ChecksumSHA256=sha256_b64,
        Metadata=metadata,
    )

    # Calculate transfer metrics
    end_time = time.time()

    # Generate and upload extended metadata
    extended_metadata = _generate_ivoa_metadata(session, raw_data_package, file_size)
    metadata_upload_success = _upload_extended_metadata(
        s3_client, destination_bucket, destination_url, extended_metadata
    )
    if not metadata_upload_success:
        logger.warning(
            "extended_metadata_upload_failed_but_data_uploaded",
            source=source_url,
            destination=f"s3://{destination_bucket}/{destination_url}",
        )

    logger.info(
        "s3_upload_successful",
        source=source_url,
        destination=f"s3://{destination_bucket}/{destination_url}",
        md5_verified=True,
        metadata_keys=list(metadata.keys()),
        extended_metadata_uploaded=metadata_upload_success,
    )

    duration = end_time - start_time
    transfer_rate = (file_size / duration) / (1024 * 1024)  # MB/s
    transfer_metrics = {
        "bytes_transferred": file_size,
        "duration": duration,
        "transfer_rate_mbps": transfer_rate,
    }

    # Send metrics to InfluxDB
    metrics = HousekeepingMetrics()
    try:
        metrics.send_transfer_metrics(
            operation="s3_upload",
            source_path=source_url,
            destination_path=f"s3://{destination_bucket}/{destination_url}",
            file_size=transfer_metrics["bytes_transferred"],
            duration=transfer_metrics["duration"],
            success=True,
            error_message=None,
            additional_fields={
                "transfer_rate_mbps": transfer_metrics["transfer_rate_mbps"],
            },
            additional_tags={
                "source_archive": (
                    source_archive.short_name if source_archive else "unknown"
                ),
                "destination_archive": "s3",
                "transfer_id": str(raw_data_package_id),
                "transfer_method": "s3",
            },
        )
    except Exception as e:
        logger.error("metrics_send_failed", error=e, transfer_id=raw_data_package_id)
    finally:
        metrics.close()


def _execute_coscine_s3_upload(
    session: Session,
    source_url: str,
    raw_data_package_id: int,
    destination_url: str,
    destination_bucket: str,
    source_archive: models.DataArchive,
    destination_location: Optional[models.S3DataLocation] = None,
    site_name: Optional[str] = None,
) -> None:
    """Execute the command to upload files to S3 using Coscine with comprehensive metadata.

    This function uploads a file to S3 using the Coscine API, which verifies data
    integrity internally during transfer. It also includes comprehensive metadata
    from the database to ensure proper data discovery and provenance tracking.
    Additionally, it generates and uploads an extended metadata file in JSON format.

    Parameters
    ----------
    source_url : str
        Path to the source file to upload
    destination_url : str
        Destination path in S3
    destination_bucket : str
        S3 bucket name
    destination_location : Optional[models.S3DataLocation]
        Destination S3 location for specific configuration
    site_name : Optional[str]
        Name of the site for credential loading

    Raises
    ------
    RuntimeError
        If the S3 upload operation fails
    """
    from .config.config import ccat_data_transfer_settings

    start_time = time.time()

    # Check if COSCINE is configured
    if (
        ccat_data_transfer_settings.coscine_api_token == "none"
        or ccat_data_transfer_settings.coscine_project == "none"
        or ccat_data_transfer_settings.coscine_resource == "none"
    ):
        raise ValueError(
            "COSCINE configuration is not set. Please configure COSCINE_API_TOKEN, COSCINE_PROJECT, and COSCINE_RESOURCE in settings.toml"
        )

    # Read file data
    with open(source_url, "rb") as f:
        file_data = f.read()
    # Note: Coscine handles integrity verification internally

    # Get file size
    file_size = os.path.getsize(source_url)

    # Query the database for package information
    raw_data_package = session.query(models.RawDataPackage).get(raw_data_package_id)
    if not raw_data_package:
        raise ValueError(f"Raw data package {raw_data_package_id} not found")

    # Get metadata from database
    metadata = _get_s3_metadata(session, raw_data_package, file_size)

    # Initialize Coscine client
    client = coscine.ApiClient(ccat_data_transfer_settings.coscine_api_token)
    project = client.project(ccat_data_transfer_settings.coscine_project)
    resource = project.resource(ccat_data_transfer_settings.coscine_resource)

    # Get metadata form and populate it
    coscine_metadata = resource.metadata_form()

    # Map our metadata to Coscine metadata format
    # Basic metadata mapping
    if "obs_target_name" in metadata:
        coscine_metadata["Title"] = f"CCAT Observation: {metadata['obs_target_name']}"
    else:
        coscine_metadata["Title"] = (
            f"CCAT Data Package: {raw_data_package.relative_path}"
        )
    if "obs_creator" in metadata:
        coscine_metadata["Creator"] = metadata["obs_creator"]
    else:
        coscine_metadata["Creator"] = "CCAT Observatory"

    # Add additional metadata fields
    if "obs_start_time" in metadata:
        coscine_metadata["Date"] = metadata["obs_start_time"]
    if "obs_instrument_name" in metadata:
        coscine_metadata["Subject"] = f"Instrument: {metadata['obs_instrument_name']}"

    # Upload file with metadata
    try:
        # Create a file-like object from the file data
        import io

        file_obj = io.BytesIO(file_data)
        destination_url = destination_url.replace(":", "_")

        # Upload the file
        resource.upload(destination_url, file_obj, coscine_metadata)

        # Calculate transfer metrics
        end_time = time.time()
        duration = end_time - start_time
        transfer_rate = (file_size / duration) / (1024 * 1024)  # MB/s

        # Generate and upload extended metadata
        extended_metadata = _generate_ivoa_metadata(
            session, raw_data_package, file_size
        )
        # For Coscine, we'll upload the extended metadata as a separate file
        metadata_upload_success = _upload_coscine_extended_metadata(
            client, project, resource, destination_url, extended_metadata
        )
        if not metadata_upload_success:
            logger.warning(
                "extended_metadata_upload_failed_but_data_uploaded",
                source=source_url,
                destination=f"coscine://{ccat_data_transfer_settings.coscine_project}/{ccat_data_transfer_settings.coscine_resource}/{destination_url}",
            )

        logger.info(
            "coscine_s3_upload_successful",
            source=source_url,
            destination=f"coscine://{ccat_data_transfer_settings.coscine_project}/{ccat_data_transfer_settings.coscine_resource}/{destination_url}",
            md5_verified=True,
            metadata_keys=list(metadata.keys()),
            extended_metadata_uploaded=metadata_upload_success,
        )

        transfer_metrics = {
            "bytes_transferred": file_size,
            "duration": duration,
            "transfer_rate_mbps": transfer_rate,
        }

        # Send metrics to InfluxDB
        metrics = HousekeepingMetrics()
        try:
            metrics.send_transfer_metrics(
                operation="coscine_s3_upload",
                source_path=source_url,
                destination_path=f"coscine://{ccat_data_transfer_settings.coscine_project}/{ccat_data_transfer_settings.coscine_resource}/{destination_url}",
                file_size=transfer_metrics["bytes_transferred"],
                duration=transfer_metrics["duration"],
                success=True,
                error_message=None,
                additional_fields={
                    "transfer_rate_mbps": transfer_metrics["transfer_rate_mbps"],
                },
                additional_tags={
                    "source_archive": (
                        source_archive.short_name if source_archive else "unknown"
                    ),
                    "destination_archive": "coscine",
                    "transfer_id": str(raw_data_package_id),
                    "transfer_method": "coscine",
                },
            )
        except Exception as e:
            logger.error(
                "metrics_send_failed", error=e, transfer_id=raw_data_package_id
            )
        finally:
            metrics.close()
    except Exception as e:
        logger.error(
            "coscine_s3_upload_failed",
            source=source_url,
            destination=f"coscine://{ccat_data_transfer_settings.coscine_project}/{ccat_data_transfer_settings.coscine_resource}/{destination_url}",
            error=str(e),
        )
        raise RuntimeError(f"COSCINE S3 upload failed: {str(e)}")


def _upload_coscine_extended_metadata(
    client,
    project,
    resource,
    destination_url: str,
    metadata: dict,
) -> bool:
    """Upload extended metadata to Coscine as a separate JSON file.

    Parameters
    ----------
    client : coscine.ApiClient
        Coscine API client instance
    project : coscine.Project
        Coscine project instance
    resource : coscine.Resource
        Coscine resource instance
    destination_url : str
        Base destination path
    metadata : dict
        Extended metadata to upload

    Returns
    -------
    bool
        True if upload was successful, False otherwise
    """
    try:
        # Create metadata filename
        metadata_filename = f"{destination_url}_metadata.json"

        # Convert metadata to JSON
        metadata_json = json.dumps(metadata, indent=2)

        # Create a file-like object from the JSON data
        import io

        metadata_file_obj = io.BytesIO(metadata_json.encode("utf-8"))

        metadata_form = resource.metadata_form()
        metadata_form["Title"] = "Extended Metadata"
        metadata_form["Creator"] = "CCAT Observatory"

        # Upload the metadata file
        resource.upload(
            metadata_filename,
            metadata_file_obj,
            metadata_form,
            # {"Title": "Extended Metadata", "Creator": "CCAT Observatory"},
        )
        return True
    except Exception as e:
        logger.error(
            "coscine_metadata_upload_failed",
            error=str(e),
            metadata_filename=metadata_filename,
        )
        return False


def _mark_transfer_successful(
    session: Session,
    long_term_archive_transfer: models.LongTermArchiveTransfer,
    duration: datetime.timedelta,
    destination_url: str,
) -> None:
    """Mark the transfer as successful and log the result using the new DataLocation system."""
    logger.info(
        "long_term_archive_transfer_completed",
        long_term_archive_transfer_id=long_term_archive_transfer.id,
        package_path=long_term_archive_transfer.raw_data_package.relative_path,
        duration=str(duration),
    )

    # Create physical copy record for the destination LTA location
    destination_location = long_term_archive_transfer.destination_data_location
    physical_copy = models.RawDataPackagePhysicalCopy(
        raw_data_package=long_term_archive_transfer.raw_data_package,
        data_location=destination_location,
        checksum=long_term_archive_transfer.raw_data_package.checksum,
        verified_at=datetime.datetime.now(),
    )
    session.add(physical_copy)

    long_term_archive_transfer.status = models.Status.COMPLETED
    long_term_archive_transfer.raw_data_package.state = models.PackageState.ARCHIVED
    long_term_archive_transfer.end_time = datetime.datetime.now()
    session.commit()
    redis_.publish(
        "transfer:overview",
        json.dumps(
            {
                "type": "long_term_archive_transfer_completed",
                "data": long_term_archive_transfer.id,
            }
        ),
    )
    _add_transfer_log(
        session,
        long_term_archive_transfer,
        f"Transfer successful to long term archive in {duration}",
    )


def _add_transfer_log(
    session: Session,
    long_term_archive_transfer: models.LongTermArchiveTransfer,
    log_message: str,
) -> None:
    """Add a log entry for the long term archive transfer."""
    log_entry = models.LongTermArchiveTransferLog(
        long_term_archive_transfer_id=long_term_archive_transfer.id,
        log=f"{datetime.datetime.now()} - {log_message}",
        timestamp=datetime.datetime.now(),
    )
    session.add(log_entry)
    session.commit()
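

# Illustrative sketch only: how the retry hooks on LongTermArchiveTask might be
# exercised directly from a maintenance shell. The transfer id (42) is a made-up
# example, and the three-attempt cutoff mirrors _max_retries_reached() below; whether
# manual triage like this is appropriate depends on how make_celery_task() drives
# these hooks in production.
#
#     task = send_data_to_long_term_archive  # bound instance of LongTermArchiveTask
#     session, _ = DatabaseConnection().get_connection()
#     if task.get_retry_count(session, 42) >= 3:
#         task.mark_permanent_failure(session, 42, RuntimeError("manual triage"))
#     else:
#         task.reset_state_on_failure(session, 42, RuntimeError("manual triage"))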
def transfer_raw_data_packages_to_long_term_archive(
    verbose: bool = False, site_name: Optional[str] = None
) -> None:
    """
    Schedule long term archive transfer tasks for pending raw data packages using the
    new DataLocation system.

    Args:
        verbose (bool): If True, sets logging to DEBUG level. Defaults to False.
        site_name (Optional[str]): If provided, only schedules transfers for the specified site.

    Raises:
        SQLAlchemyError: If there's an issue with database operations.
    """
    db = DatabaseConnection()
    session, _ = db.get_connection()
    _set_logging_level(verbose)

    # Get all sites with LTA locations
    lta_sites = _get_sites_with_lta_locations(session, site_name)
    logger.debug(
        "lta_sites",
        sites=[site.short_name for site in lta_sites],
        count=len(lta_sites),
    )

    for site in lta_sites:
        # Get all LTA locations for this site
        lta_locations = _get_lta_locations_for_site(session, site)
        for lta_location in lta_locations:
            # Find pending existing transfers for this LTA location
            pending_existing_transfers = (
                _get_pending_existing_transfers_to_lta_location(session, lta_location)
            )
            if len(pending_existing_transfers) > 0:
                for transfer in pending_existing_transfers:
                    _schedule_transfer_task(session, transfer)

            # Find new transfers that need to be created for this LTA location
            pending_new_transfers = _get_pending_new_transfers_to_lta_location(
                session, lta_location
            )
            if len(pending_new_transfers) > 0:
                logger.info(
                    "pending_transfers",
                    site_name=site.short_name,
                    lta_location_name=lta_location.name,
                    pending_transfers=len(pending_new_transfers),
                )
                for package in pending_new_transfers:
                    long_term_archive_transfer = _create_long_term_archive_transfer(
                        session, package, lta_location
                    )
                    _schedule_transfer_task(session, long_term_archive_transfer)

    session.commit()
    redis_.publish(
        "transfer:overview",
        json.dumps(
            {
                "type": "long_term_archive_transfer_scheduled",
            }
        ),
    )
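

# A possible way to drive the scheduler above from a cron-style periodic job; the
# "EU-ARCHIVE" site name is an assumption for illustration, not part of this module:
#
#     from ccat_data_transfer.archive_manager import (
#         transfer_raw_data_packages_to_long_term_archive,
#     )
#
#     transfer_raw_data_packages_to_long_term_archive(
#         verbose=True, site_name="EU-ARCHIVE"
#     )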
def _get_pending_existing_transfers_to_lta_location(
    session: Session, lta_location: models.DataLocation
) -> list:
    return (
        session.query(models.LongTermArchiveTransfer)
        .filter(
            models.LongTermArchiveTransfer.destination_data_location_id
            == lta_location.id,
            models.LongTermArchiveTransfer.status == models.Status.PENDING,
        )
        .all()
    )


def _get_failed_transfers(session: Session, lta_location: models.DataLocation) -> list:
    """
    Retrieve failed long term archive transfers for a specific LTA location.

    Args:
        session (Session): The database session.
        lta_location (models.DataLocation): The LTA location to check for failed transfers.

    Returns:
        list: A list of LongTermArchiveTransfer objects with failed status.
    """
    return (
        session.query(models.LongTermArchiveTransfer)
        .filter(
            models.LongTermArchiveTransfer.destination_data_location_id
            == lta_location.id,
            models.LongTermArchiveTransfer.status == models.Status.FAILED,
            models.LongTermArchiveTransfer.attempt_count < 3,
        )
        .all()
    )


def _set_logging_level(verbose: bool) -> None:
    if verbose:
        logger.setLevel("DEBUG")


def _get_sites_with_lta_locations(
    session: Session, site_name: Optional[str] = None
) -> list:
    """Get all sites that have LTA locations."""
    query = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
    )
    if site_name:
        query = query.filter(models.Site.short_name == site_name)
    return query.all()


def _get_lta_locations_for_site(session: Session, site: models.Site) -> list:
    """Get all LTA locations for a specific site."""
    return (
        session.query(models.DataLocation)
        .filter(
            models.DataLocation.site_id == site.id,
            models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .all()
    )


def _get_pending_new_transfers_to_lta_location(
    session: Session, lta_location: models.DataLocation
) -> list:
    """Get raw data packages that need to be transferred to a specific LTA location."""
    # Find packages that are in buffer locations and haven't been transferred to this
    # LTA location yet

    # Debug: Log the LTA location details
    logger.debug(
        "checking_lta_location",
        lta_location_id=lta_location.id,
        lta_location_name=lta_location.name,
        lta_location_site_id=lta_location.site_id,
        lta_location_site_name=lta_location.site.short_name,
    )

    # Debug: Check what buffer locations exist for this site
    buffer_locations = (
        session.query(models.DataLocation.id, models.DataLocation.name)
        .filter(
            models.DataLocation.location_type == models.LocationType.BUFFER,
            models.DataLocation.active == True,  # noqa: E712
            models.DataLocation.site_id == lta_location.site_id,
        )
        .all()
    )
    logger.debug(
        "buffer_locations_for_site",
        site_id=lta_location.site_id,
        site_name=lta_location.site.short_name,
        buffer_locations=[{"id": loc.id, "name": loc.name} for loc in buffer_locations],
    )

    # Debug: Check existing LTA transfers for this destination
    existing_transfers = (
        session.query(models.LongTermArchiveTransfer.raw_data_package_id)
        .filter(
            models.LongTermArchiveTransfer.destination_data_location_id
            == lta_location.id,
        )
        .all()
    )
    existing_package_ids = [t.raw_data_package_id for t in existing_transfers]
    logger.debug(
        "existing_lta_transfers",
        destination_location_id=lta_location.id,
        destination_location_name=lta_location.name,
        existing_package_count=len(existing_package_ids),
        existing_package_ids=existing_package_ids,
    )

    pending_packages = (
        session.query(models.RawDataPackage)
        .join(models.RawDataPackagePhysicalCopy)
        .filter(
            models.RawDataPackagePhysicalCopy.data_location_id.in_(
                [loc.id for loc in buffer_locations]
            ),
            models.RawDataPackagePhysicalCopy.status
            == models.PhysicalCopyStatus.PRESENT,
            ~models.RawDataPackage.id.in_(existing_package_ids),
        )
        .distinct()
        .all()
    )
    logger.info(
        "pending_packages",
        lta_location_name=lta_location.name,
        site_name=lta_location.site.short_name,
        pending_packages=len(pending_packages),
    )

    # Debug: Log details of found packages
    if pending_packages:
        package_details = []
        for package in pending_packages:
            physical_copies = [
                {
                    "location_id": pc.data_location_id,
                    "location_name": pc.data_location.name,
                    "status": pc.status.value,
                }
                for pc in package.physical_copies
            ]
            package_details.append(
                {
                    "id": package.id,
                    "name": package.name,
                    "physical_copies": physical_copies,
                }
            )
        logger.debug(
            "pending_package_details",
            packages=package_details,
        )
    return pending_packages


def _create_long_term_archive_transfer(
    session: Session, package: models.RawDataPackage, lta_location: models.DataLocation
) -> models.LongTermArchiveTransfer:
    """Create a new long term archive transfer for a package to a specific LTA location."""
    # Find the source buffer location for this package
    source_physical_copy = (
        session.query(models.RawDataPackagePhysicalCopy)
        .join(models.DataLocation)
        .filter(
            models.RawDataPackagePhysicalCopy.raw_data_package_id == package.id,
            models.DataLocation.location_type == models.LocationType.BUFFER,
            models.DataLocation.site_id == lta_location.site_id,
            models.DataLocation.active == True,  # noqa: E712
            models.RawDataPackagePhysicalCopy.status
            == models.PhysicalCopyStatus.PRESENT,
        )
        .first()
    )
    if not source_physical_copy:
        raise ValueError(f"No source buffer found for package {package.id}")

    long_term_archive_transfer = models.LongTermArchiveTransfer(
        raw_data_package_id=package.id,
        site_id=lta_location.site_id,
        origin_data_location_id=source_physical_copy.data_location_id,
        destination_data_location_id=lta_location.id,
        status=models.Status.PENDING,
    )
    session.add(long_term_archive_transfer)
    session.flush()
    session.commit()
    redis_.publish(
        "transfer:overview",
        json.dumps(
            {
                "type": "long_term_archive_transfer_created",
                "data": long_term_archive_transfer.id,
            }
        ),
    )
    return long_term_archive_transfer


def _max_retries_reached(
    long_term_archive_transfer: models.LongTermArchiveTransfer,
) -> bool:
    return long_term_archive_transfer.attempt_count >= 3


def _schedule_transfer_task(
    session: Session,
    long_term_archive_transfer: models.LongTermArchiveTransfer,
) -> None:
    """Schedule a long term archive transfer task using dynamic queue routing."""
    try:
        # Use dynamic queue routing based on the destination location
        queue_name = route_task_by_location(
            OperationType.LONG_TERM_ARCHIVE_TRANSFER,
            long_term_archive_transfer.destination_data_location,
        )
        logger.info(
            "scheduling_long_term_archive_transfer",
            long_term_archive_transfer_id=long_term_archive_transfer.id,
            destination_location=long_term_archive_transfer.destination_data_location.name,
            queue=queue_name,
        )

        # Schedule the task with the appropriate queue
        send_data_to_long_term_archive.apply_async(
            args=[long_term_archive_transfer.id],
            queue=queue_name,
        )
        long_term_archive_transfer.status = models.Status.SCHEDULED
        long_term_archive_transfer.start_time = datetime.datetime.now()
    except Exception as e:
        logger.error(
            "scheduling_failed",
            long_term_archive_transfer_id=long_term_archive_transfer.id,
            error=str(e),
        )
        long_term_archive_transfer_log = models.LongTermArchiveTransferLog(
            long_term_archive_transfer_id=long_term_archive_transfer.id,
            log=f"Error scheduling long term archive transfer task: {str(e)}",
            long_term_archive_transfer=long_term_archive_transfer,
            timestamp=datetime.datetime.now(),
        )
        session.add(long_term_archive_transfer_log)
        raise ScheduleError(
            f"Error scheduling long term archive transfer task: {str(e)}"
        )
    finally:
        session.commit()

    logger.info(
        "long_term_archive_transfer_scheduled",
        long_term_archive_transfer_id=long_term_archive_transfer.id,
        queue=queue_name,
    )
    redis_.publish(
        "transfer:overview",
        json.dumps(
            {
                "type": "long_term_archive_transfer_scheduled",
                "data": long_term_archive_transfer.id,
                "queue": queue_name,
            }
        ),
    )


def _generate_ivoa_metadata(
    session: Session,
    raw_data_package: models.RawDataPackage,
    file_size: int,
) -> dict:
    """Generate IVOA-compatible extended metadata for a raw data package.

    This function creates a comprehensive metadata document that follows IVOA
    standards while maintaining flexibility for different instruments and
    observatories. It combines data from the core database models with additional
    metadata stored in the RawDataPackageMetadata table.

    Parameters
    ----------
    session : Session
        SQLAlchemy database session
    raw_data_package : models.RawDataPackage
        The raw data package being uploaded
    file_size : int
        Size of the file in bytes

    Returns
    -------
    dict
        IVOA-compatible metadata document

    Notes
    -----
    The metadata structure follows IVOA standards while maintaining flexibility:

    1. Core identifiers follow IVOA URI conventions
    2. Facility information is standardized but extensible
    3. Instrument-specific metadata is stored in RawDataPackageMetadata
    4. All numeric values are stored with units
    5. Timestamps are in ISO 8601 format with timezone
    """
    # Get observation information
    obs_unit = raw_data_package.obs_unit
    executed_obs_unit = (
        obs_unit.executed_obs_units[0] if obs_unit.executed_obs_units else None
    )

    # Get source information
    source = obs_unit.source
    if isinstance(source, models.FixedSource):
        ra_deg = source.ra_deg
        dec_deg = source.dec_deg
        target_name = source.name
    else:
        ra_deg = None
        dec_deg = None
        target_name = source.name

    # Get instrument information
    instrument = (
        obs_unit.primary_instrument_module_configuration.instrument_module.instrument
    )

    # Get observing program information
    program = obs_unit.observing_program
    subprogram = obs_unit.sub_observing_program

    # Get additional metadata
    additional_metadata = raw_data_package.package_metadata

    # Fall back to the current date when no executed obs unit (or start time) exists,
    # mirroring the timestamp handling in _get_s3_metadata above
    timestamp = (
        executed_obs_unit.start_time.strftime("%Y%m%d")
        if executed_obs_unit and executed_obs_unit.start_time
        else datetime.datetime.now().strftime("%Y%m%d")
    )

    # Construct base metadata
    metadata = {
        # Core identifiers (IVOA compatible)
        "id": {
            "dataset_id": f"{instrument.name}-{timestamp}-{obs_unit.id}",
            "obs_unit_id": str(obs_unit.id),
            "executed_obs_unit_id": (
                str(executed_obs_unit.id) if executed_obs_unit else None
            ),
            "publisher_did": f"ivo://org.ccat-p/raw/{instrument.name}-{timestamp}-{obs_unit.id}",
            "ivoa_collection": "ivo://org.ccat-p/collection/raw-observations",
        },
        # Facility information (standardized)
        "facility": {
            "observatory": {
                "name": instrument.telescope.observatory.name,
                "altitude_m": instrument.telescope.alt_m,
                "longitude_deg": instrument.telescope.lon_deg,
                "latitude_deg": instrument.telescope.lat_deg,
            },
            "telescope": {
                "name": instrument.telescope.name,
                "longitude_deg": instrument.telescope.lon_deg,
                "latitude_deg": instrument.telescope.lat_deg,
                "altitude_m": instrument.telescope.alt_m,
            },
            "instrument": {
                "name": instrument.name,
                "type": instrument.instrument_type,
                "description": instrument.description,
                "modules": [
                    {
                        "name": module.instrument_module.name,
                        "pixels": module.instrument_module.pixels,
                    }
                    for module in obs_unit.instrument_module_configurations
                ],
            },
        },
        # Target information
        "target": {
            "name": target_name,
            "type": source.__class__.__name__.lower(),
            "coordinates": {
                "ra": {
                    "degrees": ra_deg,
                    "sexagesimal": (
                        _degrees_to_sexagesimal(ra_deg) if ra_deg is not None else None
                    ),
                },
                "dec": {
                    "degrees": dec_deg,
                    "sexagesimal": (
                        _degrees_to_sexagesimal(dec_deg)
                        if dec_deg is not None
                        else None
                    ),
                },
                "frame": "ICRS",
                "epoch": "J2000",
            },
        },
        # Observation configuration (IVOA compatible)
        "observation": {
            "program": {
                "id": program.short_name,
                "name": program.name,
                # "pi": program.pi_name,
                # "pi_affiliation": program.pi_affiliation,
                # "proposal_id": program.proposal_id,
            },
            "subprogram": {
                "id": subprogram.short_name if subprogram else None,
                "name": subprogram.name if subprogram else None,
            },
            "timing": {
                "start_time": (
                    executed_obs_unit.start_time.isoformat()
                    if executed_obs_unit
                    else None
                ),
                "end_time": (
                    executed_obs_unit.end_time.isoformat()
                    if executed_obs_unit
                    else None
                ),
                "duration_s": (
                    (
                        executed_obs_unit.end_time - executed_obs_unit.start_time
                    ).total_seconds()
                    if executed_obs_unit
                    else None
                ),
            },
        },
        # Contents information
        "contents": {
            "file_count": len(raw_data_package.raw_data_files),
            "size_bytes": file_size,
            "files": [
                {
                    "filename": file.name,
                    "type": file.file_type,
                    "size_bytes": file.size,
                    "checksum": {"algorithm": "xxh64", "value": file.checksum},
                }
                for file in raw_data_package.raw_data_files
            ],
        },
        # Data quality assessment
        "quality": {
            "status": executed_obs_unit.status if executed_obs_unit else None,
        },
        # Processing information
        "processing": {
            "level": "raw",
            "pipeline": {
                "name": instrument.name,
                "version": "1.0",  # Default version since it's not in the model
            },
        },
        # Data rights and access information
        # "access": {
        #     "policy": program.data_policy,
        # },
        # Full provenance chain
        # "provenance": {
        #     "creator": instrument.name,  # Use instrument name since data_acquisition_system doesn't exist
        #     "created": raw_data_package.created_at.isoformat(),
        #     "contributors": (
        #         [
        #             {
        #                 "name": executed_obs_unit.observer_name,
        #                 "role": "Observer",
        #                 "affiliation": executed_obs_unit.observer_affiliation,
        #             }
        #         ]
        #         if executed_obs_unit
        #         else []
        #     ),
        # },
    }

    # Add additional metadata if available
    if additional_metadata:
        # Add instrument-specific metadata
        if additional_metadata.instrument_specific:
            metadata["instrument_specific"] = additional_metadata.instrument_specific

        # Add quality metrics
        if additional_metadata.quality_metrics:
            metadata["quality"].update(additional_metadata.quality_metrics)

        # Add extended provenance (the base "provenance" block above is commented out,
        # so create the key on demand rather than assuming it exists)
        if additional_metadata.provenance:
            metadata.setdefault("provenance", {}).update(additional_metadata.provenance)

        # Add custom metadata
        if additional_metadata.custom_metadata:
            metadata["custom"] = additional_metadata.custom_metadata

    # Remove None values
    metadata = _remove_none_values(metadata)
    return metadata


def _degrees_to_sexagesimal(degrees: float) -> str:
    """Convert decimal degrees to sexagesimal format."""
    if degrees is None:
        return None

    # Handle negative degrees for declination
    sign = "-" if degrees < 0 else "+"
    degrees = abs(degrees)

    # Convert to hours/degrees, minutes, seconds
    hours = int(degrees)
    minutes = int((degrees - hours) * 60)
    seconds = (degrees - hours - minutes / 60) * 3600
    return f"{sign}{hours:02d}h{minutes:02d}m{seconds:06.3f}s"


def _remove_none_values(d: dict) -> dict:
    """Recursively remove None values from a dictionary."""
    if not isinstance(d, dict):
        return d
    return {
        k: _remove_none_values(v)
        for k, v in d.items()
        if v is not None and (not isinstance(v, dict) or _remove_none_values(v))
    }
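

# Worked examples for the two helpers above (illustrative traces, not executed doctests):
#
#     _degrees_to_sexagesimal(10.5)    -> "+10h30m00.000s"
#     _degrees_to_sexagesimal(-23.25)  -> "-23h15m00.000s"
#     _remove_none_values({"a": 1, "b": None, "c": {"d": None}}) -> {"a": 1}
#
# The "c" branch is dropped because it contains only None values, which is what the
# nested-dict check in _remove_none_values() tests for.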