import datetime
import os
import subprocess
import hashlib
import base64
import json
from typing import Tuple, Optional
import time
import tempfile
import coscine
from sqlalchemy.orm import Session
from ccat_ops_db import models
from .database import DatabaseConnection
from .setup_celery_app import app, make_celery_task
from .utils import get_s3_client, get_redis_connection, get_s3_key_for_package
from .logging_utils import get_structured_logger
from .exceptions import ScheduleError
from .metrics import HousekeepingMetrics
from .queue_discovery import route_task_by_location
from .operation_types import OperationType
# Use only task loggers
logger = get_structured_logger(__name__)
redis_ = get_redis_connection()
# s3_client = boto3.client(
# "s3",
# endpoint_url=ccat_data_transfer_settings.s3_endpoint_url,
# aws_access_key_id=ccat_data_transfer_settings.s3_access_key_id,
# aws_secret_access_key=ccat_data_transfer_settings.s3_secret_access_key,
# region_name=ccat_data_transfer_settings.s3_region_name,
# config=boto3.session.Config(signature_version="s3v4"),
# )
class LongTermArchiveTask(make_celery_task()):
"""Base class for long term archive tasks."""
def __init__(self):
super().__init__()
self.operation_type = "long_term_archive"
def get_retry_count(self, session, operation_id):
"""Get current retry count for this operation."""
long_term_archive_transfer = session.query(models.LongTermArchiveTransfer).get(
operation_id
)
if long_term_archive_transfer:
return long_term_archive_transfer.attempt_count
return 0
def reset_state_on_failure(self, session, long_term_archive_transfer_id, exc):
"""Reset long term archive transfer state for retry."""
long_term_archive_transfer = session.query(models.LongTermArchiveTransfer).get(
long_term_archive_transfer_id
)
if long_term_archive_transfer:
long_term_archive_transfer.status = models.Status.PENDING
long_term_archive_transfer.raw_data_package.state = (
models.PackageState.TRANSFERRING
)
long_term_archive_transfer.failure_error_message = None
long_term_archive_transfer.attempt_count += 1
logger.info(
"Reset long term archive transfer for retry",
long_term_archive_transfer_id=long_term_archive_transfer_id,
attempt_count=long_term_archive_transfer.attempt_count,
)
_add_transfer_log(
session,
long_term_archive_transfer,
f"Transfer failed, scheduling retry: {str(exc)}",
)
session.commit()
redis_.publish(
"transfer:overview",
json.dumps(
{
"type": "long_term_archive_transfer_reset",
"data": long_term_archive_transfer_id,
}
),
)
def mark_permanent_failure(self, session, long_term_archive_transfer_id, exc):
"""Mark long term archive transfer as permanently failed."""
long_term_archive_transfer = session.query(models.LongTermArchiveTransfer).get(
long_term_archive_transfer_id
)
if long_term_archive_transfer:
long_term_archive_transfer.status = models.Status.FAILED
long_term_archive_transfer.raw_data_package.state = (
models.PackageState.FAILED
)
long_term_archive_transfer.failure_error_message = str(exc)
logger.info(
"Marked long term archive transfer as permanently failed",
long_term_archive_transfer_id=long_term_archive_transfer_id,
)
_add_transfer_log(
session,
long_term_archive_transfer,
f"Transfer permanently failed after {long_term_archive_transfer.attempt_count} attempts: {str(exc)}",
)
session.commit()
redis_.publish(
"transfer:overview",
json.dumps(
{
"type": "long_term_archive_transfer_failed",
"data": long_term_archive_transfer_id,
}
),
)
def get_operation_info(self, args, kwargs):
"""Get additional context for long term archive tasks."""
if not args:
return {}
with self.session_scope() as session:
try:
long_term_archive_transfer = session.query(
models.LongTermArchiveTransfer
).get(args[0])
if long_term_archive_transfer:
return {
"transfer_id": str(long_term_archive_transfer.id),
"site_name": (
long_term_archive_transfer.site.short_name
if long_term_archive_transfer.site
else None
),
"package_id": str(
long_term_archive_transfer.raw_data_package_id
),
"attempt_count": long_term_archive_transfer.attempt_count,
"status": (
long_term_archive_transfer.status.value
if long_term_archive_transfer.status
else None
),
}
except Exception as e:
logger.error(f"Error getting long term archive transfer info: {e}")
return {}
@app.task(
base=LongTermArchiveTask,
name="ccat:data_transfer:long_term_archive",
bind=True,
)
def send_data_to_long_term_archive(
self, long_term_archive_transfer_id: int, session: Optional[Session] = None
) -> None:
"""
Transfers raw data package to the long term archive using dynamic queue routing.
Parameters
----------
self : celery.Task
The Celery task instance.
long_term_archive_transfer_id : int
The ID of the LongTermArchiveTransfer object in the database.
session : Session, optional
Existing SQLAlchemy session to use; if omitted, a task-scoped session is opened.
Returns
-------
None
Notes
-----
- Fetches the LongTermArchiveTransfer object from the database.
- Uses dynamic queue routing based on the destination location.
- Executes the transfer command to move the data.
- Updates the LongTermArchiveTransfer status and logs in the database.
"""
logger.info(
"send_data_to_long_term_archive",
long_term_archive_transfer_id=long_term_archive_transfer_id,
)
if session is None:
with self.session_scope() as session:
return _send_data_to_long_term_archive_internal(
session, long_term_archive_transfer_id
)
else:
return _send_data_to_long_term_archive_internal(
session, long_term_archive_transfer_id
)
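# Illustrative dispatch sketch (not executed here): this task is normally queued by
# _schedule_transfer_task below, which picks a queue via route_task_by_location and
# then calls apply_async, roughly:
#
#     queue_name = route_task_by_location(
#         OperationType.LONG_TERM_ARCHIVE_TRANSFER, transfer.destination_data_location
#     )
#     send_data_to_long_term_archive.apply_async(args=[transfer.id], queue=queue_name)
#
# where "transfer" stands in for a LongTermArchiveTransfer row.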
def _send_data_to_long_term_archive_internal(
session: Session, long_term_archive_transfer_id: int
) -> None:
"""Send data to long term archive using the new DataLocation system."""
start_time = datetime.datetime.now()
long_term_archive_transfer = _get_long_term_archive_transfer(
session, long_term_archive_transfer_id
)
source_url, destination_url, destination_bucket = _construct_transfer_urls(
long_term_archive_transfer
)
_execute_transfer(
session,
source_url,
long_term_archive_transfer.raw_data_package_id,
destination_url,
destination_bucket,
long_term_archive_transfer,
)
end_time = datetime.datetime.now()
duration = end_time - start_time
_mark_transfer_successful(
session,
long_term_archive_transfer,
duration,
destination_url,
)
logger.info(
"long_term_archive_transfer_completed",
long_term_archive_transfer_id=long_term_archive_transfer_id,
duration=str(duration),
)
redis_.publish(
"transfer:overview",
json.dumps(
{
"type": "long_term_archive_transfer_completed",
"data": long_term_archive_transfer_id,
}
),
)
def _get_long_term_archive_transfer(
session: Session, long_term_archive_transfer_id: int
) -> models.LongTermArchiveTransfer:
"""Retrieve the LongTermArchiveTransfer object from the database."""
try:
long_term_archive_transfer = session.get(
models.LongTermArchiveTransfer, long_term_archive_transfer_id
)
if not long_term_archive_transfer:
raise ValueError(
f"Long term archive transfer {long_term_archive_transfer_id} not found"
)
logger.info(
"long_term_archive_transfer_found",
long_term_archive_transfer_id=long_term_archive_transfer_id,
)
return long_term_archive_transfer
except Exception as e:
logger.error(
"long_term_archive_transfer_not_found",
long_term_archive_transfer_id=long_term_archive_transfer_id,
error=str(e),
)
raise
def _validate_long_term_archive_path(
long_term_archive_transfer: models.LongTermArchiveTransfer,
) -> bool:
"""Validate the long term archive raw data package path."""
if (
not long_term_archive_transfer.data_archive.long_term_archive_raw_data_package_path
):
logger.error(
"missing_long_term_archive_path",
long_term_archive_transfer_id=long_term_archive_transfer.id,
archive_name=long_term_archive_transfer.data_archive.name,
)
return False
return True
def _construct_transfer_urls(
long_term_archive_transfer: models.LongTermArchiveTransfer,
) -> Tuple[str, str, str]:
"""Construct the source and destination URLs using the new DataLocation system."""
# Get the source and destination locations
source_location = long_term_archive_transfer.origin_data_location
destination_location = long_term_archive_transfer.destination_data_location
if not source_location or not destination_location:
raise ValueError(
"Source or destination location not set for long term archive transfer"
)
# Get the raw data package
raw_data_package = long_term_archive_transfer.raw_data_package
# Construct source path based on source location type
if isinstance(source_location, models.DiskDataLocation):
source_url = os.path.join(source_location.path, raw_data_package.relative_path)
elif isinstance(source_location, models.S3DataLocation):
# For source S3 locations, use the location's bucket name
source_url = (
f"s3://{source_location.bucket_name}/{raw_data_package.relative_path}"
)
else:
raise ValueError(
f"Unsupported source location type: {source_location.storage_type}"
)
# Construct destination path based on destination location type
if isinstance(destination_location, models.DiskDataLocation):
destination_url = os.path.join(
destination_location.path, raw_data_package.relative_path
)
destination_bucket = None
elif isinstance(destination_location, models.S3DataLocation):
# Use the shared function to construct the S3 key consistently
destination_url = get_s3_key_for_package(destination_location, raw_data_package)
destination_bucket = destination_location.bucket_name
else:
raise ValueError(
f"Unsupported destination location type: {destination_location.storage_type}"
)
logger.debug(
"transfer_paths_new",
source_path=source_url,
destination_path=destination_url,
source_location_type=source_location.storage_type.value,
destination_location_type=destination_location.storage_type.value,
)
return source_url, destination_url, destination_bucket
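# Example of the returned triple (hypothetical values): for a DiskDataLocation source
# with path "/buffer" and an S3DataLocation destination, the result is roughly
# ("/buffer/<relative_path>", <key from get_s3_key_for_package>, <bucket_name>);
# for a disk destination the bucket element is None.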
def _get_s3_metadata(
session: Session,
raw_data_package: models.RawDataPackage,
file_size: int,
) -> dict:
"""Generate comprehensive metadata for S3 upload from database models.
This function extracts metadata from the database models and constructs a dictionary
suitable for S3 object metadata. It follows a hierarchical structure for different
types of metadata (obs_, file_, archive_, etc.).
Database Relationships:
- RawDataPackage -> ObsUnit -> Source (FixedSource or other)
- RawDataPackage -> ObsUnit -> ExecutedObsUnit
- RawDataPackage -> ObsUnit -> PrimaryInstrumentModuleConfiguration -> InstrumentModule -> Instrument
- RawDataPackage -> ObsUnit -> ObservingProgram
- RawDataPackage -> ObsUnit -> SubObservingProgram
Parameters
----------
session : Session
SQLAlchemy database session
raw_data_package : models.RawDataPackage
The raw data package being uploaded
file_size : int
Size of the file in bytes
Returns
-------
dict
Dictionary of metadata key-value pairs for S3 upload
"""
# Get observation information
obs_unit = raw_data_package.obs_unit
executed_obs_unit = (
obs_unit.executed_obs_units[0] if obs_unit.executed_obs_units else None
)
# Get source information
source = obs_unit.source
if isinstance(source, models.FixedSource):
ra_deg = source.ra_deg
dec_deg = source.dec_deg
target_name = source.name
else:
ra_deg = None
dec_deg = None
target_name = source.name
# Get instrument information
instrument = (
obs_unit.primary_instrument_module_configuration.instrument_module.instrument
)
# Get observing program information
program = obs_unit.observing_program
subprogram = obs_unit.sub_observing_program
# Generate a timestamp for the dataset ID if executed_obs_unit is None
timestamp = (
executed_obs_unit.start_time.strftime("%Y%m%d")
if executed_obs_unit and executed_obs_unit.start_time
else datetime.datetime.now().strftime("%Y%m%d")
)
# Log debug information
logger.debug(
"metadata_generation",
raw_data_package_id=raw_data_package.id,
obs_unit_id=obs_unit.id,
executed_obs_unit_id=executed_obs_unit.id if executed_obs_unit else None,
source_type=source.__class__.__name__,
instrument_name=instrument.name,
program_id=program.short_name,
)
# Construct metadata
metadata = {
"obs_dataset_id": f"{instrument.name}-{timestamp}-{obs_unit.id}",
"obs_telescope": instrument.telescope.name,
"obs_instrument": instrument.name,
"obs_date_obs": (
executed_obs_unit.start_time.isoformat()
if executed_obs_unit and executed_obs_unit.start_time
else None
),
# Target information
"obs_target_name": target_name,
"obs_ra_deg": str(ra_deg) if ra_deg is not None else None,
"obs_dec_deg": str(dec_deg) if dec_deg is not None else None,
# Program tracking
"obs_program_id": program.short_name,
"obs_subprogram_id": subprogram.short_name if subprogram else None,
# Database linkage
"db_obs_unit_id": str(obs_unit.id),
"db_executed_obs_unit_id": (
str(executed_obs_unit.id) if executed_obs_unit else None
),
# Data quality and status
"file_xxhash64": raw_data_package.checksum,
}
# Remove None values
metadata = {k: v for k, v in metadata.items() if v is not None}
# Log final metadata keys
logger.debug(
"metadata_generated",
metadata_keys=list(metadata.keys()),
metadata_count=len(metadata),
)
return metadata
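# Shape of the flat S3 metadata produced above (illustrative placeholders only; real
# values come from the database):
#
#     {
#         "obs_dataset_id": "<instrument>-<YYYYMMDD>-<obs_unit_id>",
#         "obs_telescope": "<telescope name>",
#         "obs_instrument": "<instrument name>",
#         "obs_date_obs": "<ISO 8601 start time>",
#         "obs_target_name": "<source name>",
#         "obs_program_id": "<program short name>",
#         "db_obs_unit_id": "<obs unit id>",
#         "file_xxhash64": "<package checksum>",
#     }
#
# Keys whose value would be None are dropped before returning.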
def _upload_extended_metadata(
s3_client,
destination_bucket: str,
destination_url: str,
metadata: dict,
) -> bool:
"""Upload extended metadata to S3 as a separate JSON file.
Parameters
----------
s3_client : boto3.client
S3 client instance
destination_bucket : str
S3 bucket name
destination_url : str
Base destination path in S3
metadata : dict
Extended metadata to upload
Returns
-------
bool
True if upload was successful, False otherwise
"""
# Construct the metadata file path up front so it is available for error logging
metadata_path = f"{os.path.splitext(destination_url)[0]}_metadata.json"
try:
# Save to a temporary file, then read it back and upload
with tempfile.NamedTemporaryFile(delete=False, suffix=".json") as tmp_file:
tmp_file.write(json.dumps(metadata, indent=2).encode("utf-8"))
tmp_file_path = tmp_file.name
# Read the file content for upload
with open(tmp_file_path, "rb") as f:
file_content = f.read()
# Calculate checksum from the content
metadata_checksum = base64.b64encode(
hashlib.sha256(file_content).digest()
).decode("utf-8")
# Upload metadata file with content and checksum
s3_client.put_object(
Bucket=destination_bucket,
Key=metadata_path,
Body=file_content,
ContentType="application/json",
ChecksumSHA256=metadata_checksum,
)
# Clean up temporary file
os.unlink(tmp_file_path)
logger.info(
"extended_metadata_upload_successful",
metadata_path=f"s3://{destination_bucket}/{metadata_path}",
)
return True
except Exception as e:
logger.error(
"extended_metadata_upload_failed",
error=str(e),
metadata_path=f"s3://{destination_bucket}/{metadata_path}",
)
# Clean up temporary file on error
try:
if "tmp_file_path" in locals():
os.unlink(tmp_file_path)
except Exception:
pass
return False
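# Worked example of the sidecar key construction used above: for a hypothetical object
# at "raw/2025/package_0001.tar", os.path.splitext(...)[0] yields "raw/2025/package_0001",
# so the extended metadata is uploaded as "raw/2025/package_0001_metadata.json".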
def _get_raw_data_package_id_from_db(session: Session, filename: str) -> int:
"""Get raw data package ID from database using filename.
Parameters
----------
session : Session
SQLAlchemy database session
filename : str
The filename to look up
Returns
-------
int
The raw data package ID
Raises
------
ValueError
If the raw data package cannot be found
"""
try:
# Get just the filename without the path
basename = os.path.basename(filename)
# Log the lookup attempt
logger.debug(
"looking_up_raw_data_package", filename=basename, full_path=filename
)
# Try to find the raw data package by filename
raw_data_package = (
session.query(models.RawDataPackage)
.filter(models.RawDataPackage.name == basename)
.first()
)
if not raw_data_package:
# Try to find by relative path
raw_data_package = (
session.query(models.RawDataPackage)
.filter(models.RawDataPackage.relative_path.like(f"%{basename}"))
.first()
)
if not raw_data_package:
# Log the failed lookup
logger.error(
"raw_data_package_not_found", filename=basename, full_path=filename
)
raise ValueError(f"No raw data package found for filename: {basename}")
# Log successful lookup
logger.debug(
"raw_data_package_found", package_id=raw_data_package.id, filename=basename
)
return raw_data_package.id
except Exception as e:
logger.error(
"error_looking_up_raw_data_package",
error=str(e),
filename=os.path.basename(filename),
full_path=filename,
)
raise ValueError(f"Error looking up raw data package: {str(e)}")
def _execute_transfer(
session: Session,
source_url: str,
raw_data_package_id: int,
destination_url: str,
destination_bucket: str,
long_term_archive_transfer: models.LongTermArchiveTransfer,
) -> None:
"""Execute the transfer between different storage types using the new DataLocation system."""
try:
start_time = time.time()
# Get the source and destination locations
source_location = long_term_archive_transfer.origin_data_location
destination_location = long_term_archive_transfer.destination_data_location
# Determine transfer method based on location types
if isinstance(source_location, models.DiskDataLocation) and isinstance(
destination_location, models.DiskDataLocation
):
# Disk to disk transfer
_execute_disk_to_disk_transfer(source_url, destination_url)
elif isinstance(source_location, models.DiskDataLocation) and isinstance(
destination_location, models.S3DataLocation
):
logger.debug(
" ".join(
[
source_url,
str(raw_data_package_id),
destination_url,
destination_bucket,
]
)
)
# Disk to S3 transfer
_execute_disk_to_s3_transfer(
session,
source_url,
raw_data_package_id,
destination_url,
destination_bucket,
destination_location,
source_location.site.short_name if source_location.site else None,
)
elif isinstance(source_location, models.S3DataLocation) and isinstance(
destination_location, models.DiskDataLocation
):
# S3 to disk transfer
_execute_s3_to_disk_transfer(source_url, destination_url)
elif isinstance(source_location, models.S3DataLocation) and isinstance(
destination_location, models.S3DataLocation
):
# S3 to S3 transfer
# _execute_s3_to_s3_transfer(source_url, destination_url, destination_bucket)
raise ValueError("S3 to S3 transfer is not supported yet")
else:
raise ValueError(
f"Unsupported transfer combination: {source_location.storage_type} to {destination_location.storage_type}"
)
# Calculate transfer metrics
end_time = time.time()
duration = end_time - start_time
# Get file size for metrics
file_size = 0
if isinstance(source_location, models.DiskDataLocation) and os.path.exists(
source_url
):
file_size = os.path.getsize(source_url)
# Send metrics to InfluxDB
if file_size > 0:
transfer_rate = (file_size / duration) / (1024 * 1024) # MB/s
metrics = HousekeepingMetrics()
try:
metrics.send_transfer_metrics(
operation="long_term_archive_transfer",
source_path=source_url,
destination_path=destination_url,
file_size=file_size,
duration=duration,
success=True,
error_message=None,
additional_fields={
"transfer_rate_mbps": transfer_rate,
},
additional_tags={
"source_location": source_location.name,
"destination_location": destination_location.name,
"transfer_id": str(long_term_archive_transfer.id),
"transfer_method": f"{source_location.storage_type.value}_to_{destination_location.storage_type.value}",
},
)
except Exception as e:
logger.error(
"metrics_send_failed",
error=e,
transfer_id=long_term_archive_transfer.id,
)
finally:
metrics.close()
except Exception as e:
logger.error(
"transfer_execution_failed",
error=str(e),
source_url=source_url,
destination_url=destination_url,
)
raise
def _execute_disk_to_disk_transfer(source_url: str, destination_url: str) -> None:
"""Execute disk to disk transfer using cp command."""
try:
# Create destination directory if it doesn't exist
dest_dir = os.path.dirname(destination_url)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir, exist_ok=True)
# Execute copy command
cp_command = ["cp", source_url, destination_url]
logger.info("executing_disk_to_disk_transfer", command=" ".join(cp_command))
result = subprocess.run(
cp_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result.returncode == 0:
logger.info(
"disk_to_disk_transfer_successful",
source=source_url,
destination=destination_url,
)
else:
logger.error(
"disk_to_disk_transfer_failed",
source=source_url,
destination=destination_url,
error=result.stderr,
)
raise RuntimeError(f"Disk to disk transfer failed: {result.stderr}")
except Exception as e:
logger.error(
"disk_to_disk_transfer_exception",
error=str(e),
source=source_url,
destination=destination_url,
)
raise
def _execute_disk_to_s3_transfer(
session: Session,
source_url: str,
raw_data_package_id: int,
destination_url: str,
destination_bucket: str,
destination_location: Optional[models.S3DataLocation] = None,
site_name: Optional[str] = None,
) -> None:
"""Execute disk to S3 transfer using the existing S3 upload logic."""
from .config.config import ccat_data_transfer_settings
if ccat_data_transfer_settings.s3_method == "boto3":
_execute_s3_upload(
session,
source_url,
raw_data_package_id,
destination_url,
destination_bucket,
None, # No source_archive in new system
destination_location,
site_name,
)
elif ccat_data_transfer_settings.s3_method == "coscine":
_execute_coscine_s3_upload(
session,
source_url,
raw_data_package_id,
destination_url,
destination_bucket,
None,  # No source_archive in new system
destination_location,
site_name,
)
def _execute_s3_to_disk_transfer(source_url: str, destination_url: str) -> None:
"""Execute S3 to disk transfer using aws s3 cp command."""
try:
# Create destination directory if it doesn't exist
dest_dir = os.path.dirname(destination_url)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir, exist_ok=True)
# Execute S3 download command
s3_command = ["aws", "s3", "cp", source_url, destination_url]
logger.info("executing_s3_to_disk_transfer", command=" ".join(s3_command))
result = subprocess.run(
s3_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
if result.returncode == 0:
logger.info(
"s3_to_disk_transfer_successful",
source=source_url,
destination=destination_url,
)
else:
logger.error(
"s3_to_disk_transfer_failed",
source=source_url,
destination=destination_url,
error=result.stderr,
)
raise RuntimeError(f"S3 to disk transfer failed: {result.stderr}")
except Exception as e:
logger.error(
"s3_to_disk_transfer_exception",
error=str(e),
source=source_url,
destination=destination_url,
)
raise
# def _execute_s3_to_s3_transfer(
# source_url: str, destination_url: str, destination_bucket: str
# ) -> None:
# """Execute S3 to S3 transfer using aws s3 cp command."""
# try:
# # Execute S3 copy command
# s3_command = ["aws", "s3", "cp", source_url, destination_url]
# logger.info("executing_s3_to_s3_transfer", command=" ".join(s3_command))
# result = subprocess.run(
# s3_command,
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE,
# text=True,
# )
# if result.returncode == 0:
# logger.info(
# "s3_to_s3_transfer_successful",
# source=source_url,
# destination=destination_url,
# )
# else:
# logger.error(
# "s3_to_s3_transfer_failed",
# source=source_url,
# destination=destination_url,
# error=result.stderr,
# )
# except Exception as e:
# logger.error(
# "s3_to_s3_transfer_exception",
# error=str(e),
# source=source_url,
# destination=destination_url,
# )
# raise
def _execute_s3_upload(
session: Session,
source_url: str,
raw_data_package_id: int,
destination_url: str,
destination_bucket: str,
source_archive: models.DataArchive,
destination_location: Optional[models.S3DataLocation] = None,
site_name: Optional[str] = None,
) -> None:
"""Execute the command to upload files to S3 with MD5 verification and comprehensive metadata.
This function uploads a file to S3 with MD5 verification to ensure data integrity during transfer.
It also includes comprehensive metadata from the database to ensure proper data discovery and
provenance tracking. Additionally, it generates and uploads an extended metadata file in JSON format.
Parameters
----------
source_url : str
Path to the source file to upload
destination_url : str
Destination path in S3
destination_bucket : str
S3 bucket name
destination_location : Optional[models.S3DataLocation]
Destination S3 location for specific configuration
site_name : Optional[str]
Name of the site for credential loading
Raises
------
RuntimeError
If the S3 upload operation fails
"""
start_time = time.time()
# Read file data and calculate its SHA-256 checksum
with open(source_url, "rb") as f:
file_data = f.read()
sha256_hash = hashlib.sha256(file_data).digest()
sha256_b64 = base64.b64encode(sha256_hash).decode("utf-8")
# Get file size
file_size = os.path.getsize(source_url)
# Get S3 client with location-specific configuration
s3_client = get_s3_client(destination_location, site_name)
# Query the database for package information
raw_data_package = session.query(models.RawDataPackage).get(raw_data_package_id)
if not raw_data_package:
raise ValueError(f"Raw data package {raw_data_package_id} not found")
# Get metadata from database
metadata = _get_s3_metadata(session, raw_data_package, file_size)
# Upload with SHA-256 checksum verification and metadata
s3_client.put_object(
Bucket=destination_bucket,
Key=destination_url,
Body=file_data,
ChecksumSHA256=sha256_b64,
Metadata=metadata,
)
# Calculate transfer metrics
end_time = time.time()
# Generate and upload extended metadata
extended_metadata = _generate_ivoa_metadata(session, raw_data_package, file_size)
metadata_upload_success = _upload_extended_metadata(
s3_client, destination_bucket, destination_url, extended_metadata
)
if not metadata_upload_success:
logger.warning(
"extended_metadata_upload_failed_but_data_uploaded",
source=source_url,
destination=f"s3://{destination_bucket}/{destination_url}",
)
logger.info(
"s3_upload_successful",
source=source_url,
destination=f"s3://{destination_bucket}/{destination_url}",
checksum_verified=True,
metadata_keys=list(metadata.keys()),
extended_metadata_uploaded=metadata_upload_success,
)
duration = end_time - start_time
transfer_rate = (file_size / duration) / (1024 * 1024) # MB/s
transfer_metrics = {
"bytes_transferred": file_size,
"duration": duration,
"transfer_rate_mbps": transfer_rate,
}
# Send metrics to InfluxDB
metrics = HousekeepingMetrics()
try:
metrics.send_transfer_metrics(
operation="s3_upload",
source_path=source_url,
destination_path=f"s3://{destination_bucket}/{destination_url}",
file_size=transfer_metrics["bytes_transferred"],
duration=transfer_metrics["duration"],
success=True,
error_message=None,
additional_fields={
"transfer_rate_mbps": transfer_metrics["transfer_rate_mbps"],
},
additional_tags={
"source_archive": (
source_archive.short_name if source_archive else "unknown"
),
"destination_archive": "s3",
"transfer_id": str(raw_data_package_id),
"transfer_method": "s3",
},
)
except Exception as e:
logger.error("metrics_send_failed", error=e, transfer_id=raw_data_package_id)
finally:
metrics.close()
def _execute_coscine_s3_upload(
session: Session,
source_url: str,
raw_data_package_id: int,
destination_url: str,
destination_bucket: str,
source_archive: models.DataArchive,
destination_location: Optional[models.S3DataLocation] = None,
site_name: Optional[str] = None,
) -> None:
"""Execute the command to upload files to S3 using Coscine with MD5 verification and comprehensive metadata.
This function uploads a file to S3 using the Coscine API with MD5 verification to ensure data integrity during transfer.
It also includes comprehensive metadata from the database to ensure proper data discovery and
provenance tracking. Additionally, it generates and uploads an extended metadata file in JSON format.
Parameters
----------
source_url : str
Path to the source file to upload
destination_url : str
Destination path in S3
destination_bucket : str
S3 bucket name
destination_location : Optional[models.S3DataLocation]
Destination S3 location for specific configuration
site_name : Optional[str]
Name of the site for credential loading
Raises
------
RuntimeError
If the S3 upload operation fails
"""
from .config.config import ccat_data_transfer_settings
start_time = time.time()
# Check if COSCINE is configured
if (
ccat_data_transfer_settings.coscine_api_token == "none"
or ccat_data_transfer_settings.coscine_project == "none"
or ccat_data_transfer_settings.coscine_resource == "none"
):
raise ValueError(
"COSCINE configuration is not set. Please configure COSCINE_API_TOKEN, COSCINE_PROJECT, and COSCINE_RESOURCE in settings.toml"
)
# Read file data
with open(source_url, "rb") as f:
file_data = f.read()
# Note: Coscine handles integrity verification internally
# Get file size
file_size = os.path.getsize(source_url)
# Query the database for package information
raw_data_package = session.query(models.RawDataPackage).get(raw_data_package_id)
if not raw_data_package:
raise ValueError(f"Raw data package {raw_data_package_id} not found")
# Get metadata from database
metadata = _get_s3_metadata(session, raw_data_package, file_size)
# Initialize Coscine client
client = coscine.ApiClient(ccat_data_transfer_settings.coscine_api_token)
project = client.project(ccat_data_transfer_settings.coscine_project)
resource = project.resource(ccat_data_transfer_settings.coscine_resource)
# Get metadata form and populate it
coscine_metadata = resource.metadata_form()
# Map our metadata to Coscine metadata format
# Basic metadata mapping
if "obs_target_name" in metadata:
coscine_metadata["Title"] = f"CCAT Observation: {metadata['obs_target_name']}"
else:
coscine_metadata["Title"] = (
f"CCAT Data Package: {raw_data_package.relative_path}"
)
if "obs_creator" in metadata:
coscine_metadata["Creator"] = metadata["obs_creator"]
else:
coscine_metadata["Creator"] = "CCAT Observatory"
# Add additional metadata fields
if "obs_start_time" in metadata:
coscine_metadata["Date"] = metadata["obs_start_time"]
if "obs_instrument_name" in metadata:
coscine_metadata["Subject"] = f"Instrument: {metadata['obs_instrument_name']}"
# Upload file with metadata
try:
# Create a file-like object from the file data
import io
file_obj = io.BytesIO(file_data)
destination_url = destination_url.replace(":", "_")
# Upload the file
resource.upload(destination_url, file_obj, coscine_metadata)
# Calculate transfer metrics
end_time = time.time()
duration = end_time - start_time
transfer_rate = (file_size / duration) / (1024 * 1024) # MB/s
# Generate and upload extended metadata
extended_metadata = _generate_ivoa_metadata(
session, raw_data_package, file_size
)
# For Coscine, we'll upload the extended metadata as a separate file
metadata_upload_success = _upload_coscine_extended_metadata(
client, project, resource, destination_url, extended_metadata
)
if not metadata_upload_success:
logger.warning(
"extended_metadata_upload_failed_but_data_uploaded",
source=source_url,
destination=f"coscine://{ccat_data_transfer_settings.coscine_project}/{ccat_data_transfer_settings.coscine_resource}/{destination_url}",
)
logger.info(
"coscine_s3_upload_successful",
source=source_url,
destination=f"coscine://{ccat_data_transfer_settings.coscine_project}/{ccat_data_transfer_settings.coscine_resource}/{destination_url}",
checksum_verified=False,  # integrity verification is handled internally by Coscine
metadata_keys=list(metadata.keys()),
extended_metadata_uploaded=metadata_upload_success,
)
transfer_metrics = {
"bytes_transferred": file_size,
"duration": duration,
"transfer_rate_mbps": transfer_rate,
}
# Send metrics to InfluxDB
metrics = HousekeepingMetrics()
try:
metrics.send_transfer_metrics(
operation="coscine_s3_upload",
source_path=source_url,
destination_path=f"coscine://{ccat_data_transfer_settings.coscine_project}/{ccat_data_transfer_settings.coscine_resource}/{destination_url}",
file_size=transfer_metrics["bytes_transferred"],
duration=transfer_metrics["duration"],
success=True,
error_message=None,
additional_fields={
"transfer_rate_mbps": transfer_metrics["transfer_rate_mbps"],
},
additional_tags={
"source_archive": (
source_archive.short_name if source_archive else "unknown"
),
"destination_archive": "coscine",
"transfer_id": str(raw_data_package_id),
"transfer_method": "coscine",
},
)
except Exception as e:
logger.error(
"metrics_send_failed", error=e, transfer_id=raw_data_package_id
)
finally:
metrics.close()
except Exception as e:
logger.error(
"coscine_s3_upload_failed",
source=source_url,
destination=f"coscine://{ccat_data_transfer_settings.coscine_project}/{ccat_data_transfer_settings.coscine_resource}/{destination_url}",
error=str(e),
)
raise RuntimeError(f"COSCINE S3 upload failed: {str(e)}")
def _upload_coscine_extended_metadata(
client,
project,
resource,
destination_url: str,
metadata: dict,
) -> bool:
"""Upload extended metadata to Coscine as a separate JSON file.
Parameters
----------
client : coscine.ApiClient
Coscine API client instance
project : coscine.Project
Coscine project instance
resource : coscine.Resource
Coscine resource instance
destination_url : str
Base destination path
metadata : dict
Extended metadata to upload
Returns
-------
bool
True if upload was successful, False otherwise
"""
try:
# Create metadata filename
metadata_filename = f"{destination_url}_metadata.json"
# Convert metadata to JSON
metadata_json = json.dumps(metadata, indent=2)
# Create a file-like object from the JSON data
import io
metadata_file_obj = io.BytesIO(metadata_json.encode("utf-8"))
metadata_form = resource.metadata_form()
metadata_form["Title"] = "Extended Metadata"
metadata_form["Creator"] = "CCAT Observatory"
# Upload the metadata file
resource.upload(
metadata_filename,
metadata_file_obj,
metadata_form,
# {"Title": "Extended Metadata", "Creator": "CCAT Observatory"},
)
return True
except Exception as e:
logger.error(
"coscine_metadata_upload_failed",
error=str(e),
metadata_filename=metadata_filename,
)
return False
def _mark_transfer_successful(
session: Session,
long_term_archive_transfer: models.LongTermArchiveTransfer,
duration: datetime.timedelta,
destination_url: str,
) -> None:
"""Mark the transfer as successful and log the result using the new DataLocation system."""
logger.info(
"long_term_archive_transfer_completed",
long_term_archive_transfer_id=long_term_archive_transfer.id,
package_path=long_term_archive_transfer.raw_data_package.relative_path,
duration=str(duration),
)
# Create physical copy record for the destination LTA location
destination_location = long_term_archive_transfer.destination_data_location
physical_copy = models.RawDataPackagePhysicalCopy(
raw_data_package=long_term_archive_transfer.raw_data_package,
data_location=destination_location,
checksum=long_term_archive_transfer.raw_data_package.checksum,
verified_at=datetime.datetime.now(),
)
session.add(physical_copy)
long_term_archive_transfer.status = models.Status.COMPLETED
long_term_archive_transfer.raw_data_package.state = models.PackageState.ARCHIVED
long_term_archive_transfer.end_time = datetime.datetime.now()
session.commit()
redis_.publish(
"transfer:overview",
json.dumps(
{
"type": "long_term_archive_transfer_completed",
"data": long_term_archive_transfer.id,
}
),
)
_add_transfer_log(
session,
long_term_archive_transfer,
f"Transfer successful to long term archive in {duration}",
)
def _add_transfer_log(
session: Session,
long_term_archive_transfer: models.LongTermArchiveTransfer,
log_message: str,
) -> None:
"""Add a log entry for the long term archive transfer."""
log_entry = models.LongTermArchiveTransferLog(
long_term_archive_transfer_id=long_term_archive_transfer.id,
log=f"{datetime.datetime.now()} - {log_message}",
timestamp=datetime.datetime.now(),
)
session.add(log_entry)
session.commit()
def transfer_raw_data_packages_to_long_term_archive(
verbose: bool = False, site_name: Optional[str] = None
) -> None:
"""
Schedule long term archive transfer tasks for pending raw data packages using the new DataLocation system.
Args:
verbose (bool): If True, sets logging to DEBUG level. Defaults to False.
site_name (Optional[str]): If provided, only schedules transfers for the specified site.
Raises:
SQLAlchemyError: If there's an issue with database operations.
"""
db = DatabaseConnection()
session, _ = db.get_connection()
_set_logging_level(verbose)
# Get all sites with LTA locations
lta_sites = _get_sites_with_lta_locations(session, site_name)
logger.debug(
"lta_sites",
sites=[site.short_name for site in lta_sites],
count=len(lta_sites),
)
for site in lta_sites:
# Get all LTA locations for this site
lta_locations = _get_lta_locations_for_site(session, site)
for lta_location in lta_locations:
# Find pending existing transfers for this LTA location
pending_existing_transfers = (
_get_pending_existing_transfers_to_lta_location(session, lta_location)
)
if len(pending_existing_transfers) > 0:
for transfer in pending_existing_transfers:
_schedule_transfer_task(session, transfer)
# Find new transfers that need to be created for this LTA location
pending_new_transfers = _get_pending_new_transfers_to_lta_location(
session, lta_location
)
if len(pending_new_transfers) > 0:
logger.info(
"pending_transfers",
site_name=site.short_name,
lta_location_name=lta_location.name,
pending_transfers=len(pending_new_transfers),
)
for package in pending_new_transfers:
long_term_archive_transfer = _create_long_term_archive_transfer(
session, package, lta_location
)
_schedule_transfer_task(session, long_term_archive_transfer)
session.commit()
redis_.publish(
"transfer:overview",
json.dumps(
{
"type": "long_term_archive_transfer_scheduled",
}
),
)
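# Minimal usage sketch (assuming this module is importable; the real entry point,
# e.g. a Celery beat schedule or CLI wrapper, is defined elsewhere):
#
#     transfer_raw_data_packages_to_long_term_archive(verbose=True, site_name="OBS")
#
# "OBS" is a placeholder for a Site.short_name; omit site_name to schedule transfers
# for every site that has an active LONG_TERM_ARCHIVE location.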
def _get_pending_existing_transfers_to_lta_location(
session: Session, lta_location: models.DataLocation
) -> list:
return (
session.query(models.LongTermArchiveTransfer)
.filter(
models.LongTermArchiveTransfer.destination_data_location_id
== lta_location.id,
models.LongTermArchiveTransfer.status == models.Status.PENDING,
)
.all()
)
def _get_failed_transfers(session: Session, lta_location: models.DataLocation) -> list:
"""
Retrieve failed long term archive transfers for a specific LTA location.
Args:
session (Session): The database session.
lta_location (models.DataLocation): The LTA location to check for failed transfers.
Returns:
list: A list of LongTermArchiveTransfer objects with failed status.
"""
return (
session.query(models.LongTermArchiveTransfer)
.filter(
models.LongTermArchiveTransfer.destination_data_location_id
== lta_location.id,
models.LongTermArchiveTransfer.status == models.Status.FAILED,
models.LongTermArchiveTransfer.attempt_count < 3,
)
.all()
)
def _set_logging_level(verbose: bool) -> None:
if verbose:
logger.setLevel("DEBUG")
def _get_sites_with_lta_locations(
session: Session, site_name: Optional[str] = None
) -> list:
"""Get all sites that have LTA locations."""
query = (
session.query(models.Site)
.join(models.DataLocation)
.filter(
models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
models.DataLocation.active == True, # noqa: E712
)
.distinct()
)
if site_name:
query = query.filter(models.Site.short_name == site_name)
return query.all()
def _get_lta_locations_for_site(session: Session, site: models.Site) -> list:
"""Get all LTA locations for a specific site."""
return (
session.query(models.DataLocation)
.filter(
models.DataLocation.site_id == site.id,
models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
models.DataLocation.active == True, # noqa: E712
)
.all()
)
def _get_pending_new_transfers_to_lta_location(
session: Session, lta_location: models.DataLocation
) -> list:
"""Get raw data packages that need to be transferred to a specific LTA location."""
# Find packages that are in buffer locations and haven't been transferred to this LTA location yet
# Debug: Log the LTA location details
logger.debug(
"checking_lta_location",
lta_location_id=lta_location.id,
lta_location_name=lta_location.name,
lta_location_site_id=lta_location.site_id,
lta_location_site_name=lta_location.site.short_name,
)
# Debug: Check what buffer locations exist for this site
buffer_locations = (
session.query(models.DataLocation.id, models.DataLocation.name)
.filter(
models.DataLocation.location_type == models.LocationType.BUFFER,
models.DataLocation.active == True, # noqa: E712
models.DataLocation.site_id == lta_location.site_id,
)
.all()
)
logger.debug(
"buffer_locations_for_site",
site_id=lta_location.site_id,
site_name=lta_location.site.short_name,
buffer_locations=[{"id": loc.id, "name": loc.name} for loc in buffer_locations],
)
# Debug: Check existing LTA transfers for this destination
existing_transfers = (
session.query(models.LongTermArchiveTransfer.raw_data_package_id)
.filter(
models.LongTermArchiveTransfer.destination_data_location_id
== lta_location.id,
)
.all()
)
existing_package_ids = [t.raw_data_package_id for t in existing_transfers]
logger.debug(
"existing_lta_transfers",
destination_location_id=lta_location.id,
destination_location_name=lta_location.name,
existing_package_count=len(existing_package_ids),
existing_package_ids=existing_package_ids,
)
pending_packages = (
session.query(models.RawDataPackage)
.join(models.RawDataPackagePhysicalCopy)
.filter(
models.RawDataPackagePhysicalCopy.data_location_id.in_(
[loc.id for loc in buffer_locations]
),
models.RawDataPackagePhysicalCopy.status
== models.PhysicalCopyStatus.PRESENT,
~models.RawDataPackage.id.in_(existing_package_ids),
)
.distinct()
.all()
)
logger.info(
"pending_packages",
lta_location_name=lta_location.name,
site_name=lta_location.site.short_name,
pending_packages=len(pending_packages),
)
# Debug: Log details of found packages
if pending_packages:
package_details = []
for package in pending_packages:
physical_copies = [
{
"location_id": pc.data_location_id,
"location_name": pc.data_location.name,
"status": pc.status.value,
}
for pc in package.physical_copies
]
package_details.append(
{
"id": package.id,
"name": package.name,
"physical_copies": physical_copies,
}
)
logger.debug(
"pending_package_details",
packages=package_details,
)
return pending_packages
def _create_long_term_archive_transfer(
session: Session, package: models.RawDataPackage, lta_location: models.DataLocation
) -> models.LongTermArchiveTransfer:
"""Create a new long term archive transfer for a package to a specific LTA location."""
# Find the source buffer location for this package
source_physical_copy = (
session.query(models.RawDataPackagePhysicalCopy)
.join(models.DataLocation)
.filter(
models.RawDataPackagePhysicalCopy.raw_data_package_id == package.id,
models.DataLocation.location_type == models.LocationType.BUFFER,
models.DataLocation.site_id == lta_location.site_id,
models.DataLocation.active == True, # noqa: E712
models.RawDataPackagePhysicalCopy.status
== models.PhysicalCopyStatus.PRESENT,
)
.first()
)
if not source_physical_copy:
raise ValueError(f"No source buffer found for package {package.id}")
long_term_archive_transfer = models.LongTermArchiveTransfer(
raw_data_package_id=package.id,
site_id=lta_location.site_id,
origin_data_location_id=source_physical_copy.data_location_id,
destination_data_location_id=lta_location.id,
status=models.Status.PENDING,
)
session.add(long_term_archive_transfer)
session.flush()
session.commit()
redis_.publish(
"transfer:overview",
json.dumps(
{
"type": "long_term_archive_transfer_created",
"data": long_term_archive_transfer.id,
}
),
)
return long_term_archive_transfer
def _max_retries_reached(
long_term_archive_transfer: models.LongTermArchiveTransfer,
) -> bool:
return long_term_archive_transfer.attempt_count >= 3
def _schedule_transfer_task(
session: Session,
long_term_archive_transfer: models.LongTermArchiveTransfer,
) -> None:
"""Schedule a long term archive transfer task using dynamic queue routing."""
try:
# Use dynamic queue routing based on the destination location
queue_name = route_task_by_location(
OperationType.LONG_TERM_ARCHIVE_TRANSFER,
long_term_archive_transfer.destination_data_location,
)
logger.info(
"scheduling_long_term_archive_transfer",
long_term_archive_transfer_id=long_term_archive_transfer.id,
destination_location=long_term_archive_transfer.destination_data_location.name,
queue=queue_name,
)
# Schedule the task with the appropriate queue
send_data_to_long_term_archive.apply_async(
args=[long_term_archive_transfer.id],
queue=queue_name,
)
long_term_archive_transfer.status = models.Status.SCHEDULED
long_term_archive_transfer.start_time = datetime.datetime.now()
except Exception as e:
logger.error(
"scheduling_failed",
long_term_archive_transfer_id=long_term_archive_transfer.id,
error=str(e),
)
long_term_archive_transfer_log = models.LongTermArchiveTransferLog(
long_term_archive_transfer_id=long_term_archive_transfer.id,
log=f"Error scheduling long term archive transfer task: {str(e)}",
long_term_archive_transfer=long_term_archive_transfer,
timestamp=datetime.datetime.now(),
)
session.add(long_term_archive_transfer_log)
raise ScheduleError(
f"Error scheduling long term archive transfer task: {str(e)}"
)
finally:
session.commit()
logger.info(
"long_term_archive_transfer_scheduled",
long_term_archive_transfer_id=long_term_archive_transfer.id,
queue=queue_name,
)
redis_.publish(
"transfer:overview",
json.dumps(
{
"type": "long_term_archive_transfer_scheduled",
"data": long_term_archive_transfer.id,
"queue": queue_name,
}
),
)
def _generate_ivoa_metadata(
session: Session,
raw_data_package: models.RawDataPackage,
file_size: int,
) -> dict:
"""Generate IVOA-compatible extended metadata for a raw data package.
This function creates a comprehensive metadata document that follows IVOA standards
while maintaining flexibility for different instruments and observatories. It combines
data from the core database models with additional metadata stored in the
RawDataPackageMetadata table.
Parameters
----------
session : Session
SQLAlchemy database session
raw_data_package : models.RawDataPackage
The raw data package being uploaded
file_size : int
Size of the file in bytes
Returns
-------
dict
IVOA-compatible metadata document
Notes
-----
The metadata structure follows IVOA standards while maintaining flexibility:
1. Core identifiers follow IVOA URI conventions
2. Facility information is standardized but extensible
3. Instrument-specific metadata is stored in RawDataPackageMetadata
4. All numeric values are stored with units
5. Timestamps are in ISO 8601 format with timezone
"""
# Get observation information
obs_unit = raw_data_package.obs_unit
executed_obs_unit = (
obs_unit.executed_obs_units[0] if obs_unit.executed_obs_units else None
)
# Get source information
source = obs_unit.source
if isinstance(source, models.FixedSource):
ra_deg = source.ra_deg
dec_deg = source.dec_deg
target_name = source.name
else:
ra_deg = None
dec_deg = None
target_name = source.name
# Get instrument information
instrument = (
obs_unit.primary_instrument_module_configuration.instrument_module.instrument
)
# Get observing program information
program = obs_unit.observing_program
subprogram = obs_unit.sub_observing_program
# Get additional metadata
additional_metadata = raw_data_package.package_metadata
# Fall back to the current date when no executed obs unit is available,
# mirroring the behaviour of _get_s3_metadata
timestamp = (
executed_obs_unit.start_time.strftime("%Y%m%d")
if executed_obs_unit and executed_obs_unit.start_time
else datetime.datetime.now().strftime("%Y%m%d")
)
# Construct base metadata
metadata = {
# Core identifiers (IVOA compatible)
"id": {
"dataset_id": f"{instrument.name}-{timestamp}-{obs_unit.id}",
"obs_unit_id": str(obs_unit.id),
"executed_obs_unit_id": (
str(executed_obs_unit.id) if executed_obs_unit else None
),
"publisher_did": f"ivo://org.ccat-p/raw/{instrument.name}-{timestamp}-{obs_unit.id}",
"ivoa_collection": "ivo://org.ccat-p/collection/raw-observations",
},
# Facility information (standardized)
"facility": {
"observatory": {
"name": instrument.telescope.observatory.name,
"altitude_m": instrument.telescope.alt_m,
"longitude_deg": instrument.telescope.lon_deg,
"latitude_deg": instrument.telescope.lat_deg,
},
"telescope": {
"name": instrument.telescope.name,
"longitude_deg": instrument.telescope.lon_deg,
"latitude_deg": instrument.telescope.lat_deg,
"altitude_m": instrument.telescope.alt_m,
},
"instrument": {
"name": instrument.name,
"type": instrument.instrument_type,
"description": instrument.description,
"modules": [
{
"name": module.instrument_module.name,
"pixels": module.instrument_module.pixels,
}
for module in obs_unit.instrument_module_configurations
],
},
},
# Target information
"target": {
"name": target_name,
"type": source.__class__.__name__.lower(),
"coordinates": {
"ra": {
"degrees": ra_deg,
"sexagesimal": (
_degrees_to_sexagesimal(ra_deg) if ra_deg is not None else None
),
},
"dec": {
"degrees": dec_deg,
"sexagesimal": (
_degrees_to_sexagesimal(dec_deg)
if dec_deg is not None
else None
),
},
"frame": "ICRS",
"epoch": "J2000",
},
},
# Observation configuration (IVOA compatible)
"observation": {
"program": {
"id": program.short_name,
"name": program.name,
# "pi": program.pi_name,
# "pi_affiliation": program.pi_affiliation,
# "proposal_id": program.proposal_id,
},
"subprogram": {
"id": subprogram.short_name if subprogram else None,
"name": subprogram.name if subprogram else None,
},
"timing": {
"start_time": (
executed_obs_unit.start_time.isoformat()
if executed_obs_unit
else None
),
"end_time": (
executed_obs_unit.end_time.isoformat()
if executed_obs_unit
else None
),
"duration_s": (
(
executed_obs_unit.end_time - executed_obs_unit.start_time
).total_seconds()
if executed_obs_unit
else None
),
},
},
# Contents information
"contents": {
"file_count": len(raw_data_package.raw_data_files),
"size_bytes": file_size,
"files": [
{
"filename": file.name,
"type": file.file_type,
"size_bytes": file.size,
"checksum": {"algorithm": "xxh64", "value": file.checksum},
}
for file in raw_data_package.raw_data_files
],
},
# Data quality assessment
"quality": {
"status": executed_obs_unit.status if executed_obs_unit else None,
},
# Processing information
"processing": {
"level": "raw",
"pipeline": {
"name": instrument.name,
"version": "1.0", # Default version since it's not in the model
},
},
# Data rights and access information
# "access": {
# "policy": program.data_policy,
# },
# Full provenance chain
# "provenance": {
# "creator": instrument.name, # Use instrument name since data_acquisition_system doesn't exist
# "created": raw_data_package.created_at.isoformat(),
# "contributors": (
# [
# {
# "name": executed_obs_unit.observer_name,
# "role": "Observer",
# "affiliation": executed_obs_unit.observer_affiliation,
# }
# ]
# if executed_obs_unit
# else []
# ),
# },
}
# Add additional metadata if available
if additional_metadata:
# Add instrument-specific metadata
if additional_metadata.instrument_specific:
metadata["instrument_specific"] = additional_metadata.instrument_specific
# Add quality metrics
if additional_metadata.quality_metrics:
metadata["quality"].update(additional_metadata.quality_metrics)
# Add extended provenance (the base document defines no "provenance" block, so create it on demand)
if additional_metadata.provenance:
metadata.setdefault("provenance", {}).update(additional_metadata.provenance)
# Add custom metadata
if additional_metadata.custom_metadata:
metadata["custom"] = additional_metadata.custom_metadata
# Remove None values
metadata = _remove_none_values(metadata)
return metadata
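# Top-level shape of the returned IVOA-style document (keys only; optional blocks such
# as "instrument_specific", "provenance" and "custom" appear only when the package has
# additional metadata):
#
#     {"id": {...}, "facility": {...}, "target": {...}, "observation": {...},
#      "contents": {...}, "quality": {...}, "processing": {...}}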
def _degrees_to_sexagesimal(degrees: float) -> str:
"""Convert decimal degrees to sexagesimal format."""
if degrees is None:
return None
# Handle negative degrees for declination
sign = "-" if degrees < 0 else "+"
degrees = abs(degrees)
# Convert to hours/degrees, minutes, seconds
hours = int(degrees)
minutes = int((degrees - hours) * 60)
seconds = (degrees - hours - minutes / 60) * 3600
return f"{sign}{hours:02d}h{minutes:02d}m{seconds:06.3f}s"
def _remove_none_values(d: dict) -> dict:
"""Recursively remove None values from a dictionary."""
if not isinstance(d, dict):
return d
return {
k: _remove_none_values(v)
for k, v in d.items()
if v is not None and (not isinstance(v, dict) or _remove_none_values(v))
}
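# Example: _remove_none_values({"a": 1, "b": None, "c": {"d": None}}) returns {"a": 1};
# note that None values nested inside lists are left untouched.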