# Source code for ccat_ops_db.models

import os
import astropy.units as u
from astropy.coordinates import SkyCoord
from datetime import datetime, timezone
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Float,
    ForeignKey,
    Integer,
    String,
    Table,
    Text,
    UniqueConstraint,
    Enum as SQLAlchemyEnum,
    case,
    BigInteger,
)
from sqlalchemy.ext.hybrid import hybrid_property
from enum import Enum
from .config.config import ccat_ops_db_settings


from sqlalchemy.dialects.postgresql import JSON, UUID
from sqlalchemy.orm import relationship
from typing import Optional

from .ccat_ops_db import Base


class LocationType(Enum):
    """Location type enum.

    The location type determines the role of the location in the data
    transfer system.

    SOURCE: Telescope instrument computers
    BUFFER: Input/transit buffers
    LONG_TERM_ARCHIVE: Permanent storage
    PROCESSING: Temporary processing areas
    """

    SOURCE = "source"  # Telescope instrument computers
    BUFFER = "buffer"  # Input/processing buffers
    LONG_TERM_ARCHIVE = "long_term_archive"  # Permanent storage
    PROCESSING = "processing"  # Temporary processing areas
class StorageType(Enum):
    """Storage type enum.

    The storage type determines the physical storage medium of the location.

    DISK: Traditional disk storage
    S3: Object storage (AWS S3 or compatible)
    TAPE: Tape-based archival storage
    """

    DISK = "disk"
    S3 = "s3"
    TAPE = "tape"
class RouteType(Enum):
    """Route type enum.

    The route type determines how a data transfer is routed.

    DIRECT: Direct transfer from origin to destination
    RELAY: Route through intermediate site
    CUSTOM: Custom location-to-location override
    """

    DIRECT = "direct"
    RELAY = "relay"  # Route through intermediate site
    CUSTOM = "custom"  # Custom location-to-location override
class Status(Enum):
    """Status enum.

    These status labels make the status checks uniform across the entire
    data transfer system. They are used across many concepts such as
    RawDataPackage, DataTransferPackage, DataTransfer, etc.

    PENDING: Marks a concept as waiting for execution, e.g. a
        RawDataPackage in PENDING will be scheduled to be built.
    SCHEDULED: Marks a concept as scheduled for execution.
    IN_PROGRESS: Marks that a celery job is working on this concept.
    COMPLETED: Successfully completed the concept.
    FAILED: Failed to execute the concept.
    """

    # Values are read from configuration so every service in the system
    # agrees on the exact label strings.
    PENDING = ccat_ops_db_settings.status.PENDING
    SCHEDULED = ccat_ops_db_settings.status.SCHEDULED
    IN_PROGRESS = ccat_ops_db_settings.status.IN_PROGRESS
    COMPLETED = ccat_ops_db_settings.status.COMPLETED
    FAILED = ccat_ops_db_settings.status.FAILED
class PackageState(Enum):
    """Package state enum.

    The package state determines the state of the package.

    WAITING: Waiting for transfer
    TRANSFERRING: Transferring
    ARCHIVED: Archived
    FAILED: Failed
    """

    WAITING = "waiting"  # Yellow hourglass - only in primary archive
    TRANSFERRING = "transferring"  # Blue rotating circle - part of DataTransferPackage
    ARCHIVED = "archived"  # Green checkmark - archived with all statuses completed
    FAILED = "failed"  # Red cross - any status failed
class PhysicalCopyStatus(Enum):
    """Lifecycle status of a physical copy of a package at a location.

    Tracks a copy from being present, through staging, and through the
    multi-step deletion workflow (possible -> pending -> scheduled ->
    in progress -> deleted, with a failed state).
    """

    PRESENT = "present"
    STAGED = (
        "staged"  # Package staged and unpacked, physical RawDataPackage file removed
    )
    DELETION_POSSIBLE = "deletion_possible"
    DELETION_PENDING = "deletion_pending"
    DELETION_SCHEDULED = "deletion_scheduled"
    DELETION_IN_PROGRESS = "deletion_in_progress"
    DELETION_FAILED = "deletion_failed"
    DELETED = "deleted"
class Site(Base):
    """Represents a physical or logical site where data can be stored or processed."""

    __tablename__ = "site"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False)  # "CCAT", "Cologne", "Cornell"
    short_name = Column(String(20), nullable=False)  # "ccat", "cologne", "us"
    site_location = Column(String(250), nullable=True)  # "Atacama", "Germany", "USA"
    long_term_archive_transfers = relationship(
        "LongTermArchiveTransfer", back_populates="site"
    )
    locations = relationship("DataLocation", back_populates="site")
    # short_name is the unique machine-readable identifier for a site.
    __table_args__ = (UniqueConstraint("short_name", name="uix_site_short_name"),)
class DataLocation(Base):
    """Base class for all data storage locations with polymorphic storage types."""

    __tablename__ = "data_location"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False)  # "ccat_chai_computer", "fyst_buffer_1"
    location_type = Column(SQLAlchemyEnum(LocationType), nullable=False)
    site_id = Column(Integer, ForeignKey("site.id"), nullable=False)
    site = relationship("Site", back_populates="locations")

    # Buffer hierarchy and failover
    active = Column(Boolean, default=True, nullable=False)
    priority = Column(Integer, default=0, nullable=False)  # Lower = higher priority

    # Polymorphic setup: subclasses set their StorageType as the
    # polymorphic identity on this discriminator column.
    storage_type = Column(
        SQLAlchemyEnum(StorageType), nullable=False
    )  # "disk", "s3", "tape"
    __mapper_args__ = {"polymorphic_identity": None, "polymorphic_on": storage_type}
    # A location name only needs to be unique within its site.
    __table_args__ = (
        UniqueConstraint("site_id", "name", name="uix_data_location_name"),
    )
class DiskDataLocation(DataLocation):
    """Disk-based storage location."""

    __tablename__ = "disk_data_location"
    # Joined-table inheritance: primary key is also a FK to data_location.
    id = Column(Integer, ForeignKey("data_location.id"), primary_key=True)
    path = Column(String(500), nullable=False)
    host = Column(String(250))
    user = Column(String(250))
    __mapper_args__ = {"polymorphic_identity": StorageType.DISK}
class S3DataLocation(DataLocation):
    """S3-compatible object storage location."""

    __tablename__ = "s3_data_location"
    # Joined-table inheritance: primary key is also a FK to data_location.
    id = Column(Integer, ForeignKey("data_location.id"), primary_key=True)
    bucket_name = Column(String(250), nullable=False)
    region = Column(String(100))
    endpoint_url = Column(String(500))  # For non-AWS S3
    __mapper_args__ = {"polymorphic_identity": StorageType.S3}

    def get_s3_credentials(self, site_name: str) -> tuple[str, str]:
        """Get S3 credentials from environment variables using the pattern:

        {Site.name}_{DataLocation.name}_S3_ACCESS_KEY_ID
        {Site.name}_{DataLocation.name}_S3_SECRET_ACCESS_KEY

        Examples
        --------
        Cologne site, long_term_archive location::

            COLOGNE_LONG_TERM_ARCHIVE_S3_ACCESS_KEY_ID=your_access_key
            COLOGNE_LONG_TERM_ARCHIVE_S3_SECRET_ACCESS_KEY=your_secret_key

        US site, long_term_archive location::

            US_LONG_TERM_ARCHIVE_S3_ACCESS_KEY_ID=your_access_key
            US_LONG_TERM_ARCHIVE_S3_SECRET_ACCESS_KEY=your_secret_key

        Parameters
        ----------
        site_name : str
            Name of the site

        Returns
        -------
        tuple[str, str]
            (access_key_id, secret_access_key)
        """
        from ccat_data_transfer.config.config import ccat_data_transfer_settings

        # The documented environment variables are upper case; normalize the
        # constructed names so the lookup matches even when site/location
        # names are stored in lower or mixed case. Dynaconf attribute access
        # is case-insensitive, so this stays backward compatible.
        access_key_var = f"{site_name}_{self.name}_S3_ACCESS_KEY_ID".upper()
        secret_key_var = f"{site_name}_{self.name}_S3_SECRET_ACCESS_KEY".upper()

        # Location-specific credentials take precedence.
        access_key_id = getattr(ccat_data_transfer_settings, access_key_var, None)
        secret_access_key = getattr(ccat_data_transfer_settings, secret_key_var, None)

        # Fall back to global settings if location-specific ones aren't set.
        if not access_key_id:
            access_key_id = ccat_data_transfer_settings.s3_access_key_id
        if not secret_access_key:
            secret_access_key = ccat_data_transfer_settings.s3_secret_access_key

        return access_key_id, secret_access_key
