import os
import astropy.units as u
from astropy.coordinates import SkyCoord
from datetime import datetime, timezone
from sqlalchemy import (
Boolean,
Column,
DateTime,
Float,
ForeignKey,
Integer,
String,
Table,
Text,
UniqueConstraint,
Enum as SQLAlchemyEnum,
case,
BigInteger,
)
from sqlalchemy.ext.hybrid import hybrid_property
from enum import Enum
from .config.config import ccat_ops_db_settings
from sqlalchemy.dialects.postgresql import JSON, UUID
from sqlalchemy.orm import relationship
from typing import Optional
from .ccat_ops_db import Base
class LocationType(Enum):
"""Location type enum
The location type determines the role of the location in the data transfer system.
SOURCE: Telescope instrument computers
BUFFER: Input/transit buffers
LONG_TERM_ARCHIVE: Permanent storage
PROCESSING: Temporary processing areas
"""
SOURCE = "source" # Telescope instrument computers
BUFFER = "buffer" # Input/processing buffers
LONG_TERM_ARCHIVE = "long_term_archive" # Permanent storage
PROCESSING = "processing" # Temporary processing areas
class StorageType(Enum):
"""Storage type enum
The storage type determines the physical storage medium of the location.
DISK: Traditional disk storage
S3: Object storage (AWS S3 or compatible)
TAPE: Tape-based archival storage
"""
DISK = "disk"
S3 = "s3"
TAPE = "tape"
class RouteType(Enum):
"""Route type enum
The route type determines how data is routed between sites.
DIRECT: Direct origin-to-destination transfer
RELAY: Route through intermediate site
CUSTOM: Custom location-to-location override
"""
DIRECT = "direct"
RELAY = "relay" # Route through intermediate site
CUSTOM = "custom" # Custom location-to-location override
class Status(Enum):
"""Status enum
These Status labels are used to make status checks uniform across the entire
data transfer system.
They are used across many concepts such as RawDataPackage, DataTransferPackage,
DataTransfer, etc.
PENDING: Marks a concept as waiting for execution, e.g. a RawDataPackage in
PENDING will be scheduled to be built.
SCHEDULED: Marks a concept as scheduled for execution.
IN_PROGRESS: Marks that a celery job is working on this concept.
COMPLETED: Successfully completed the concept.
FAILED: Failed to execute the concept.
"""
PENDING = ccat_ops_db_settings.status.PENDING
SCHEDULED = ccat_ops_db_settings.status.SCHEDULED
IN_PROGRESS = ccat_ops_db_settings.status.IN_PROGRESS
COMPLETED = ccat_ops_db_settings.status.COMPLETED
FAILED = ccat_ops_db_settings.status.FAILED
class PackageState(Enum):
"""Package state enum
The package state tracks where a package is in the transfer lifecycle.
WAITING: Waiting for transfer (only in the primary archive)
TRANSFERRING: Part of an active DataTransferPackage
ARCHIVED: Archived with all statuses completed
FAILED: Any associated status failed
"""
WAITING = "waiting" # Yellow hourglass - only in primary archive
TRANSFERRING = "transferring" # Blue rotating circle - part of DataTransferPackage
ARCHIVED = "archived" # Green checkmark - archived with all statuses completed
FAILED = "failed" # Red cross - any status failed
class PhysicalCopyStatus(Enum):
PRESENT = "present"
STAGED = (
"staged" # Package staged and unpacked, physical RawDataPackage file removed
)
DELETION_POSSIBLE = "deletion_possible"
DELETION_PENDING = "deletion_pending"
DELETION_SCHEDULED = "deletion_scheduled"
DELETION_IN_PROGRESS = "deletion_in_progress"
DELETION_FAILED = "deletion_failed"
DELETED = "deleted"
class Site(Base):
"""Represents a physical or logical site where data can be stored or processed."""
__tablename__ = "site"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False) # "CCAT", "Cologne", "Cornell"
short_name = Column(String(20), nullable=False) # "ccat", "cologne", "us"
site_location = Column(String(250), nullable=True) # "Atacama", "Germany", "USA"
long_term_archive_transfers = relationship(
"LongTermArchiveTransfer", back_populates="site"
)
locations = relationship("DataLocation", back_populates="site")
__table_args__ = (UniqueConstraint("short_name", name="uix_site_short_name"),)
class DataLocation(Base):
"""Base class for all data storage locations with polymorphic storage types."""
__tablename__ = "data_location"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False) # "ccat_chai_computer", "fyst_buffer_1"
location_type = Column(SQLAlchemyEnum(LocationType), nullable=False)
site_id = Column(Integer, ForeignKey("site.id"), nullable=False)
site = relationship("Site", back_populates="locations")
# Buffer hierarchy and failover
active = Column(Boolean, default=True, nullable=False)
priority = Column(Integer, default=0, nullable=False) # Lower = higher priority
# Polymorphic setup
storage_type = Column(
SQLAlchemyEnum(StorageType), nullable=False
) # "disk", "s3", "tape"
__mapper_args__ = {"polymorphic_identity": None, "polymorphic_on": storage_type}
__table_args__ = (
UniqueConstraint("site_id", "name", name="uix_data_location_name"),
)
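# Polymorphic query sketch (hypothetical session and rows): querying the base class
# returns instances of the concrete subclasses, resolved via the storage_type
# discriminator declared in __mapper_args__ above.
# >>> locations = session.query(DataLocation).filter_by(active=True).all()
# >>> [(loc.name, type(loc).__name__) for loc in locations]
# [('fyst_buffer_1', 'DiskDataLocation'), ('cologne_lta', 'S3DataLocation')]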
class DiskDataLocation(DataLocation):
"""Disk-based storage location."""
__tablename__ = "disk_data_location"
id = Column(Integer, ForeignKey("data_location.id"), primary_key=True)
path = Column(String(500), nullable=False)
host = Column(String(250))
user = Column(String(250))
__mapper_args__ = {"polymorphic_identity": StorageType.DISK}
class S3DataLocation(DataLocation):
"""S3-compatible object storage location."""
__tablename__ = "s3_data_location"
id = Column(Integer, ForeignKey("data_location.id"), primary_key=True)
bucket_name = Column(String(250), nullable=False)
region = Column(String(100))
endpoint_url = Column(String(500)) # For non-AWS S3
__mapper_args__ = {"polymorphic_identity": StorageType.S3}
def get_s3_credentials(self, site_name: str) -> tuple[str, str]:
"""Get S3 credentials from environment variables using the pattern:
{Site.name}_{DataLocation.name}_S3_ACCESS_KEY_ID
{Site.name}_{DataLocation.name}_S3_SECRET_ACCESS_KEY
Examples
--------
Cologne site, long_term_archive location::
COLOGNE_LONG_TERM_ARCHIVE_S3_ACCESS_KEY_ID=your_access_key
COLOGNE_LONG_TERM_ARCHIVE_S3_SECRET_ACCESS_KEY=your_secret_key
US site, long_term_archive location::
US_LONG_TERM_ARCHIVE_S3_ACCESS_KEY_ID=your_access_key
US_LONG_TERM_ARCHIVE_S3_SECRET_ACCESS_KEY=your_secret_key
Parameters
----------
site_name : str
Name of the site
Returns
-------
tuple[str, str]
(access_key_id, secret_access_key)
"""
from ccat_data_transfer.config.config import ccat_data_transfer_settings
# Construct environment variable names
access_key_var = f"{site_name}_{self.name}_S3_ACCESS_KEY_ID"
secret_key_var = f"{site_name}_{self.name}_S3_SECRET_ACCESS_KEY"
# Get credentials from DynaConf settings
access_key_id = getattr(ccat_data_transfer_settings, access_key_var, None)
secret_access_key = getattr(ccat_data_transfer_settings, secret_key_var, None)
# Fall back to global settings if location-specific ones aren't set
if not access_key_id:
access_key_id = ccat_data_transfer_settings.s3_access_key_id
if not secret_access_key:
secret_access_key = ccat_data_transfer_settings.s3_secret_access_key
return access_key_id, secret_access_key
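# Credential lookup sketch (hypothetical names and values): for a location named
# "long_term_archive" at the "COLOGNE" site, get_s3_credentials first checks the
# DynaConf settings for COLOGNE_LONG_TERM_ARCHIVE_S3_ACCESS_KEY_ID and
# COLOGNE_LONG_TERM_ARCHIVE_S3_SECRET_ACCESS_KEY, then falls back to the global
# s3_access_key_id / s3_secret_access_key settings.
# >>> lta = session.query(S3DataLocation).filter_by(name="long_term_archive").one()
# >>> access_key, secret_key = lta.get_s3_credentials(site_name="COLOGNE")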
class TapeDataLocation(DataLocation):
"""Tape-based storage location."""
__tablename__ = "tape_data_location"
id = Column(Integer, ForeignKey("data_location.id"), primary_key=True)
library_name = Column(String(250))
mount_path = Column(String(500))
__mapper_args__ = {"polymorphic_identity": StorageType.TAPE}
raw_data_package_staging_job_association = Table(
"raw_data_package_staging_job_association",
Base.metadata,
Column("raw_data_package_id", Integer, ForeignKey("raw_data_package.id")),
Column("staging_job_id", Integer, ForeignKey("staging_job.id")),
)
user_role_association = Table(
"user_role_association",
Base.metadata,
Column("user_id", Integer, ForeignKey("user.id"), primary_key=True),
Column("role_id", Integer, ForeignKey("role.id"), primary_key=True),
)
instrument_observing_program_association = Table(
"instrument_observing_program",
Base.metadata,
Column("instrument_id", Integer, ForeignKey("instrument.id"), primary_key=True),
Column(
"observing_program_id",
Integer,
ForeignKey("observing_program.id"),
primary_key=True,
),
)
obs_unit_instrument_module_configuration_association = Table(
"obs_unit_instrument_module_configuration_association",
Base.metadata,
Column("obs_unit_id", Integer, ForeignKey("obs_unit.id")),
Column(
"instrument_module_configuration_id",
Integer,
ForeignKey("instrument_module_configuration.id"),
),
)
class ObservingProgram(Base):
__tablename__ = "observing_program"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False)
short_name = Column(String(20), nullable=False)
description = Column(Text)
lead_id = Column(Integer, ForeignKey("user.id"), nullable=True)
lead = relationship("User", back_populates="observing_programs")
sub_observing_programs = relationship(
"SubObservingProgram",
back_populates="observing_program",
)
instruments = relationship(
"Instrument",
secondary=instrument_observing_program_association,
back_populates="observing_programs",
)
obs_units = relationship("ObsUnit", back_populates="observing_program")
class SubObservingProgram(Base):
__tablename__ = "sub_observing_program"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False)
short_name = Column(String(20), nullable=False)
description = Column(Text)
observing_program_id = Column(
Integer,
ForeignKey("observing_program.id"),
nullable=False,
)
observing_program = relationship(
"ObservingProgram",
back_populates="sub_observing_programs",
)
obs_units = relationship("ObsUnit", back_populates="sub_observing_program")
# The following tables implement a data transfer system from one side to the other,
# with handshaking and deletion on the origin.
# TODO: the Source table needs a SQLAlchemy column that resembles a SkyCoord object,
# read from the ra_deg and dec_deg columns as well as the coordinate_system and
# epoch columns.
class Source(Base):
"""A source is a celestial object
This class serves as a base class for various types of sources. It is a polymorphic
class in SQLAlchemy, not instantiated directly, but used to provide common
attributes. Subclasses, implemented as separate database tables, inherit from the
source class and can have additional specific attributes.
See classes that are based on this class for more information on the implemented
types of sources.
"""
__tablename__ = "source"
id = Column(Integer, primary_key=True)
name = Column(
String(250), nullable=False, unique=True, doc="The name of the source"
)
type = Column(String(250), nullable=False, doc="The polymorphic identity")
__mapper_args__ = {
"polymorphic_on": type,
"polymorphic_identity": "source",
}
# Add relationships
obs_units = relationship("ObsUnit", back_populates="source")
class FixedSource(Source):
"""A source that has a fixed coordinates
This class is a subclass of the Source class and inherits all attributes from the
Source class.
The class implements the skycoord property that returns a SkyCoord object from the
ra_deg and dec_deg columns.
"""
__tablename__ = "fixed_source"
__mapper_args__ = {"polymorphic_identity": "fixed_source"}
id = Column(Integer, ForeignKey("source.id"), primary_key=True)
ra_deg = Column(Float, nullable=False, doc="The right ascension in degrees (ICRS)")
dec_deg = Column(Float, nullable=False, doc="The declination in degrees (ICRS)")
slam = Column(String(50), doc="Longitude string of the original input")
sbet = Column(String(50), doc="Latitude string of the original input")
vlsr = Column(Float, doc="The local standard of rest velocity in km/s")
frame = Column(String(50), doc="The frame of the coordinates of the original input")
@property
def skycoord(self):
"""Return a SkyCoord object from the ra_deg and dec_deg columns"""
if self.ra_deg is not None and self.dec_deg is not None:
return SkyCoord(
ra=self.ra_deg * u.deg,
dec=self.dec_deg * u.deg,
frame="icrs",
)
return None
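# Usage sketch (hypothetical coordinates, no database session required):
# >>> src = FixedSource(name="example_source", ra_deg=83.82, dec_deg=-5.39)
# >>> src.skycoord.to_string("hmsdms")  # astropy SkyCoord in the ICRS frame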
class SolarSystemObject(Source):
"""A source that is a solar system object
This class is a subclass of the Source class and inherits all attributes from the
Source class.
"""
__tablename__ = "solar_system_object"
__mapper_args__ = {"polymorphic_identity": "solar_system_object"}
id = Column(Integer, ForeignKey("source.id"), primary_key=True)
eph_name = Column(
String(50),
nullable=True,
doc="Standard ephemeris name of the source",
)
# https://naif.jpl.nasa.gov/pub/naif/toolkit_docs/FORTRAN/req/naif_ids.html#NAIF%20Integer%20ID%20codes
naif_id = Column(String(50), nullable=True, doc="The NAIF ID of the source")
class ConstantElevationSource(Source):
"""A source that is observed at a constant elevation
This class is a subclass of the Source class and inherits all attributes from the
Source class.
"""
__tablename__ = "constant_elevation_source"
__mapper_args__ = {"polymorphic_identity": "constant_elevation_source"}
id = Column(Integer, ForeignKey("source.id"), primary_key=True)
ra_deg_min = Column(
Float, nullable=False, doc="The minimum right ascension of the area in degrees"
)
ra_deg_max = Column(
Float, nullable=False, doc="The maximum right ascension of the area in degrees"
)
dec_deg_min = Column(
Float, nullable=False, doc="The minimum declination of the area in degrees"
)
dec_deg_max = Column(
Float, nullable=False, doc="The maximum declination of the area in degrees"
)
slam_min = Column(
String(50), doc="The minimum longitude string of the original input"
)
slam_max = Column(
String(50), doc="The maximum longitude string of the original input"
)
sbet_min = Column(
String(50), doc="The minimum latitude string of the original input"
)
sbet_max = Column(
String(50), doc="The maximum latitude string of the original input"
)
vlsr = Column(Float, doc="The local standard of rest velocity in km/s")
frame = Column(String(50), doc="The frame of the coordinates of the original input")
class Line(Base):
"""A spectral line that is observed by an instrument"""
__tablename__ = "line"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False, doc="The name of the spectral line")
rest_frequency = Column(Float, nullable=False, doc="The rest frequency of the line")
side_band = Column(String(250), nullable=False, doc="The side band of the line")
available = Column(Boolean, nullable=False)
comment = Column(String(250), nullable=True)
# chai_array_configurations = relationship(
# "ChaiArrayConfiguration", back_populates="line"
# )
# To be able to also include data from other observatories, such as GREAT/SOFIA, we
# include a dedicated observatory table
class Observatory(Base):
__tablename__ = "observatory"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False, unique=True)
description = Column(String(250), nullable=False)
telescopes = relationship("Telescope", back_populates="observatory")
# To prepare for the possibility of multiple telescopes in an observatory we include a
# telescope table
class Telescope(Base):
__tablename__ = "telescope"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False)
description = Column(String(250), nullable=False)
lon_deg = Column(Float, nullable=False)
lat_deg = Column(Float, nullable=False)
alt_m = Column(Float, nullable=False)
instruments = relationship("Instrument", back_populates="telescope")
observatory_id = Column(Integer, ForeignKey("observatory.id"))
observatory = relationship("Observatory", back_populates="telescopes")
# An instrument runs on a specific telescope
class Instrument(Base):
__tablename__ = "instrument"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False)
instrument_type = Column(String(250), nullable=False)
description = Column(String(250))
telescope_id = Column(Integer, ForeignKey("telescope.id"))
telescope = relationship("Telescope", back_populates="instruments")
modules = relationship("InstrumentModule", back_populates="instrument")
observing_programs = relationship(
"ObservingProgram",
secondary=instrument_observing_program_association,
back_populates="instruments",
)
available = Column(Boolean, nullable=False)
class InstrumentModule(Base):
__tablename__ = "instrument_module"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False)
description = Column(String(250))
pixels = Column(Integer, nullable=True, default=1)
instrument_id = Column(Integer, ForeignKey("instrument.id"))
instrument = relationship("Instrument", back_populates="modules")
instrument_module_configurations = relationship(
"InstrumentModuleConfiguration", back_populates="instrument_module"
)
available = Column(Boolean, nullable=False)
raw_data_packages = relationship(
"RawDataPackage", back_populates="instrument_module"
)
raw_data_files = relationship(
"RawDataFile",
back_populates="instrument_module",
)
class InstrumentModuleConfiguration(Base):
__tablename__ = "instrument_module_configuration"
id = Column(Integer, primary_key=True)
type = Column(String)
instrument_module_id = Column(Integer, ForeignKey("instrument_module.id"))
instrument_module = relationship(
"InstrumentModule", back_populates="instrument_module_configurations"
)
raw_data_files = relationship(
"RawDataFile", back_populates="instrument_module_configuration"
)
__mapper_args__ = {
"polymorphic_on": type,
"polymorphic_identity": "instrument_module_configuration",
} # Polymorphic mapping
class ChaiModuleConfiguration(InstrumentModuleConfiguration):
__tablename__ = "chai_module_configuration"
__mapper_args__ = {"polymorphic_identity": "chai_module_configuration"}
id = Column(
Integer, ForeignKey("instrument_module_configuration.id"), primary_key=True
)
line_id = Column(Integer, ForeignKey("line.id"), nullable=False)
line = relationship("Line")
config_parameters = Column(
JSON,
doc="List of instrument configuration parameters",
)
class PrimeCamModuleConfiguration(InstrumentModuleConfiguration):
__tablename__ = "prime_cam_module_configuration"
__mapper_args__ = {
"polymorphic_identity": "prime_cam_module_configuration"
} # Polymorphic mapping
id = Column(
Integer, ForeignKey("instrument_module_configuration.id"), primary_key=True
)
config_parameters = Column(
JSON,
doc="List of instrument configuration parameters",
)
class ObservationConfiguration(Base):
__tablename__ = "observation_configuration"
id = Column(Integer, primary_key=True)
type = Column(String)
obs_units = relationship("ObsUnit", back_populates="observation_configuration")
__mapper_args__ = {
"polymorphic_on": type,
"polymorphic_identity": "observation_configuration",
}
azimuth_range = Column(
JSON,
nullable=True,
doc="Azimuth range lookup table for constant elevation scans",
)
class ChaiObservationConfiguration(ObservationConfiguration):
__tablename__ = "chai_observation_configuration"
__mapper_args__ = {"polymorphic_identity": "chai_observation_configuration"}
id = Column(Integer, ForeignKey("observation_configuration.id"), primary_key=True)
chai_tilings = relationship(
"ChaiTiling", back_populates="chai_observation_configuration"
)
# chai_inpar_parameters = relationship(
# "ChaiInparParameter",
# secondary=chai_observation_configuration_chai_inpar_parameters_association,
# back_populates="chai_observation_configurations",
# )
ntilelines = Column(
Integer,
nullable=True,
doc="Number of tile lines to be grouped (for socring in scheduler)",
)
class PrimeCamObservationConfiguration(ObservationConfiguration):
__tablename__ = "prime_cam_observation_configuration"
__mapper_args__ = {
"polymorphic_identity": "prime_cam_observation_configuration"
} # Polymorphic mapping
id = Column(Integer, ForeignKey("observation_configuration.id"), primary_key=True)
version = Column(Integer, nullable=False)
history = Column(JSON, nullable=True)
mapping_parameters = Column(
JSON,
nullable=True,
doc="List of mapping parameters",
)
class ObsMode(Base):
__tablename__ = "obs_mode"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False)
description = Column(String(250))
obs_units = relationship("ObsUnit", back_populates="obs_mode")
class ObsUnit(Base):
__tablename__ = "obs_unit"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False)
version = Column(Integer, nullable=False)
history = Column(JSON, nullable=True)
phase = Column(
String(20), nullable=False, doc="CM=commissioning, BS=baseline science, etc."
)
group = Column(String(250), nullable=True)
group_type = Column(String(250), nullable=True)
equal_tolerance = Column(
Float,
nullable=True,
doc="Tolerance of inbalanced schedule when group_type is equal",
)
min_alt = Column(Float, nullable=True)
max_alt = Column(Float, nullable=True)
min_rotang = Column(Float, nullable=True)
max_rotang = Column(Float, nullable=True)
nominal_alt = Column(String(250), nullable=True)
min_lsa = Column(Float, nullable=True)
max_lsa = Column(Float, nullable=True)
lsa_margin = Column(Float, nullable=True)
cadence = Column(Float, nullable=True)
requested_time_h = Column(Float, comment="Mandatory for PrimeCam", nullable=True)
unit_duration_h = Column(Float, nullable=False)
max_distance_in_map = Column(Float, nullable=True)
trans_ref = Column(Float, nullable=True, doc="Reference transmission")
priorities = Column(Float, nullable=False) # needs to be float, not integer
available = Column(
Boolean, nullable=False, doc="Whether this ObsUnit is ready to be scheduled"
)
pre_scheduled_basis = Column(
Boolean,
doc="Whether this Obsunit is executed on pre-scheduled basis, meaning that it has a very low priority outside of the pre-scheduled slots.",
)
additional_parameters = Column(
JSON, nullable=True, doc="Any special parameters or constraints to be stored"
)
source_id = Column(Integer, ForeignKey("source.id"), nullable=True)
source = relationship("Source", back_populates="obs_units")
observing_program_id = Column(
Integer,
ForeignKey("observing_program.id"),
nullable=False,
)
observing_program = relationship("ObservingProgram", back_populates="obs_units")
sub_observing_program_id = Column(Integer, ForeignKey("sub_observing_program.id"))
sub_observing_program = relationship(
"SubObservingProgram",
back_populates="obs_units",
)
obs_mode_id = Column(Integer, ForeignKey("obs_mode.id"), nullable=True)
obs_mode = relationship("ObsMode", back_populates="obs_units")
primary_instrument_module_configuration_id = Column(
Integer, ForeignKey("instrument_module_configuration.id"), nullable=False
)
primary_instrument_module_configuration = relationship(
"InstrumentModuleConfiguration",
backref="primary_obs_units",
)
# Note: it is a bit redundant, but the following list includes
# primary_instrument_module_configuration as well,
# so that it is easier to retrieve all instrument module configurations
# that are associated with this ObsUnit
instrument_module_configurations = relationship(
"InstrumentModuleConfiguration",
secondary=obs_unit_instrument_module_configuration_association,
backref="obs_units",
)
observation_configuration_id = Column(
Integer, ForeignKey("observation_configuration.id"), nullable=True
)
observation_configuration = relationship(
"ObservationConfiguration", back_populates="obs_units"
)
executed_obs_units = relationship("ExecutedObsUnit", back_populates="obs_unit")
pre_scheduled_slots = relationship("PreScheduledSlot", back_populates="obs_unit")
raw_data_files = relationship("RawDataFile", back_populates="obs_unit")
raw_data_packages = relationship("RawDataPackage", back_populates="obs_unit")
class PreScheduledSlot(Base):
__tablename__ = "pre_scheduled_slot"
id = Column(Integer, primary_key=True)
start_time = Column(DateTime, nullable=False)
end_time = Column(DateTime, nullable=False)
obs_unit_id = Column(Integer, ForeignKey("obs_unit.id"), nullable=False)
obs_unit = relationship("ObsUnit", back_populates="pre_scheduled_slots")
class ExecutedObsUnit(Base):
__tablename__ = "executed_obs_unit"
id = Column(UUID(as_uuid=True), primary_key=True)
start_time = Column(DateTime, nullable=False)
end_time = Column(DateTime)
status = Column(
String(20), nullable=True, doc="running/*success*/*interrupted*/unusable/..."
)
mean_pwv = Column(Float, nullable=True)
mean_elevation = Column(Float, nullable=True)
obs_unit_id = Column(Integer, ForeignKey("obs_unit.id"), nullable=False)
obs_unit = relationship("ObsUnit", back_populates="executed_obs_units")
obs_info = Column(
JSON,
nullable=True,
doc="Static ancillary infomation (e.g. ObsUnit version, tiling ID for CHAI)",
)
obs_progress = Column(JSON, nullable=True, doc="Field for progress tracking")
raw_data_packages = relationship(
"RawDataPackage", back_populates="executed_obs_unit"
)
raw_data_files = relationship("RawDataFile", back_populates="executed_obs_unit")
# class ExecutedChaiTiling(Base):
# __tablename__ = "executed_chai_tiling"
# id = Column(Integer, primary_key=True)
# start_time = Column(DateTime, nullable=False)
# end_time = Column(DateTime, nullable=False)
# success = Column(Boolean, nullable=False)
# mean_pwv = Column(Float, nullable=False)
# mean_elevation = Column(Float, nullable=False)
# # chai_tiling_id = Column(Integer, ForeignKey("chai_tiling.id"))
# # chai_tiling = relationship("ChaiTiling", back_populates="executed_chai_tiling")
# # chai_inpar_parameter_id = Column(Integer, ForeignKey("chai_inpar_parameter.id"))
# # chai_inpar_parameter = relationship(
# # "ChaiInparParameter", back_populates="chai_obs_units"
# # )
# # secondary_chai_array_configuration = relationship(
# # "ChaiArrayConfiguration",
# # secondary=\
# # executed_chai_obs_unit_secondary_chai_array_configuration_table,
# # back_populates="secondary_executed_chai_obs_units",
# # )
class ChaiTiling(Base):
__tablename__ = "chai_tiling"
id = Column(Integer, primary_key=True)
version = Column(Integer, nullable=False)
history = Column(JSON, nullable=True)
priority_in_tiling = Column(Integer, nullable=False)
tile_id = Column(String(10), nullable=False)
tile_offset_x = Column(Float, nullable=False)
tile_offset_y = Column(Float, nullable=False)
x_or_y = Column(String(2))
tile_unit_scaling_x = Column(Float)
tile_unit_scaling_y = Column(Float)
edge = Column(String(10))
goal_ncycle = Column(Integer, nullable=False)
# tiling is connected to obsunit through observation configuration
# obs_unit_id = Column(Integer, ForeignKey("obs_unit.id"), nullable=False)
# obs_unit = relationship("ObsUnit")
chai_observation_configuration_id = Column(
Integer, ForeignKey("chai_observation_configuration.id")
)
chai_observation_configuration = relationship(
"ChaiObservationConfiguration", back_populates="chai_tilings"
)
chai_inpar_parameter_id = Column(Integer, ForeignKey("chai_inpar_parameter.id"))
chai_inpar_parameter = relationship(
"ChaiInparParameter", back_populates="chai_tilings"
)
class ChaiInparParameter(Base):
__tablename__ = "chai_inpar_parameter"
id = Column(Integer, primary_key=True)
# nullable=True is default
name = Column(String(100), nullable=False)
version = Column(Integer, nullable=False)
history = Column(JSON)
chai_tilings = relationship("ChaiTiling", back_populates="chai_inpar_parameter")
lam = Column(Float, nullable=False, doc="Map center offset in longitude [arcsec]")
bet = Column(Float, nullable=False, doc="Map center offset in latitude [arcsec]")
cormap = Column(String(10), nullable=False, doc="Map coordinate system")
line_range = Column(
String(50),
doc='Range of velocities [km/s] affected by lines (e.g. "-70:-30:v,-10:10:v")',
)
goal_resolution = Column(
Float,
doc="Spectral resolution [km/s] with which the scientific analysis will be done",
)
refname = Column(
String(20),
doc="Reference name when using the absolute positions. Set NAN to use relative",
)
refoffl = Column(
Float, doc="Reference position relative to on in longitude [arcsec]"
)
refoffb = Column(
Float, doc="Reference position relative to on in latitude [arcesc]"
)
corref = Column(String(10), doc="Reference coordinate system")
mode = Column(String(10), doc="Observing mode; otfl or otfb")
otfpattern = Column(
String(50), doc="OTF pattern file name specified in kosma_software"
)
ton = Column(Float, doc="On integration time [sec]")
toff = Column(
Float,
doc="Forced off time [s] (-1 to use the default calculation = default)",
)
repetition = Column(Integer, doc="Repetition number")
offononoff = Column(
Integer,
doc="Sequence of OFF and ON (1 = OFF-ON-ON-OFF (default), 0 = OFF-ON)",
)
stepl = Column(Float, doc="Step size in longitude [arcsec]")
stepb = Column(Float, doc="Step size in latitude [arcsec]")
mapsizel = Column(Float, doc="Size of map in longitude [arcsec]")
mapsizeb = Column(Float, doc="Size of map in latitude [arcsec]")
nmapl = Column(Integer, doc="Number of raster positions along longitude")
nmapb = Column(Integer, doc="Number of raster positions along latitude")
mapangle = Column(Float, doc="Position angle of the map (counter-clock) [degree]")
crosssizel = Column(Float, doc="Size of cross in longitude [arcsec]")
crosssizeb = Column(Float, doc="Size of cross in latitude [arcsec]")
reverseflg = Column(
Integer,
doc=" OTF scan direction in cross : 0 = (+x,+y), 1 = (-x,-y), 2 = (-x,+x,-y,+y)",
)
scan_dir = Column(Integer, doc="OTF scan direction (1 is +x or +y, -1 is -x or -y)")
scan_order = Column(
Integer, doc="Order of OTF lines (1 is +x or +y, -1 is -x or -y)"
)
evendump = Column(
Integer,
doc="If 1 allow even number of dump positions without hitting the center. Default = 0",
)
novertical = Column(
Integer,
doc="If 1 skip the second (vertical) scan for cross observations. Default = 0",
)
pointingflg = Column(
Integer,
doc="Flag to indicate that it is a pointing session. Default = 0",
)
offperload = Column(Integer, doc="Define how many off positions for one load measurement")
onperload = Column(
Integer, doc="Define how many on positions for one load measurement"
)
repperload = Column(
Integer, doc="Define how many repetition for one load measurment"
)
lineperoff = Column(
Integer, doc="Define how many otf scan lines for one off measurement"
)
onperoff = Column(
Integer, doc="Define how many on positions for one off measurement"
)
offperpattern = Column(Integer, doc="Define how many off measurement per pattern")
refpoint = Column(String(20), doc="Reference point (as is used in setpoint)")
act_pixflg = Column(
Integer,
doc="Flag to use the actual pixel position of refpoint. Default = 0",
)
nextflg = Column(
String(1),
doc="If Y, enable telescope moving when writing data. Default = N",
)
# chai_observation_configurations = relationship(
# "ChaiObservationConfiguration",
# secondary=chai_observation_configuration_chai_inpar_parameters_association,
# back_populates="chai_inpar_parameters",
# )
class Role(Base):
__tablename__ = "role"
id = Column(Integer, primary_key=True)
name = Column(String(150), unique=True)
description = Column(String(150), nullable=True)
# GitHub team mappings for automatic role assignment
github_team_mappings = Column(
JSON, nullable=True, doc="GitHub teams that map to this role"
)
# Permission scopes for this role
permissions = Column(
JSON, nullable=True, doc="List of permissions granted to this role"
)
users = relationship(
"User", secondary=user_role_association, back_populates="roles"
)
class User(Base):
__tablename__ = "user"
id = Column(Integer, primary_key=True)
email = Column(String(150))
username = Column(String(150))
first_name = Column(String(150), nullable=True)
last_name = Column(String(150), nullable=True)
title = Column(String(150), nullable=True)
affiliation = Column(Text, nullable=True)
password = Column(String(150))
# GitHub integration fields
github_id = Column(String(50), nullable=True, unique=True, doc="GitHub user ID")
github_username = Column(String(150), nullable=True, doc="GitHub username")
# User preferences stored as JSON
preferences = Column(JSON, nullable=True, doc="User preferences and settings")
roles = relationship(
"Role", secondary=user_role_association, back_populates="users"
)
# Login and account tracking fields
last_login_at = Column(DateTime())
current_login_at = Column(DateTime())
last_login_ip = Column(String(100))
current_login_ip = Column(String(100))
login_count = Column(Integer)
active = Column(Boolean())
confirmed_at = Column(DateTime())
# Relationships
observing_programs = relationship("ObservingProgram", back_populates="lead")
api_tokens = relationship(
"ApiToken", back_populates="user", cascade="all, delete-orphan"
)
class ApiToken(Base):
"""API tokens for programmatic access to the API"""
__tablename__ = "api_token"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("user.id"), nullable=False)
user = relationship("User", back_populates="api_tokens")
# Token identification
name = Column(String(100), nullable=False, doc="Human-readable name for the token")
token_hash = Column(
String(255), nullable=False, unique=True, doc="Hashed token value"
)
token_prefix = Column(
String(10), nullable=False, doc="First few characters for identification"
)
# Token permissions and scopes
scopes = Column(JSON, nullable=True, doc="List of permission scopes for this token")
# Token lifecycle
created_at = Column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
nullable=False,
)
expires_at = Column(
DateTime(timezone=True), nullable=True, doc="Token expiration time"
)
last_used_at = Column(
DateTime(timezone=True), nullable=True, doc="Last time token was used"
)
active = Column(
Boolean, default=True, nullable=False, doc="Whether token is active"
)
# Usage tracking
usage_count = Column(
Integer, default=0, nullable=False, doc="Number of times token was used"
)
last_used_ip = Column(String(100), nullable=True, doc="IP address of last usage")
def is_expired(self):
"""Check if token is expired"""
if self.expires_at is None:
return False
return datetime.now(timezone.utc) > self.expires_at
def is_valid(self):
"""Check if token is valid (active and not expired)"""
return self.active and not self.is_expired()
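# Validity check sketch (hypothetical token): tokens with expires_at=None never
# expire, so is_valid() reduces to the active flag for non-expiring tokens.
# >>> token = ApiToken(name="ci-token", active=True, expires_at=None)
# >>> token.is_expired(), token.is_valid()
# (False, True)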
class DataTransferRoute(Base):
"""Defines routes for data transfer between sites and locations.
The database implements a flexible routing system that supports:
1. Direct routes between specific locations
2. Relay routes through intermediate sites
3. Custom location-to-location overrides
"""
__tablename__ = "data_transfer_route"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False)
# Site-level routing
origin_site_id = Column(Integer, ForeignKey("site.id"), nullable=False)
origin_site = relationship("Site", foreign_keys=[origin_site_id])
destination_site_id = Column(Integer, ForeignKey("site.id"), nullable=False)
destination_site = relationship("Site", foreign_keys=[destination_site_id])
# Route configuration
route_type = Column(SQLAlchemyEnum(RouteType), nullable=False)
transfer_method = Column(String(250), nullable=False) # "bbcp", "s3", "cp"
# Optional location-specific overrides
origin_location_id = Column(Integer, ForeignKey("data_location.id"), nullable=True)
origin_location = relationship("DataLocation", foreign_keys=[origin_location_id])
destination_location_id = Column(
Integer, ForeignKey("data_location.id"), nullable=True
)
destination_location = relationship(
"DataLocation", foreign_keys=[destination_location_id]
)
# Relay configuration
relay_site_id = Column(Integer, ForeignKey("site.id"), nullable=True)
relay_site = relationship("Site", foreign_keys=[relay_site_id])
__table_args__ = (
UniqueConstraint(
"origin_site_id",
"destination_site_id",
"route_type",
name="uix_data_transfer_route",
),
)
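# Route definition sketch (hypothetical sites): a RELAY route sends data from origin
# to destination through an intermediate site; transfer_method names the tool used
# for the transfer.
# >>> route = DataTransferRoute(
# ...     name="ccat_to_us_via_cologne",
# ...     origin_site=ccat, destination_site=us_site,
# ...     route_type=RouteType.RELAY, relay_site=cologne,
# ...     transfer_method="bbcp",
# ... )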
data_transfer_package_files_association = Table(
"data_transfer_package_files",
Base.metadata,
Column(
"data_transfer_package_id",
Integer,
ForeignKey("data_transfer_package.id"),
primary_key=True,
),
Column(
"file_id", UUID(as_uuid=True), ForeignKey("raw_data_file.id"), primary_key=True
),
)
class RawDataFile(Base):
"""Represents a raw data file from an instrument."""
__tablename__ = "raw_data_file"
id = Column(UUID(as_uuid=True), primary_key=True)
name = Column(String(250), nullable=False)
relative_path = Column(String(250), nullable=False)
obs_unit_id = Column(Integer, ForeignKey("obs_unit.id"), nullable=False)
obs_unit = relationship("ObsUnit", back_populates="raw_data_files")
created_at = Column(
DateTime(timezone=True),
nullable=False,
default=lambda: datetime.now(timezone.utc),
)
instrument_module_id = Column(
Integer, ForeignKey("instrument_module.id"), nullable=False
)
instrument_module = relationship(
"InstrumentModule", back_populates="raw_data_files"
)
instrument_module_configuration_id = Column(
Integer, ForeignKey("instrument_module_configuration.id")
)
instrument_module_configuration = relationship(
"InstrumentModuleConfiguration", back_populates="raw_data_files"
)
file_type = Column(String(250), nullable=False)
size = Column(BigInteger, nullable=False)
checksum = Column(String(250), nullable=False)
description = Column(Text, nullable=True)
# Source location tracking
source_location_id = Column(Integer, ForeignKey("data_location.id"), nullable=False)
source_location = relationship("DataLocation")
# Package relationships
raw_data_package_id = Column(
Integer, ForeignKey("raw_data_package.id", ondelete="CASCADE"), nullable=True
)
raw_data_package = relationship("RawDataPackage", back_populates="raw_data_files")
data_transfer_package_id = Column(
Integer, ForeignKey("data_transfer_package.id"), nullable=True
)
data_transfer_package = relationship(
"DataTransferPackage", back_populates="raw_data_files"
)
executed_obs_unit_id = Column(
UUID(as_uuid=True), ForeignKey("executed_obs_unit.id"), nullable=True
)
executed_obs_unit = relationship("ExecutedObsUnit", back_populates="raw_data_files")
# Physical copies
state = Column(SQLAlchemyEnum(PackageState), nullable=True)
physical_copies = relationship(
"RawDataFilePhysicalCopy",
back_populates="raw_data_file",
cascade="all, delete-orphan",
)
class RawDataPackage(Base):
"""A raw data package is a bundle of raw data files that were observed in an
observation unit. But they should never be larger than 50GB in size.
"""
__tablename__ = "raw_data_package"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False)
relative_path = Column(
String(250),
nullable=False,
doc="""
This is the relative path to the raw data package. The absolute path of the location
of this file for each archive is stored in the DataLocation table. The path is
relative to the raw_data_path of the DataLocation.
""",
)
size = Column(BigInteger, nullable=False)
executed_obs_unit_id = Column(
UUID(as_uuid=True),
ForeignKey("executed_obs_unit.id", ondelete="CASCADE"),
nullable=False,
)
executed_obs_unit = relationship(
"ExecutedObsUnit", back_populates="raw_data_packages"
)
instrument_module_id = Column(
Integer, ForeignKey("instrument_module.id"), nullable=False
)
instrument_module = relationship(
"InstrumentModule", back_populates="raw_data_packages"
)
obs_unit_id = Column(
Integer, ForeignKey("obs_unit.id", ondelete="CASCADE"), nullable=False
)
obs_unit = relationship("ObsUnit", back_populates="raw_data_packages")
created_at = Column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
nullable=False,
)
checksum = Column(String(250), nullable=False)
raw_data_files = relationship("RawDataFile", back_populates="raw_data_package")
data_transfer_package_id = Column(
Integer,
ForeignKey("data_transfer_package.id", ondelete="CASCADE"),
nullable=True,
)
data_transfer_package = relationship(
"DataTransferPackage", back_populates="raw_data_packages"
)
long_term_archive_transfers = relationship(
"LongTermArchiveTransfer",
back_populates="raw_data_package",
cascade="all, delete-orphan",
)
analyze_status = Column(
SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING
)
physical_copies = relationship(
"RawDataPackagePhysicalCopy",
back_populates="raw_data_package",
cascade="all, delete-orphan",
)
package_metadata = relationship(
"RawDataPackageMetadata",
back_populates="raw_data_package",
uselist=False,
)
status = Column(SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING)
state = Column(
SQLAlchemyEnum(PackageState),
nullable=True,
default=PackageState.WAITING,
)
staging_jobs = relationship(
"StagingJob",
secondary=raw_data_package_staging_job_association,
back_populates="raw_data_packages",
)
retry_count = Column(Integer, default=0)
class DataTransferPackage(Base):
__tablename__ = "data_transfer_package"
id = Column(Integer, primary_key=True)
hash_id = Column(String(250), nullable=False)
file_name = Column(String(250), nullable=False)
size = Column(BigInteger, nullable=True)
checksum = Column(String(250), nullable=True)
relative_path = Column(String(250), nullable=False)
origin_location_id = Column(Integer, ForeignKey("data_location.id"), nullable=False)
origin_location = relationship("DataLocation", foreign_keys=[origin_location_id])
raw_data_files = relationship(
"RawDataFile",
back_populates="data_transfer_package",
)
raw_data_packages = relationship(
"RawDataPackage",
back_populates="data_transfer_package",
)
data_transfers = relationship(
"DataTransfer", back_populates="data_transfer_package"
)
status = Column(SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING)
failure_error_message = Column(Text, nullable=True)
physical_copies = relationship(
"DataTransferPackagePhysicalCopy",
back_populates="data_transfer_package",
cascade="all, delete-orphan",
)
# Counts the attempts to generate the data transfer package
retry_count = Column(Integer, default=0)
class DataTransfer(Base):
"""Tracks data transfers between locations."""
__tablename__ = "data_transfer"
id = Column(Integer, primary_key=True)
process_id = Column(String(250))
retry_count = Column(Integer, nullable=False, default=0)
start_time = Column(DateTime)
end_time = Column(DateTime)
data_transfer_method = Column(String(250), default="bbcp")
status = Column(SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING)
failure_error_message = Column(Text, nullable=True)
transfer_program_log = Column(Text)
# Location references
origin_location_id = Column(Integer, ForeignKey("data_location.id"), nullable=False)
origin_location = relationship("DataLocation", foreign_keys=[origin_location_id])
destination_location_id = Column(
Integer, ForeignKey("data_location.id"), nullable=False
)
destination_location = relationship(
"DataLocation", foreign_keys=[destination_location_id]
)
# Package reference
data_transfer_package_id = Column(
Integer, ForeignKey("data_transfer_package.id"), nullable=False
)
data_transfer_package = relationship(
"DataTransferPackage", back_populates="data_transfers"
)
unpack_status = Column(
SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING
)
unpack_failure_error_message = Column(Text, nullable=True)
unpack_start_time = Column(DateTime)
unpack_end_time = Column(DateTime)
unpack_log = Column(Text)
unpack_retry_count = Column(Integer, nullable=False, default=0)
logs = relationship("DataTransferLog", back_populates="data_transfer")
class SystemLog(Base):
__tablename__ = "system_log"
id = Column(Integer, primary_key=True)
type = Column(String(250), nullable=False)
__mapper_args__ = {
"polymorphic_on": type,
"polymorphic_identity": "system_log",
}
class DataTransferLog(SystemLog):
"""Log entries for data transfers with references to log files.
This model implements a lightweight approach where:
- Basic status info and log file path are stored in the database
- Full command outputs are stored in files
- Detailed metrics are stored in InfluxDB
"""
__tablename__ = "data_transfer_log"
__mapper_args__ = {
"polymorphic_identity": "data_transfer_log",
}
id = Column(Integer, ForeignKey("system_log.id"), primary_key=True)
data_transfer_id = Column(Integer, ForeignKey("data_transfer.id"), nullable=False)
data_transfer = relationship("DataTransfer", back_populates="logs")
# Basic info
timestamp = Column(DateTime, nullable=False)
status = Column(
String(50),
nullable=False,
doc="Status of this transfer attempt (success/failure)",
)
# Path to the log file
log_path = Column(
String(500), nullable=False, doc="Path to file containing full command output"
)
@property
def content(self) -> Optional[str]:
"""Read and return the full log content if file exists."""
if self.log_path and os.path.exists(self.log_path):
with open(self.log_path, "r") as f:
return f.read()
return None
# def __repr__(self):
# return (
# f"<DataTransferLog(id={self.id}, "
# f"data_transfer_id={self.data_transfer_id}, "
# f"timestamp={self.timestamp}, "
# f"status={self.status})>"
# )
class DataArchive(Base):
__tablename__ = "data_archive"
id = Column(Integer, primary_key=True)
name = Column(String(250), nullable=False)
class PhysicalCopy(Base):
"""Base class for tracking physical copies of files across different storage locations.
This class implements a polymorphic pattern to track physical copies of different types
of files (RawDataFile, RawDataPackage, DataTransferPackage) across different storage
locations and types. Each physical copy represents an actual file on disk or other
storage medium.
"""
__tablename__ = "physical_copy"
id = Column(Integer, primary_key=True)
type = Column(String(50), nullable=False)
# Location reference
data_location_id = Column(Integer, ForeignKey("data_location.id"), nullable=False)
data_location = relationship("DataLocation")
created_at = Column(
DateTime(timezone=True),
default=lambda: datetime.now(timezone.utc),
nullable=False,
)
verified_at = Column(DateTime(timezone=True), nullable=True)
status = Column(
SQLAlchemyEnum(PhysicalCopyStatus),
nullable=False,
default=PhysicalCopyStatus.PRESENT,
)
checksum = Column(String(250), nullable=True)
deletion_task_id = Column(Integer, nullable=True)
deleted_at = Column(DateTime(timezone=True), nullable=True)
__mapper_args__ = {
"polymorphic_on": type,
"polymorphic_identity": "physical_copy",
}
class RawDataFilePhysicalCopy(PhysicalCopy):
"""Tracks physical copies of individual raw data files."""
__tablename__ = "raw_data_file_physical_copy"
__mapper_args__ = {"polymorphic_identity": "raw_data_file_physical_copy"}
id = Column(Integer, ForeignKey("physical_copy.id"), primary_key=True)
raw_data_file_id = Column(
UUID(as_uuid=True), ForeignKey("raw_data_file.id"), nullable=False
)
raw_data_file = relationship("RawDataFile", back_populates="physical_copies")
@property
def full_path(self):
"""Get the full path/S3 key for this physical copy."""
if isinstance(self.data_location, DiskDataLocation):
return os.path.join(
self.data_location.path, self.raw_data_file.relative_path
)
elif isinstance(self.data_location, S3DataLocation):
# For S3, return just the relative path - the actual S3 key construction
# should be handled by the data-transfer package using the shared utility functions
return self.raw_data_file.relative_path
elif isinstance(self.data_location, TapeDataLocation):
return os.path.join(
self.data_location.mount_path, self.raw_data_file.relative_path
)
return self.raw_data_file.relative_path
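# Path resolution sketch (hypothetical copy): full_path depends on the storage type
# of the owning DataLocation -- a joined filesystem path for disk and tape locations,
# and the bare relative path (the S3 key suffix) for S3 locations.
# >>> copy.data_location.path           # e.g. '/mnt/buffer1'
# >>> copy.raw_data_file.relative_path  # e.g. 'chai/2025/obs_0042/file.nc'
# >>> copy.full_path                    # '/mnt/buffer1/chai/2025/obs_0042/file.nc'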
class RawDataPackagePhysicalCopy(PhysicalCopy):
"""Tracks physical copies of raw data packages."""
__tablename__ = "raw_data_package_physical_copy"
__mapper_args__ = {"polymorphic_identity": "raw_data_package_physical_copy"}
id = Column(Integer, ForeignKey("physical_copy.id"), primary_key=True)
raw_data_package_id = Column(
Integer, ForeignKey("raw_data_package.id", ondelete="CASCADE"), nullable=False
)
raw_data_package = relationship("RawDataPackage", back_populates="physical_copies")
@property
def full_path(self):
"""Get the full path/S3 key for this physical copy."""
if isinstance(self.data_location, DiskDataLocation):
return os.path.join(
self.data_location.path, self.raw_data_package.relative_path
)
elif isinstance(self.data_location, S3DataLocation):
# For S3, return just the relative path - the actual S3 key construction
# should be handled by the data-transfer package using the shared utility functions
return self.raw_data_package.relative_path
elif isinstance(self.data_location, TapeDataLocation):
return os.path.join(
self.data_location.mount_path, self.raw_data_package.relative_path
)
return self.raw_data_package.relative_path
class DataTransferPackagePhysicalCopy(PhysicalCopy):
"""Tracks physical copies of data transfer packages."""
__tablename__ = "data_transfer_package_physical_copy"
__mapper_args__ = {"polymorphic_identity": "data_transfer_package_physical_copy"}
id = Column(Integer, ForeignKey("physical_copy.id"), primary_key=True)
data_transfer_package_id = Column(
Integer,
ForeignKey("data_transfer_package.id", ondelete="CASCADE"),
nullable=False,
)
data_transfer_package = relationship(
"DataTransferPackage", back_populates="physical_copies"
)
@property
def full_path(self):
"""Get the full path/S3 key for this physical copy."""
if isinstance(self.data_location, DiskDataLocation):
return os.path.join(
self.data_location.path, self.data_transfer_package.relative_path
)
elif isinstance(self.data_location, S3DataLocation):
# For S3, return just the relative path - the actual S3 key construction
# should be handled by the data-transfer package using the shared utility functions
return self.data_transfer_package.relative_path
elif isinstance(self.data_location, TapeDataLocation):
return os.path.join(
self.data_location.mount_path, self.data_transfer_package.relative_path
)
return self.data_transfer_package.relative_path
class LongTermArchiveTransfer(Base):
__tablename__ = "long_term_archive_transfer"
id = Column(Integer, primary_key=True)
site_id = Column(Integer, ForeignKey("site.id"))
site = relationship("Site", back_populates="long_term_archive_transfers")
origin_data_location_id = Column(Integer, ForeignKey("data_location.id"))
origin_data_location = relationship(
"DataLocation", foreign_keys=[origin_data_location_id]
)
destination_data_location_id = Column(Integer, ForeignKey("data_location.id"))
destination_data_location = relationship(
"DataLocation", foreign_keys=[destination_data_location_id]
)
raw_data_package_id = Column(Integer, ForeignKey("raw_data_package.id"))
raw_data_package = relationship(
"RawDataPackage", back_populates="long_term_archive_transfers"
)
# Log entries are stored in the LongTermArchiveTransferLog table
logs = relationship(
"LongTermArchiveTransferLog", back_populates="long_term_archive_transfer"
)
# Long term archive transfer status
status = Column(SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING)
failure_error_message = Column(Text, nullable=True)
start_time = Column(DateTime, nullable=True)
end_time = Column(DateTime, nullable=True)
attempt_count = Column(Integer, nullable=False, default=0)
# Add fields for tracking attempts
last_attempt_time = Column(DateTime, nullable=True)
# Add a field to store error messages
error_message = Column(Text, nullable=True)
@hybrid_property
def requires_intervention(self):
return self.attempt_count >= 3
@requires_intervention.expression
def requires_intervention(cls):
return case((cls.attempt_count >= 3, True), else_=False)
__table_args__ = (
UniqueConstraint(
"raw_data_package_id",
"origin_data_location_id",
"destination_data_location_id",
name="uix_long_term_archive_transfer_location",
),
)
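# Hybrid property sketch (hypothetical session): requires_intervention works both on
# instances (plain Python comparison) and in queries (rendered as a SQL CASE
# expression via the .expression variant above).
# >>> transfer.requires_intervention
# False
# >>> stuck = session.query(LongTermArchiveTransfer).filter(
# ...     LongTermArchiveTransfer.requires_intervention
# ... ).all()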
class LongTermArchiveTransferLog(SystemLog):
__tablename__ = "long_term_archive_transfer_log"
id = Column(Integer, ForeignKey("system_log.id"), primary_key=True)
long_term_archive_transfer_id = Column(
Integer, ForeignKey("long_term_archive_transfer.id"), nullable=False
)
long_term_archive_transfer = relationship(
"LongTermArchiveTransfer", back_populates="logs"
)
log = Column(Text, nullable=False)
timestamp = Column(DateTime, nullable=False)
__mapper_args__ = {
"polymorphic_identity": "long_term_archive_transfer_log",
}
class StagingJob(Base):
__tablename__ = "staging_job"
id = Column(Integer, primary_key=True)
status = Column(SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING)
failure_error_message = Column(Text, nullable=True)
start_time = Column(DateTime, nullable=True)
active = Column(Boolean, nullable=False, default=True)
end_time = Column(DateTime, nullable=True)
retry_count = Column(Integer, nullable=False, default=0)
raw_data_packages = relationship(
"RawDataPackage",
secondary=raw_data_package_staging_job_association,
back_populates="staging_jobs",
)
logs = relationship("StagingJobLog", back_populates="staging_job")
origin_data_location_id = Column(
Integer, ForeignKey("data_location.id"), nullable=False
)
origin_data_location = relationship(
"DataLocation", foreign_keys=[origin_data_location_id]
)
destination_data_location_id = Column(
Integer, ForeignKey("data_location.id"), nullable=False
)
destination_data_location = relationship(
"DataLocation", foreign_keys=[destination_data_location_id]
)
class StagingJobLog(SystemLog):
__tablename__ = "staging_job_log"
id = Column(Integer, ForeignKey("system_log.id"), primary_key=True)
staging_job_id = Column(Integer, ForeignKey("staging_job.id"), nullable=False)
staging_job = relationship("StagingJob", back_populates="logs")
log = Column(Text, nullable=False)
timestamp = Column(DateTime, nullable=False)
__mapper_args__ = {
"polymorphic_identity": "staging_job_log",
}