class TapeDataLocation(DataLocation):
    """Tape-based storage location."""

    __tablename__ = "tape_data_location"
    # Joined-table inheritance: primary key is also a FK to data_location.
    id = Column(Integer, ForeignKey("data_location.id"), primary_key=True)
    library_name = Column(String(250))
    mount_path = Column(String(500))
    __mapper_args__ = {"polymorphic_identity": StorageType.TAPE}
# Many-to-many association tables.

# RawDataPackage <-> StagingJob
raw_data_package_staging_job_association = Table(
    "raw_data_package_staging_job_association",
    Base.metadata,
    Column("raw_data_package_id", Integer, ForeignKey("raw_data_package.id")),
    Column("staging_job_id", Integer, ForeignKey("staging_job.id")),
)

# User <-> Role
user_role_association = Table(
    "user_role_association",
    Base.metadata,
    Column("user_id", Integer, ForeignKey("user.id"), primary_key=True),
    Column("role_id", Integer, ForeignKey("role.id"), primary_key=True),
)

# Instrument <-> ObservingProgram
instrument_observing_program_association = Table(
    "instrument_observing_program",
    Base.metadata,
    Column("instrument_id", Integer, ForeignKey("instrument.id"), primary_key=True),
    Column(
        "observing_program_id",
        Integer,
        ForeignKey("observing_program.id"),
        primary_key=True,
    ),
)

# ObsUnit <-> InstrumentModuleConfiguration
obs_unit_instrument_module_configuration_association = Table(
    "obs_unit_instrument_module_configuration_association",
    Base.metadata,
    Column("obs_unit_id", Integer, ForeignKey("obs_unit.id")),
    Column(
        "instrument_module_configuration_id",
        Integer,
        ForeignKey("instrument_module_configuration.id"),
    ),
)
class ObservingProgram(Base):
    """An observing program with an optional lead user, sub-programs,
    associated instruments and obs units."""

    __tablename__ = "observing_program"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False)
    short_name = Column(String(20), nullable=False)
    description = Column(Text)
    lead_id = Column(Integer, ForeignKey("user.id"), nullable=True)
    lead = relationship("User", back_populates="observing_programs")
    sub_observing_programs = relationship(
        "SubObservingProgram",
        back_populates="observing_program",
    )
    instruments = relationship(
        "Instrument",
        secondary=instrument_observing_program_association,
        back_populates="observing_programs",
    )
    obs_units = relationship("ObsUnit", back_populates="observing_program")
class SubObservingProgram(Base):
    """A sub-division of an ObservingProgram grouping its obs units."""

    __tablename__ = "sub_observing_program"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False)
    short_name = Column(String(20), nullable=False)
    description = Column(Text)
    observing_program_id = Column(
        Integer,
        ForeignKey("observing_program.id"),
        nullable=False,
    )
    observing_program = relationship(
        "ObservingProgram",
        back_populates="sub_observing_programs",
    )
    obs_units = relationship("ObsUnit", back_populates="sub_observing_program")
# The following tables implement a data transfer system from one side to the
# other, with handshaking and deletion on the origin.
# TODO: the Source table needs a SQLAlchemy column that resembles a SkyCoord
# object, read from the ra_deg and dec_deg columns as well as the
# coordinate_system and epoch columns.
class Source(Base):
    """A source is a celestial object.

    This class serves as a base class for various types of sources. It is a
    polymorphic class in SQLAlchemy, not instantiated directly, but used to
    provide common attributes.

    Subclasses, implemented as separate database tables, inherit from the
    source class and can have additional specific attributes. See classes
    that are based on this class for more information on the implemented
    types of sources.
    """

    __tablename__ = "source"
    id = Column(Integer, primary_key=True)
    name = Column(
        String(250), nullable=False, unique=True, doc="The name of the source"
    )
    type = Column(String(250), nullable=False, doc="The polymorphic identity")
    __mapper_args__ = {
        "polymorphic_on": type,
        "polymorphic_identity": "source",
    }

    # Add relationships
    obs_units = relationship("ObsUnit", back_populates="source")
class FixedSource(Source):
    """A source that has fixed coordinates.

    This class is a subclass of the Source class and inherits all attributes
    from the Source class. The class implements the skycoord property that
    returns a SkyCoord object from the ra_deg and dec_deg columns.
    """

    __tablename__ = "fixed_source"
    __mapper_args__ = {"polymorphic_identity": "fixed_source"}
    id = Column(Integer, ForeignKey("source.id"), primary_key=True)
    ra_deg = Column(Float, nullable=False, doc="The right ascension in degrees (ICRS)")
    dec_deg = Column(Float, nullable=False, doc="The declination in degrees (ICRS)")
    slam = Column(String(50), doc="Longitude string of the original input")
    sbet = Column(String(50), doc="Latitude string of the original input")
    vlsr = Column(Float, doc="The local standard of rest velocity in km/s")
    frame = Column(String(50), doc="The frame of the coordinates of the original input")

    @property
    def skycoord(self):
        """Return a SkyCoord object from the ra_deg and dec_deg columns.

        Returns ``None`` when either coordinate is missing. The returned
        coordinate is always in the ICRS frame (ra_deg/dec_deg are stored
        as ICRS regardless of the original input frame).
        """
        if self.ra_deg is not None and self.dec_deg is not None:
            return SkyCoord(
                ra=self.ra_deg * u.deg,
                dec=self.dec_deg * u.deg,
                frame="icrs",
            )
        return None
class SolarSystemObject(Source):
    """A source that is a solar system object.

    This class is a subclass of the Source class and inherits all attributes
    from the Source class.
    """

    __tablename__ = "solar_system_object"
    __mapper_args__ = {"polymorphic_identity": "solar_system_object"}
    id = Column(Integer, ForeignKey("source.id"), primary_key=True)
    eph_name = Column(
        String(50),
        nullable=True,
        doc="Standard ephemeris name of the source",
    )
    # https://naif.jpl.nasa.gov/pub/naif/toolkit_docs/FORTRAN/req/naif_ids.html#NAIF%20Integer%20ID%20codes
    naif_id = Column(String(50), nullable=True, doc="The NAIF ID of the source")
class ConstantElevationSource(Source):
    """A source that is observed at a constant elevation.

    This class is a subclass of the Source class and inherits all attributes
    from the Source class. The observed region is a rectangle in RA/Dec.
    """

    __tablename__ = "constant_elevation_source"
    __mapper_args__ = {"polymorphic_identity": "constant_elevation_source"}
    id = Column(Integer, ForeignKey("source.id"), primary_key=True)
    ra_deg_min = Column(
        Float, nullable=False, doc="The minimum right ascension of the area in degrees"
    )
    ra_deg_max = Column(
        Float, nullable=False, doc="The maximum right ascension of the area in degrees"
    )
    dec_deg_min = Column(
        Float, nullable=False, doc="The minimum declination of the area in degrees"
    )
    dec_deg_max = Column(
        Float, nullable=False, doc="The maximum declination of the area in degrees"
    )
    slam_min = Column(
        String(50), doc="The minimum longitude string of the original input"
    )
    slam_max = Column(
        String(50), doc="The maximum longitude string of the original input"
    )
    sbet_min = Column(
        String(50), doc="The minimum latitude string of the original input"
    )
    sbet_max = Column(
        String(50), doc="The maximum latitude string of the original input"
    )
    vlsr = Column(Float, doc="The local standard of rest velocity in km/s")
    frame = Column(String(50), doc="The frame of the coordinates of the original input")
class Line(Base):
    """A spectral line that is observed by an instrument."""

    __tablename__ = "line"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False, doc="The name of the spectral line")
    rest_frequency = Column(Float, nullable=False, doc="The rest frequency of the line")
    side_band = Column(String(250), nullable=False, doc="The side band of the line")
    available = Column(Boolean, nullable=False)
    comment = Column(String(250), nullable=True)
# chai_array_configurations = relationship( # "ChaiArrayConfiguration", back_populates="line" # ) # To be able to also include data from other observatories such as GREAT/SOFIA we # include a specific observatory table
class Observatory(Base):
    """An observatory (e.g. CCAT, SOFIA) hosting one or more telescopes."""

    __tablename__ = "observatory"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False, unique=True)
    description = Column(String(250), nullable=False)
    telescopes = relationship("Telescope", back_populates="observatory")
# To prepare for the possibility of multiple telescopes in an observatory we include a # telescope table
class Telescope(Base):
    """A telescope belonging to an observatory, with its geographic site."""

    __tablename__ = "telescope"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False)
    description = Column(String(250), nullable=False)
    # Geographic position of the telescope.
    lon_deg = Column(Float, nullable=False)
    lat_deg = Column(Float, nullable=False)
    alt_m = Column(Float, nullable=False)
    instruments = relationship("Instrument", back_populates="telescope")
    observatory_id = Column(Integer, ForeignKey("observatory.id"))
    observatory = relationship("Observatory", back_populates="telescopes")
# An instrument runs on a specific telescope
class Instrument(Base):
    """An instrument that runs on a specific telescope."""

    __tablename__ = "instrument"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False)
    instrument_type = Column(String(250), nullable=False)
    description = Column(String(250))
    telescope_id = Column(Integer, ForeignKey("telescope.id"))
    telescope = relationship("Telescope", back_populates="instruments")
    modules = relationship("InstrumentModule", back_populates="instrument")
    observing_programs = relationship(
        "ObservingProgram",
        secondary=instrument_observing_program_association,
        back_populates="instruments",
    )
    available = Column(Boolean, nullable=False)
class InstrumentModule(Base):
    """A module (sub-unit) of an instrument, e.g. a detector array."""

    __tablename__ = "instrument_module"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False)
    description = Column(String(250))
    pixels = Column(Integer, nullable=True, default=1)
    instrument_id = Column(Integer, ForeignKey("instrument.id"))
    instrument = relationship("Instrument", back_populates="modules")
    instrument_module_configurations = relationship(
        "InstrumentModuleConfiguration", back_populates="instrument_module"
    )
    available = Column(Boolean, nullable=False)
    raw_data_packages = relationship(
        "RawDataPackage", back_populates="instrument_module"
    )
    raw_data_files = relationship(
        "RawDataFile",
        back_populates="instrument_module",
    )
class InstrumentModuleConfiguration(Base):
    """Polymorphic base for instrument-module configurations.

    Instrument-specific subclasses (e.g. CHAI, Prime-Cam) add their own
    configuration columns via joined-table inheritance.
    """

    __tablename__ = "instrument_module_configuration"
    id = Column(Integer, primary_key=True)
    type = Column(String)
    instrument_module_id = Column(Integer, ForeignKey("instrument_module.id"))
    instrument_module = relationship(
        "InstrumentModule", back_populates="instrument_module_configurations"
    )
    raw_data_files = relationship(
        "RawDataFile", back_populates="instrument_module_configuration"
    )
    __mapper_args__ = {
        "polymorphic_on": type,
        "polymorphic_identity": "instrument_module_configuration",
    }  # Polymorphic mapping
class ChaiModuleConfiguration(InstrumentModuleConfiguration):
    """CHAI-specific instrument module configuration (tuned spectral line)."""

    __tablename__ = "chai_module_configuration"
    __mapper_args__ = {"polymorphic_identity": "chai_module_configuration"}
    id = Column(
        Integer, ForeignKey("instrument_module_configuration.id"), primary_key=True
    )
    line_id = Column(Integer, ForeignKey("line.id"), nullable=False)
    line = relationship("Line")
    config_parameters = Column(
        JSON,
        doc="List of instrument configuration parameters",
    )
class PrimeCamModuleConfiguration(InstrumentModuleConfiguration):
    """Prime-Cam-specific instrument module configuration."""

    __tablename__ = "prime_cam_module_configuration"
    __mapper_args__ = {
        "polymorphic_identity": "prime_cam_module_configuration"
    }  # Polymorphic mapping
    id = Column(
        Integer, ForeignKey("instrument_module_configuration.id"), primary_key=True
    )
    config_parameters = Column(
        JSON,
        doc="List of instrument configuration parameters",
    )
class ObservationConfiguration(Base):
    """Polymorphic base for observation configurations of an ObsUnit."""

    __tablename__ = "observation_configuration"
    id = Column(Integer, primary_key=True)
    type = Column(String)
    obs_units = relationship("ObsUnit", back_populates="observation_configuration")
    __mapper_args__ = {
        "polymorphic_on": type,
        "polymorphic_identity": "observation_configuration",
    }
    azimuth_range = Column(
        JSON,
        nullable=True,
        doc="Azimuth range lookup table for constant elevation scans",
    )
class ChaiObservationConfiguration(ObservationConfiguration):
    """CHAI-specific observation configuration.

    Extends ObservationConfiguration with the CHAI tilings and the
    tile-line grouping used by the scheduler.
    """

    __tablename__ = "chai_observation_configuration"
    __mapper_args__ = {"polymorphic_identity": "chai_observation_configuration"}
    id = Column(Integer, ForeignKey("observation_configuration.id"), primary_key=True)
    chai_tilings = relationship(
        "ChaiTiling", back_populates="chai_observation_configuration"
    )
    # chai_inpar_parameters = relationship(
    #     "ChaiInparParameter",
    #     secondary=chai_observation_configuration_chai_inpar_parameters_association,
    #     back_populates="chai_observation_configurations",
    # )
    ntilelines = Column(
        Integer,
        nullable=True,
        # Fixed typo in the doc string: "socring" -> "scoring".
        doc="Number of tile lines to be grouped (for scoring in scheduler)",
    )
class PrimeCamObservationConfiguration(ObservationConfiguration):
    """Prime-Cam-specific observation configuration (versioned mapping parameters)."""

    __tablename__ = "prime_cam_observation_configuration"
    __mapper_args__ = {
        "polymorphic_identity": "prime_cam_observation_configuration"
    }  # Polymorphic mapping
    id = Column(Integer, ForeignKey("observation_configuration.id"), primary_key=True)
    version = Column(Integer, nullable=False)
    history = Column(JSON, nullable=True)
    mapping_parameters = Column(
        JSON,
        nullable=True,
        doc="List of mapping parameters",
    )
class ObsMode(Base):
    """An observation mode an ObsUnit can be executed in."""

    __tablename__ = "obs_mode"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False)
    description = Column(String(250))
    obs_units = relationship("ObsUnit", back_populates="obs_mode")
class ObsUnit(Base):
    """A schedulable unit of observation.

    Ties together a source, observing program, instrument module
    configurations, observation configuration and scheduling constraints
    (altitude, rotation angle, LSA, cadence, priority, ...).
    """

    __tablename__ = "obs_unit"
    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False)
    version = Column(Integer, nullable=False)
    history = Column(JSON, nullable=True)
    phase = Column(
        String(20), nullable=False, doc="CM=commissioning BS=baseline science etc."
    )
    group = Column(String(250), nullable=True)
    group_type = Column(String(250), nullable=True)
    equal_tolerance = Column(
        Float,
        nullable=True,
        doc="Tolerance of inbalanced schedule when group_type is equal",
    )
    # Scheduling constraints (all optional).
    min_alt = Column(Float, nullable=True)
    max_alt = Column(Float, nullable=True)
    min_rotang = Column(Float, nullable=True)
    max_rotang = Column(Float, nullable=True)
    nominal_alt = Column(String(250), nullable=True)
    min_lsa = Column(Float, nullable=True)
    max_lsa = Column(Float, nullable=True)
    lsa_margin = Column(Float, nullable=True)
    cadence = Column(Float, nullable=True)
    requested_time_h = Column(Float, comment="Mandatory for PrimeCam", nullable=True)
    unit_duration_h = Column(Float, nullable=False)
    max_distance_in_map = Column(Float, nullable=True)
    trans_ref = Column(Float, nullable=True, doc="Reference transmission")
    priorities = Column(Float, nullable=False)  # needs to be float, not integer
    available = Column(
        Boolean, nullable=False, doc="Whether this ObsUnit is ready to be scheduled"
    )
    pre_scheduled_basis = Column(
        Boolean,
        doc="Whether this Obsunit is executed on pre-scheduled basis, meaning that it has a very low priority outside of the pre-scheduled slots.",
    )
    additional_parameters = Column(
        JSON, nullable=True, doc="Any special parameters or constraints to be stored"
    )
    source_id = Column(Integer, ForeignKey("source.id"), nullable=True)
    source = relationship("Source", back_populates="obs_units")
    observing_program_id = Column(
        Integer,
        ForeignKey("observing_program.id"),
        nullable=False,
    )
    observing_program = relationship("ObservingProgram", back_populates="obs_units")
    sub_observing_program_id = Column(Integer, ForeignKey("sub_observing_program.id"))
    sub_observing_program = relationship(
        "SubObservingProgram",
        back_populates="obs_units",
    )
    obs_mode_id = Column(Integer, ForeignKey("obs_mode.id"), nullable=True)
    obs_mode = relationship("ObsMode", back_populates="obs_units")
    primary_instrument_module_configuration_id = Column(
        Integer, ForeignKey("instrument_module_configuration.id"), nullable=False
    )
    primary_instrument_module_configuration = relationship(
        "InstrumentModuleConfiguration",
        backref="primary_obs_units",
    )
    # Note: it is a bit redundant but the following list includes
    # primary_instrument_module_configuration as well
    # so that it is easier to retrieve all instrument module configuration
    # that are associated to this obsunit
    instrument_module_configurations = relationship(
        "InstrumentModuleConfiguration",
        secondary=obs_unit_instrument_module_configuration_association,
        backref="obs_units",
    )
    observation_configuration_id = Column(
        Integer, ForeignKey("observation_configuration.id"), nullable=True
    )
    observation_configuration = relationship(
        "ObservationConfiguration", back_populates="obs_units"
    )
    executed_obs_units = relationship("ExecutedObsUnit", back_populates="obs_unit")
    pre_scheduled_slots = relationship("PreScheduledSlot", back_populates="obs_unit")
    raw_data_files = relationship("RawDataFile", back_populates="obs_unit")
    raw_data_packages = relationship("RawDataPackage", back_populates="obs_unit")
class PreScheduledSlot(Base):
    """A fixed time slot in which an ObsUnit is pre-scheduled to run."""

    __tablename__ = "pre_scheduled_slot"
    id = Column(Integer, primary_key=True)
    start_time = Column(DateTime, nullable=False)
    end_time = Column(DateTime, nullable=False)
    obs_unit_id = Column(Integer, ForeignKey("obs_unit.id"), nullable=False)
    obs_unit = relationship("ObsUnit", back_populates="pre_scheduled_slots")
class ExecutedObsUnit(Base):
    """A single execution of an ObsUnit, with timing, status and ancillary info."""

    __tablename__ = "executed_obs_unit"
    id = Column(UUID(as_uuid=True), primary_key=True)
    start_time = Column(DateTime, nullable=False)
    end_time = Column(DateTime)
    status = Column(
        # Fixed typo in the doc string: "interruped" -> "interrupted".
        String(20), nullable=True, doc="running/*success*/*interrupted*/unusable/..."
    )
    mean_pwv = Column(Float, nullable=True)
    mean_elevation = Column(Float, nullable=True)
    obs_unit_id = Column(Integer, ForeignKey("obs_unit.id"), nullable=False)
    obs_unit = relationship("ObsUnit", back_populates="executed_obs_units")
    obs_info = Column(
        JSON,
        nullable=True,
        # Fixed typo in the doc string: "infomation" -> "information".
        doc="Static ancillary information (e.g. ObsUnit version, tiling ID for CHAI)",
    )
    obs_progress = Column(JSON, nullable=True, doc="Field for progress tracking")
    raw_data_packages = relationship(
        "RawDataPackage", back_populates="executed_obs_unit"
    )
    raw_data_files = relationship("RawDataFile", back_populates="executed_obs_unit")
# class ExecutedChaiTiling(Base): # __tablename__ = "executed_chai_tiling" # id = Column(Integer, primary_key=True) # start_time = Column(DateTime, nullable=False) # end_time = Column(DateTime, nullable=False) # success = Column(Boolean, nullable=False) # mean_pwv = Column(Float, nullable=False) # mean_elevation = Column(Float, nullable=False) # # chai_tiling_id = Column(Integer, ForeignKey("chai_tiling.id")) # # chai_tiling = relationship("ChaiTiling", back_populates="executed_chai_tiling") # # chai_inpar_parameter_id = Column(Integer, ForeignKey("chai_inpar_parameter.id")) # # chai_inpar_parameter = relationship( # # "ChaiInparParameter", back_populates="chai_obs_units" # # ) # # secondary_chai_array_configuration = relationship( # # "ChaiArrayConfiguration", # # secondary=\ # # executed_chai_obs_unit_secondary_chai_array_configuration_table, # # back_populates="secondary_executed_chai_obs_units", # # )
class ChaiTiling(Base):
    """A single tile of a CHAI observation configuration."""

    __tablename__ = "chai_tiling"
    id = Column(Integer, primary_key=True)
    version = Column(Integer, nullable=False)
    history = Column(JSON, nullable=True)
    priority_in_tiling = Column(Integer, nullable=False)
    tile_id = Column(String(10), nullable=False)
    tile_offset_x = Column(Float, nullable=False)
    tile_offset_y = Column(Float, nullable=False)
    x_or_y = Column(String(2))
    tile_unit_scaling_x = Column(Float)
    tile_unit_scaling_y = Column(Float)
    edge = Column(String(10))
    goal_ncycle = Column(Integer, nullable=False)
    # tiling is connected to obsunit through observation configuration
    # obs_unit_id = Column(Integer, ForeignKey("obs_unit.id"), nullable=False)
    # obs_unit = relationship("ObsUnit")
    chai_observation_configuration_id = Column(
        Integer, ForeignKey("chai_observation_configuration.id")
    )
    chai_observation_configuration = relationship(
        "ChaiObservationConfiguration", back_populates="chai_tilings"
    )
    chai_inpar_parameter_id = Column(Integer, ForeignKey("chai_inpar_parameter.id"))
    chai_inpar_parameter = relationship(
        "ChaiInparParameter", back_populates="chai_tilings"
    )
class ChaiInparParameter(Base):
    """CHAI "inpar" observing parameter set used by the tilings.

    Column ``doc`` strings mirror the kosma_software inpar parameter
    documentation.
    """

    __tablename__ = "chai_inpar_parameter"
    id = Column(Integer, primary_key=True)
    # nullable=True is default
    name = Column(String(100), nullable=False)
    version = Column(Integer, nullable=False)
    history = Column(JSON)
    chai_tilings = relationship("ChaiTiling", back_populates="chai_inpar_parameter")
    lam = Column(Float, nullable=False, doc="Map center offset in longitude [arcsec]")
    bet = Column(Float, nullable=False, doc="Map center offset in latitude [arcsec]")
    cormap = Column(String(10), nullable=False, doc="Map cood. system")
    line_range = Column(
        String(50),
        doc='Range of velocities [km/s] affected by lines (e.g. "-70:-30:v,-10:10:v")',
    )
    goal_resolution = Column(
        Float,
        doc="Spectral resolution [km/s] with which the scientific analysis will be done",
    )
    refname = Column(
        String(20),
        doc="Reference name when using the absolute positions. Set NAN to use relative",
    )
    refoffl = Column(
        Float, doc="Reference position relative to on in longitude [arcsec]"
    )
    refoffb = Column(
        Float, doc="Reference position relative to on in latitude [arcesc]"
    )
    corref = Column(String(10), doc="Reference cood. system")
    mode = Column(String(10), doc="Observing mode; otfl or otfb")
    otfpattern = Column(
        String(50), doc="OTF pattern file name specified in kosma_software"
    )
    ton = Column(Float, doc="On integration time [sec]")
    toff = Column(
        Float,
        doc="Forced off time [s] (-1 to use the default calculation = default)",
    )
    repetition = Column(Integer, doc="Repetition number")
    offononoff = Column(
        Integer,
        doc="Sequence of OFF and ON (1 = OFF-ON-ON-OFF (default), 0 = OFF-ON)",
    )
    stepl = Column(Float, doc="Step size in longitude [arcsec]")
    stepb = Column(Float, doc="Step size in latitude [arcsec]")
    mapsizel = Column(Float, doc="Size of map in longitude [arcsec]")
    mapsizeb = Column(Float, doc="Size of map in latitude [arcsec]")
    nmapl = Column(Integer, doc="number of raster positions along longitude")
    nmapb = Column(Integer, doc="number of raster positions along latitude")
    mapangle = Column(Float, doc="Position angle of the map (counter-clock) [degree]")
    crosssizel = Column(Float, doc="Size of cross in longitude [arcsec]")
    crosssizeb = Column(Float, doc="Size of cross in latitude [arcsec]")
    reverseflg = Column(
        Integer,
        doc=" OTF scan direction in cross : 0 = (+x,+y), 1 = (-x,-y), 2 = (-x,+x,-y,+y)",
    )
    scan_dir = Column(Integer, doc="OTF scan direction (1 is +x or +y, -1 is -x or -y)")
    scan_order = Column(
        Integer, doc="Order of OTF lines (1 is +x or +y, -1 is -x or -y)"
    )
    evendump = Column(
        Integer,
        doc="If 1 allow even number of dump positions without hitting the center. Default = 0",
    )
    novertical = Column(
        Integer,
        doc="If 1 skip the second (vertical) scan for cross observations. Default = 0",
    )
    pointingflg = Column(
        Integer,
        doc="Flag to indicate that it is a pointing session. Default = 0",
    )
    offperload = Column(Integer, doc="Define how many off for one load measurment")
    onperload = Column(
        Integer, doc="Define how many on positions for one load measurement"
    )
    repperload = Column(
        Integer, doc="Define how many repetition for one load measurment"
    )
    lineperoff = Column(
        Integer, doc="Define how many otf scan lines for one off measurement"
    )
    onperoff = Column(
        Integer, doc="Define how many on positions for one off measurement"
    )
    offperpattern = Column(Integer, doc="Define how many off measurement per pattern")
    refpoint = Column(String(20), doc="Reference point (as is used in setpoint)")
    act_pixflg = Column(
        Integer,
        doc="Flag to use the actual pixel position of refpoint. Default = 0",
    )
    nextflg = Column(
        String(1),
        doc="If Y, enable telescope moving when writing data. Default = N",
    )
# chai_observation_configurations = relationship( # "ChaiObservationConfiguration", # secondary=chai_observation_configuration_chai_inpar_parameters_association, # back_populates="chai_inpar_parameters", # )
class Role(Base):
    """A user role granting a set of permissions; assignable via GitHub teams."""

    __tablename__ = "role"
    id = Column(Integer, primary_key=True)
    name = Column(String(150), unique=True)
    description = Column(String(150), nullable=True)

    # GitHub team mappings for automatic role assignment
    github_team_mappings = Column(
        JSON, nullable=True, doc="GitHub teams that map to this role"
    )

    # Permission scopes for this role
    permissions = Column(
        JSON, nullable=True, doc="List of permissions granted to this role"
    )

    users = relationship(
        "User", secondary=user_role_association, back_populates="roles"
    )
class User(Base):
    """A user account with roles, login tracking and GitHub integration."""

    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    email = Column(String(150))
    username = Column(String(150))
    first_name = Column(String(150), nullable=True)
    last_name = Column(String(150), nullable=True)
    title = Column(String(150), nullable=True)
    affiliation = Column(Text, nullable=True)
    password = Column(String(150))

    # GitHub integration fields
    github_id = Column(String(50), nullable=True, unique=True, doc="GitHub user ID")
    github_username = Column(String(150), nullable=True, doc="GitHub username")

    # User preferences stored as JSON
    preferences = Column(JSON, nullable=True, doc="User preferences and settings")

    roles = relationship(
        "Role", secondary=user_role_association, back_populates="users"
    )

    # Login tracking (Flask-Security-style fields).
    last_login_at = Column(DateTime())
    current_login_at = Column(DateTime())
    last_login_ip = Column(String(100))
    current_login_ip = Column(String(100))
    login_count = Column(Integer)
    active = Column(Boolean())
    confirmed_at = Column(DateTime())

    # Relationships
    observing_programs = relationship("ObservingProgram", back_populates="lead")
    api_tokens = relationship(
        "ApiToken", back_populates="user", cascade="all, delete-orphan"
    )
class ApiToken(Base):
    """API tokens for programmatic access to the API.

    Only a hash of the token is stored (``token_hash``); ``token_prefix``
    keeps the first few characters for human identification. Lifecycle is
    controlled by ``active`` plus an optional ``expires_at``.
    """

    __tablename__ = "api_token"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("user.id"), nullable=False)
    user = relationship("User", back_populates="api_tokens")

    # Token identification
    name = Column(String(100), nullable=False, doc="Human-readable name for the token")
    token_hash = Column(
        String(255), nullable=False, unique=True, doc="Hashed token value"
    )
    token_prefix = Column(
        String(10), nullable=False, doc="First few characters for identification"
    )

    # Token permissions and scopes
    scopes = Column(JSON, nullable=True, doc="List of permission scopes for this token")

    # Token lifecycle
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        nullable=False,
    )
    expires_at = Column(
        DateTime(timezone=True), nullable=True, doc="Token expiration time"
    )
    last_used_at = Column(
        DateTime(timezone=True), nullable=True, doc="Last time token was used"
    )
    active = Column(
        Boolean, default=True, nullable=False, doc="Whether token is active"
    )

    # Usage tracking
    usage_count = Column(
        Integer, default=0, nullable=False, doc="Number of times token was used"
    )
    last_used_ip = Column(String(100), nullable=True, doc="IP address of last usage")

    def is_expired(self):
        """Return True if the token has expired; tokens without an
        ``expires_at`` never expire."""
        if self.expires_at is None:
            return False
        # NOTE(review): assumes expires_at comes back timezone-aware; on
        # backends that return naive datetimes this comparison raises
        # TypeError — confirm against the configured database.
        return datetime.now(timezone.utc) > self.expires_at

    def is_valid(self):
        """Check if token is valid (active and not expired)"""
        return self.active and not self.is_expired()
class DataTransferRoute(Base):
    """Defines routes for data transfer between sites and locations.

    The database implements a flexible routing system that supports:

    1. Direct routes between specific locations
    2. Relay routes through intermediate sites
    3. Custom location-to-location overrides
    """

    __tablename__ = "data_transfer_route"

    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False)

    # Site-level routing
    origin_site_id = Column(Integer, ForeignKey("site.id"), nullable=False)
    origin_site = relationship("Site", foreign_keys=[origin_site_id])
    destination_site_id = Column(Integer, ForeignKey("site.id"), nullable=False)
    destination_site = relationship("Site", foreign_keys=[destination_site_id])

    # Route configuration
    route_type = Column(SQLAlchemyEnum(RouteType), nullable=False)
    # Free-form transport identifier, e.g. "bbcp", "s3", "cp".
    transfer_method = Column(String(250), nullable=False)

    # Optional location-specific overrides (used by CUSTOM routes).
    origin_location_id = Column(Integer, ForeignKey("data_location.id"), nullable=True)
    origin_location = relationship("DataLocation", foreign_keys=[origin_location_id])
    destination_location_id = Column(
        Integer, ForeignKey("data_location.id"), nullable=True
    )
    destination_location = relationship(
        "DataLocation", foreign_keys=[destination_location_id]
    )

    # Relay configuration (intermediate site for RELAY routes).
    relay_site_id = Column(Integer, ForeignKey("site.id"), nullable=True)
    relay_site = relationship("Site", foreign_keys=[relay_site_id])

    # At most one route per (origin site, destination site, route type).
    __table_args__ = (
        UniqueConstraint(
            "origin_site_id",
            "destination_site_id",
            "route_type",
            name="uix_data_transfer_route",
        ),
    )
# Many-to-many association between data transfer packages and the raw data
# files they bundle.
data_transfer_package_files_association = Table(
    "data_transfer_package_files",
    Base.metadata,
    Column(
        "data_transfer_package_id",
        Integer,
        ForeignKey("data_transfer_package.id"),
        primary_key=True,
    ),
    Column(
        "file_id", UUID(as_uuid=True), ForeignKey("raw_data_file.id"), primary_key=True
    ),
)
class RawDataFile(Base):
    """Represents a raw data file from an instrument.

    Tracks the file's provenance (observation unit, instrument module and
    configuration), its source location, and which raw-data / transfer
    packages it belongs to. Physical on-disk/tape/S3 copies are tracked via
    ``RawDataFilePhysicalCopy``.
    """

    __tablename__ = "raw_data_file"

    id = Column(UUID(as_uuid=True), primary_key=True)
    name = Column(String(250), nullable=False)
    # Path relative to the owning DataLocation's root.
    relative_path = Column(String(250), nullable=False)
    obs_unit_id = Column(Integer, ForeignKey("obs_unit.id"), nullable=False)
    obs_unit = relationship("ObsUnit", back_populates="raw_data_files")
    created_at = Column(
        DateTime(timezone=True),
        nullable=False,
        default=lambda: datetime.now(timezone.utc),
    )
    instrument_module_id = Column(
        Integer, ForeignKey("instrument_module.id"), nullable=False
    )
    instrument_module = relationship(
        "InstrumentModule", back_populates="raw_data_files"
    )
    instrument_module_configuration_id = Column(
        Integer, ForeignKey("instrument_module_configuration.id")
    )
    instrument_module_configuration = relationship(
        "InstrumentModuleConfiguration", back_populates="raw_data_files"
    )
    file_type = Column(String(250), nullable=False)
    # BigInteger: file sizes can exceed the 32-bit Integer range.
    size = Column(BigInteger, nullable=False)
    checksum = Column(String(250), nullable=False)
    description = Column(Text, nullable=True)

    # Source location tracking
    source_location_id = Column(Integer, ForeignKey("data_location.id"), nullable=False)
    source_location = relationship("DataLocation")

    # Package relationships
    raw_data_package_id = Column(
        Integer, ForeignKey("raw_data_package.id", ondelete="CASCADE"), nullable=True
    )
    raw_data_package = relationship("RawDataPackage", back_populates="raw_data_files")
    data_transfer_package_id = Column(
        Integer, ForeignKey("data_transfer_package.id"), nullable=True
    )
    data_transfer_package = relationship(
        "DataTransferPackage", back_populates="raw_data_files"
    )
    executed_obs_unit_id = Column(
        UUID(as_uuid=True), ForeignKey("executed_obs_unit.id"), nullable=True
    )
    executed_obs_unit = relationship("ExecutedObsUnit", back_populates="raw_data_files")

    # Physical copies
    state = Column(SQLAlchemyEnum(PackageState), nullable=True)
    physical_copies = relationship(
        "RawDataFilePhysicalCopy",
        back_populates="raw_data_file",
        cascade="all, delete-orphan",
    )
class RawDataPackageMetadata(Base):
    """Model for storing additional metadata for raw data packages.

    This table stores metadata that is not part of the core database models
    but is needed for IVOA-compatible metadata generation. It includes:
    - Instrument-specific parameters
    - Additional quality metrics
    - Extended provenance information
    - Custom metadata fields
    """

    __tablename__ = "raw_data_package_metadata"

    id = Column(Integer, primary_key=True)
    raw_data_package_id = Column(
        Integer, ForeignKey("raw_data_package.id", ondelete="CASCADE"), nullable=False
    )
    created_at = Column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    # Automatically refreshed on every UPDATE via the onupdate hook.
    updated_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    # Instrument-specific metadata (JSON)
    instrument_specific = Column(
        JSON, nullable=True, comment="Instrument-specific parameters and configuration"
    )

    # Additional quality metrics
    quality_metrics = Column(
        JSON, nullable=True, comment="Additional quality metrics not in core models"
    )

    # Extended provenance
    provenance = Column(JSON, nullable=True, comment="Extended provenance information")

    # Custom metadata fields
    custom_metadata = Column(JSON, nullable=True, comment="Custom metadata fields")

    # Relationships
    raw_data_package = relationship("RawDataPackage", back_populates="package_metadata")

    def __repr__(self):
        return f"<RawDataPackageMetadata(id={self.id}, raw_data_package_id={self.raw_data_package_id})>"
class RawDataPackage(Base):
    """A raw data package is a bundle of raw data files that were observed
    in an observation unit. But they should never be larger than 50GB in
    size.
    """

    __tablename__ = "raw_data_package"

    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False)
    relative_path = Column(
        String(250),
        nullable=False,
        doc="""
        This is the relative path to the raw data package.
        The absolute path of the location of this file for
        each archive is stored in the DataLocation table.
        The path is relative to the raw_data_path of the DataLocation.
        """,
    )
    # BigInteger: packages may approach the 50 GB guideline above.
    size = Column(BigInteger, nullable=False)
    executed_obs_unit_id = Column(
        UUID(as_uuid=True),
        ForeignKey("executed_obs_unit.id", ondelete="CASCADE"),
        nullable=False,
    )
    executed_obs_unit = relationship(
        "ExecutedObsUnit", back_populates="raw_data_packages"
    )
    instrument_module_id = Column(
        Integer, ForeignKey("instrument_module.id"), nullable=False
    )
    instrument_module = relationship(
        "InstrumentModule", back_populates="raw_data_packages"
    )
    obs_unit_id = Column(
        Integer, ForeignKey("obs_unit.id", ondelete="CASCADE"), nullable=False
    )
    obs_unit = relationship("ObsUnit", back_populates="raw_data_packages")
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        nullable=False,
    )
    checksum = Column(String(250), nullable=False)
    raw_data_files = relationship("RawDataFile", back_populates="raw_data_package")
    data_transfer_package_id = Column(
        Integer,
        ForeignKey("data_transfer_package.id", ondelete="CASCADE"),
        nullable=True,
    )
    data_transfer_package = relationship(
        "DataTransferPackage", back_populates="raw_data_packages"
    )
    long_term_archive_transfers = relationship(
        "LongTermArchiveTransfer",
        back_populates="raw_data_package",
        cascade="all, delete-orphan",
    )
    # Status of the post-build analysis step, separate from build status below.
    analyze_status = Column(
        SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING
    )
    physical_copies = relationship(
        "RawDataPackagePhysicalCopy",
        back_populates="raw_data_package",
        cascade="all, delete-orphan",
    )
    package_metadata = relationship(
        "RawDataPackageMetadata",
        back_populates="raw_data_package",
        uselist=False,
    )
    status = Column(SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING)
    state = Column(
        SQLAlchemyEnum(PackageState),
        nullable=True,
        default=PackageState.WAITING,
    )
    staging_jobs = relationship(
        "StagingJob",
        secondary=raw_data_package_staging_job_association,
        back_populates="raw_data_packages",
    )
    # Number of build attempts so far; used to decide on retries.
    retry_count = Column(Integer, default=0)
class DataTransferPackage(Base):
    """An archive bundling raw data files/packages for transfer between
    locations; individual transfers reference it via ``DataTransfer``."""

    __tablename__ = "data_transfer_package"

    id = Column(Integer, primary_key=True)
    hash_id = Column(String(250), nullable=False)
    file_name = Column(String(250), nullable=False)
    # Size/checksum are nullable: filled in once the package is built.
    size = Column(BigInteger, nullable=True)
    checksum = Column(String(250), nullable=True)
    # Path relative to the origin DataLocation's root.
    relative_path = Column(String(250), nullable=False)
    origin_location_id = Column(Integer, ForeignKey("data_location.id"), nullable=False)
    origin_location = relationship("DataLocation", foreign_keys=[origin_location_id])
    raw_data_files = relationship(
        "RawDataFile",
        back_populates="data_transfer_package",
    )
    raw_data_packages = relationship(
        "RawDataPackage",
        back_populates="data_transfer_package",
    )
    data_transfers = relationship(
        "DataTransfer", back_populates="data_transfer_package"
    )
    status = Column(SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING)
    failure_error_message = Column(Text, nullable=True)
    physical_copies = relationship(
        "DataTransferPackagePhysicalCopy",
        back_populates="data_transfer_package",
        cascade="all, delete-orphan",
    )
    # Counts the attempts to generate the data transfer package
    retry_count = Column(Integer, default=0)
class DataTransfer(Base):
    """Tracks data transfers between locations.

    Each row records one transfer attempt of a ``DataTransferPackage`` from
    an origin to a destination location, plus the separate unpack step at
    the destination.
    """

    __tablename__ = "data_transfer"

    id = Column(Integer, primary_key=True)
    process_id = Column(String(250))
    retry_count = Column(Integer, nullable=False, default=0)
    start_time = Column(DateTime)
    end_time = Column(DateTime)
    data_transfer_method = Column(String(250), default="bbcp")
    status = Column(SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING)
    failure_error_message = Column(Text, nullable=True)
    transfer_program_log = Column(Text)

    # Location references
    origin_location_id = Column(Integer, ForeignKey("data_location.id"), nullable=False)
    origin_location = relationship("DataLocation", foreign_keys=[origin_location_id])
    destination_location_id = Column(
        Integer, ForeignKey("data_location.id"), nullable=False
    )
    destination_location = relationship(
        "DataLocation", foreign_keys=[destination_location_id]
    )

    # Package reference
    data_transfer_package_id = Column(
        Integer, ForeignKey("data_transfer_package.id"), nullable=False
    )
    data_transfer_package = relationship(
        "DataTransferPackage", back_populates="data_transfers"
    )

    # Unpack step bookkeeping (tracked independently of the transfer itself).
    unpack_status = Column(
        SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING
    )
    unpack_failure_error_message = Column(Text, nullable=True)
    unpack_start_time = Column(DateTime)
    unpack_end_time = Column(DateTime)
    unpack_log = Column(Text)
    unpack_retry_count = Column(Integer, nullable=False, default=0)

    logs = relationship("DataTransferLog", back_populates="data_transfer")
class SystemLog(Base):
    """Polymorphic base table for system log entries.

    Concrete log types (data transfer, long-term archive, staging job)
    inherit from this table via joined-table inheritance.
    """

    __tablename__ = "system_log"

    id = Column(Integer, primary_key=True)
    # Discriminator column used by SQLAlchemy to dispatch to subclasses.
    type = Column(String(250), nullable=False)

    __mapper_args__ = {
        "polymorphic_on": type,
        "polymorphic_identity": "system_log",
    }
class DataTransferLog(SystemLog):
    """Log entries for data transfers with references to log files.

    This model implements a lightweight approach where:
    - Basic status info and log file path are stored in the database
    - Full command outputs are stored in files
    - Detailed metrics are stored in InfluxDB
    """

    __tablename__ = "data_transfer_log"
    __mapper_args__ = {
        "polymorphic_identity": "data_transfer_log",
    }

    id = Column(Integer, ForeignKey("system_log.id"), primary_key=True)
    data_transfer_id = Column(Integer, ForeignKey("data_transfer.id"), nullable=False)
    data_transfer = relationship("DataTransfer", back_populates="logs")

    # Basic info
    timestamp = Column(DateTime, nullable=False)
    status = Column(
        String(50),
        nullable=False,
        doc="Status of this transfer attempt (success/failure)",
    )

    # Path to the log file
    log_path = Column(
        String(500), nullable=False, doc="Path to file containing full command output"
    )

    @property
    def content(self) -> Optional[str]:
        """Read and return the full log content, or ``None`` if unavailable.

        Uses EAFP instead of an ``os.path.exists`` pre-check so a file that
        disappears between the check and the open cannot raise (TOCTOU), and
        reads with an explicit UTF-8 encoding instead of the platform
        default.
        """
        if not self.log_path:
            return None
        try:
            with open(self.log_path, "r", encoding="utf-8") as f:
                return f.read()
        except OSError:
            # Missing, unreadable, or rotated log file — treat as no content.
            return None
# def __repr__(self): # return ( # f"<DataTransferLog(id={self.id}, " # f"data_transfer_id={self.data_transfer_id}, " # f"timestamp={self.timestamp}, " # f"status={self.status})>" # )
class DataArchive(Base):
    """A named data archive registered with the system."""

    __tablename__ = "data_archive"

    id = Column(Integer, primary_key=True)
    name = Column(String(250), nullable=False)
class PhysicalCopy(Base):
    """Base class for tracking physical copies of files across different
    storage locations.

    This class implements a polymorphic pattern to track physical copies
    of different types of files (RawDataFile, RawDataPackage,
    DataTransferPackage) across different storage locations and types.
    Each physical copy represents an actual file on disk or other storage
    medium.
    """

    __tablename__ = "physical_copy"

    id = Column(Integer, primary_key=True)
    # Polymorphic discriminator for the concrete copy subclass.
    type = Column(String(50), nullable=False)

    # Location reference
    data_location_id = Column(Integer, ForeignKey("data_location.id"), nullable=False)
    data_location = relationship("DataLocation")

    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        nullable=False,
    )
    # When this copy was last verified against its checksum, if ever.
    verified_at = Column(DateTime(timezone=True), nullable=True)
    status = Column(
        SQLAlchemyEnum(PhysicalCopyStatus),
        nullable=False,
        default=PhysicalCopyStatus.PRESENT,
    )
    checksum = Column(String(250), nullable=True)
    # Deletion bookkeeping for copies scheduled to be removed.
    deletion_task_id = Column(Integer, nullable=True)
    deleted_at = Column(DateTime(timezone=True), nullable=True)

    __mapper_args__ = {
        "polymorphic_on": type,
        "polymorphic_identity": "physical_copy",
    }
class RawDataFilePhysicalCopy(PhysicalCopy):
    """Tracks physical copies of individual raw data files."""

    __tablename__ = "raw_data_file_physical_copy"
    __mapper_args__ = {"polymorphic_identity": "raw_data_file_physical_copy"}

    id = Column(Integer, ForeignKey("physical_copy.id"), primary_key=True)
    raw_data_file_id = Column(
        UUID(as_uuid=True), ForeignKey("raw_data_file.id"), nullable=False
    )
    raw_data_file = relationship("RawDataFile", back_populates="physical_copies")

    @property
    def full_path(self):
        """Get the full path/S3 key for this physical copy.

        Resolution depends on the concrete location type; falls back to the
        bare relative path when the location type is unrecognized.
        """
        if isinstance(self.data_location, DiskDataLocation):
            return os.path.join(
                self.data_location.path, self.raw_data_file.relative_path
            )
        elif isinstance(self.data_location, S3DataLocation):
            # For S3, return just the relative path - the actual S3 key construction
            # should be handled by the data-transfer package using the shared utility functions
            return self.raw_data_file.relative_path
        elif isinstance(self.data_location, TapeDataLocation):
            return os.path.join(
                self.data_location.mount_path, self.raw_data_file.relative_path
            )
        return self.raw_data_file.relative_path
class RawDataPackagePhysicalCopy(PhysicalCopy):
    """Tracks physical copies of raw data packages."""

    __tablename__ = "raw_data_package_physical_copy"
    __mapper_args__ = {"polymorphic_identity": "raw_data_package_physical_copy"}

    id = Column(Integer, ForeignKey("physical_copy.id"), primary_key=True)
    raw_data_package_id = Column(
        Integer, ForeignKey("raw_data_package.id", ondelete="CASCADE"), nullable=False
    )
    raw_data_package = relationship("RawDataPackage", back_populates="physical_copies")

    @property
    def full_path(self):
        """Get the full path/S3 key for this physical copy.

        Same resolution rules as ``RawDataFilePhysicalCopy.full_path``,
        applied to the package's relative path.
        """
        if isinstance(self.data_location, DiskDataLocation):
            return os.path.join(
                self.data_location.path, self.raw_data_package.relative_path
            )
        elif isinstance(self.data_location, S3DataLocation):
            # For S3, return just the relative path - the actual S3 key construction
            # should be handled by the data-transfer package using the shared utility functions
            return self.raw_data_package.relative_path
        elif isinstance(self.data_location, TapeDataLocation):
            return os.path.join(
                self.data_location.mount_path, self.raw_data_package.relative_path
            )
        return self.raw_data_package.relative_path
class DataTransferPackagePhysicalCopy(PhysicalCopy):
    """Tracks physical copies of data transfer packages."""

    __tablename__ = "data_transfer_package_physical_copy"
    __mapper_args__ = {"polymorphic_identity": "data_transfer_package_physical_copy"}

    id = Column(Integer, ForeignKey("physical_copy.id"), primary_key=True)
    data_transfer_package_id = Column(
        Integer,
        ForeignKey("data_transfer_package.id", ondelete="CASCADE"),
        nullable=False,
    )
    data_transfer_package = relationship(
        "DataTransferPackage", back_populates="physical_copies"
    )

    @property
    def full_path(self):
        """Get the full path/S3 key for this physical copy.

        Same resolution rules as the other ``PhysicalCopy`` subclasses,
        applied to the transfer package's relative path.
        """
        if isinstance(self.data_location, DiskDataLocation):
            return os.path.join(
                self.data_location.path, self.data_transfer_package.relative_path
            )
        elif isinstance(self.data_location, S3DataLocation):
            # For S3, return just the relative path - the actual S3 key construction
            # should be handled by the data-transfer package using the shared utility functions
            return self.data_transfer_package.relative_path
        elif isinstance(self.data_location, TapeDataLocation):
            return os.path.join(
                self.data_location.mount_path, self.data_transfer_package.relative_path
            )
        return self.data_transfer_package.relative_path
class LongTermArchiveTransfer(Base):
    """A transfer of one raw data package into the long-term archive.

    Tracks origin/destination locations, attempt bookkeeping, and exposes
    ``requires_intervention`` both in Python and as a SQL expression.
    """

    __tablename__ = "long_term_archive_transfer"

    id = Column(Integer, primary_key=True)
    site_id = Column(Integer, ForeignKey("site.id"))
    site = relationship("Site", back_populates="long_term_archive_transfers")
    origin_data_location_id = Column(Integer, ForeignKey("data_location.id"))
    origin_data_location = relationship(
        "DataLocation", foreign_keys=[origin_data_location_id]
    )
    destination_data_location_id = Column(Integer, ForeignKey("data_location.id"))
    destination_data_location = relationship(
        "DataLocation", foreign_keys=[destination_data_location_id]
    )
    raw_data_package_id = Column(Integer, ForeignKey("raw_data_package.id"))
    raw_data_package = relationship(
        "RawDataPackage", back_populates="long_term_archive_transfers"
    )

    # Log entries for this transfer (stored in the polymorphic system_log table).
    logs = relationship(
        "LongTermArchiveTransferLog", back_populates="long_term_archive_transfer"
    )

    # Long term archive transfer status
    status = Column(SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING)
    failure_error_message = Column(Text, nullable=True)
    start_time = Column(DateTime, nullable=True)
    end_time = Column(DateTime, nullable=True)
    attempt_count = Column(Integer, nullable=False, default=0)

    # Add fields for tracking attempts
    last_attempt_time = Column(DateTime, nullable=True)

    # NOTE(review): error_message duplicates failure_error_message above —
    # consider consolidating in a future schema migration.
    error_message = Column(Text, nullable=True)

    @hybrid_property
    def requires_intervention(self):
        # Python-side rule: flag transfers that have been attempted 3+ times.
        return self.attempt_count >= 3

    @requires_intervention.expression
    def requires_intervention(cls):
        # SQL-side form of the same rule, usable in query filters.
        return case((cls.attempt_count >= 3, True), else_=False)

    # One transfer row per (package, origin, destination) combination.
    __table_args__ = (
        UniqueConstraint(
            "raw_data_package_id",
            "origin_data_location_id",
            "destination_data_location_id",
            name="uix_long_term_archive_transfer_location",
        ),
    )
class LongTermArchiveTransferLog(SystemLog):
    """Log entry for a long-term archive transfer (joined-table subclass of
    ``SystemLog``)."""

    __tablename__ = "long_term_archive_transfer_log"

    id = Column(Integer, ForeignKey("system_log.id"), primary_key=True)
    long_term_archive_transfer_id = Column(
        Integer, ForeignKey("long_term_archive_transfer.id"), nullable=False
    )
    long_term_archive_transfer = relationship(
        "LongTermArchiveTransfer", back_populates="logs"
    )
    # Full log text stored inline (unlike DataTransferLog, which stores a path).
    log = Column(Text, nullable=False)
    timestamp = Column(DateTime, nullable=False)

    __mapper_args__ = {
        "polymorphic_identity": "long_term_archive_transfer_log",
    }
class StagingJob(Base):
    """A job that stages raw data packages from one location to another
    (e.g. tape to disk); packages are linked many-to-many."""

    __tablename__ = "staging_job"

    id = Column(Integer, primary_key=True)
    status = Column(SQLAlchemyEnum(Status), nullable=False, default=Status.PENDING)
    failure_error_message = Column(Text, nullable=True)
    start_time = Column(DateTime, nullable=True)
    active = Column(Boolean, nullable=False, default=True)
    end_time = Column(DateTime, nullable=True)
    retry_count = Column(Integer, nullable=False, default=0)
    raw_data_packages = relationship(
        "RawDataPackage",
        secondary=raw_data_package_staging_job_association,
        back_populates="staging_jobs",
    )
    logs = relationship("StagingJobLog", back_populates="staging_job")
    origin_data_location_id = Column(
        Integer, ForeignKey("data_location.id"), nullable=False
    )
    origin_data_location = relationship(
        "DataLocation", foreign_keys=[origin_data_location_id]
    )
    destination_data_location_id = Column(
        Integer, ForeignKey("data_location.id"), nullable=False
    )
    destination_data_location = relationship(
        "DataLocation", foreign_keys=[destination_data_location_id]
    )
class StagingJobLog(SystemLog):
    """Log entry for a staging job (joined-table subclass of ``SystemLog``)."""

    __tablename__ = "staging_job_log"

    id = Column(Integer, ForeignKey("system_log.id"), primary_key=True)
    staging_job_id = Column(Integer, ForeignKey("staging_job.id"), nullable=False)
    staging_job = relationship("StagingJob", back_populates="logs")
    # Full log text stored inline.
    log = Column(Text, nullable=False)
    timestamp = Column(DateTime, nullable=False)

    __mapper_args__ = {
        "polymorphic_identity": "staging_job_log",
    }