Source code for ccat_ops_db.utils

import logging
from datetime import datetime

import sqlalchemy
from astropy import units as u
from astropy.coordinates import SkyCoord
from sqlalchemy import exc
import json

from . import models


def _coerce_and_compare(old_value, new_value):
    """Return ``(value_to_store, changed)`` for a single column update.

    Two strings are compared ignoring surrounding whitespace. A string given
    for a value currently stored as float is converted to float first. Two
    floats count as changed only if they differ by more than 1e-6. Every
    other combination uses plain inequality.
    """
    if isinstance(old_value, str) and isinstance(new_value, str):
        return new_value, old_value.strip() != new_value.strip()
    if isinstance(old_value, float) and isinstance(new_value, str):
        new_value = float(new_value)
    if isinstance(old_value, float) and isinstance(new_value, float):
        return new_value, abs(old_value - new_value) > 1e-6
    return new_value, old_value != new_value


def add_or_update_source(session, values, source_type=None):
    """Insert a source, or update the existing one with the same name and type.

    Parameters
    ----------
    session : database session
        Used to query for an existing record and to add/commit/roll back.
    values : dict
        Source attributes. ``"name"`` is always read. Fixed sources also
        need ``"frame"``, plus ``"slam"``/``"sbet"`` when the frame is
        ``"icrs"`` or ``"galactic"``; constant elevation sources need
        ``"frame"`` and the ``slam_min``/``sbet_min``/``slam_max``/
        ``sbet_max`` keys. The dict is modified in place: ``"type"`` and
        the derived ``ra_deg*``/``dec_deg*`` entries are written into it.
    source_type : str, optional
        ``"fixed_source"`` (also used when ``None``),
        ``"solar_system_object"`` or ``"constant_elevation_source"``.

    Raises
    ------
    ValueError
        If ``source_type`` is none of the supported values.

    Notes
    -----
    A database ``IntegrityError`` raised while writing is logged and the
    transaction is rolled back; it is not re-raised.
    """
    log = logging.getLogger(__name__)
    log.debug("Adding or updating source: %s", values)

    # The default is a fixed source. Normalising here keeps every later
    # comparison against the stored type meaningful.
    if source_type is None:
        source_type = "fixed_source"

    if source_type == "fixed_source":
        values["type"] = source_type
        source_model = models.FixedSource
        frame = values["frame"]
        if frame == "icrs":
            # ICRS input: longitude in hour angle, latitude in degrees.
            coord = SkyCoord(
                values["slam"],
                values["sbet"],
                unit=(u.hourangle, u.deg),
                frame=frame,
            )
            values["ra_deg"], values["dec_deg"] = (
                float(coord.ra.deg),
                float(coord.dec.deg),
            )
        elif frame == "galactic":
            # Galactic input is in degrees and is converted to ICRS.
            coord = SkyCoord(
                values["slam"],
                values["sbet"],
                unit=(u.deg, u.deg),
                frame=frame,
            )
            equatorial = coord.transform_to("icrs")
            values["ra_deg"], values["dec_deg"] = (
                float(equatorial.ra.deg),
                float(equatorial.dec.deg),
            )
    elif source_type == "solar_system_object":
        values["type"] = source_type
        source_model = models.SolarSystemObject
    elif source_type == "constant_elevation_source":
        values["type"] = source_type
        source_model = models.ConstantElevationSource
        coord_min = SkyCoord(
            values["slam_min"],
            values["sbet_min"],
            unit=(u.hourangle, u.deg),
            frame=values["frame"],
        )
        values["ra_deg_min"], values["dec_deg_min"] = (
            float(coord_min.ra.deg),
            float(coord_min.dec.deg),
        )
        coord_max = SkyCoord(
            values["slam_max"],
            values["sbet_max"],
            unit=(u.hourangle, u.deg),
            frame=values["frame"],
        )
        values["ra_deg_max"], values["dec_deg_max"] = (
            float(coord_max.ra.deg),
            float(coord_max.dec.deg),
        )
    else:
        # Previously this fell through and failed later on an unbound name.
        raise ValueError(f"Unknown source_type: {source_type!r}")

    try:
        # Check if a source with the given name already exists
        try:
            source = (
                session.query(source_model)
                .filter_by(name=values["name"], type=values["type"])
                .one()
            )
        except sqlalchemy.orm.exc.NoResultFound:
            log.debug("No source found with name %s", values["name"])
            source = None

        if source is not None:
            log.info("Source found with name %s", values["name"])
            # The query already filtered on type, so every column present
            # in ``values`` is compared and then written.
            for column in source.__table__.columns:
                column_name = column.name
                if column_name not in values:
                    continue
                old_value = getattr(source, column_name)
                new_value, changed = _coerce_and_compare(
                    old_value, values[column_name]
                )
                setattr(source, column_name, new_value)
                if changed:
                    log.warning(
                        "Value for column %s has changed from %s to %s",
                        column_name,
                        old_value,
                        new_value,
                    )
        else:
            # If not, create a new source record from the keys the model knows
            filtered_values = {
                key.lower(): values[key]
                for key in values
                if hasattr(source_model, key.lower())
            }
            source = source_model(**filtered_values)
            session.add(source)

        # Commit the transaction
        session.commit()
        log.debug("Source added or updated successfully.")
    except exc.IntegrityError as e:
        log.error(e)
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        log.warning(
            "A record with the name %s already exists.", values.get("name")
        )
def add_inpar_and_tiling(
    session,
    observation_configuration,
    inpar_parameters_list=None,
    tiling_parameters=None,
):
    """
    Add inpar and tiling parameters to the observation_configuration

    Parameters
    ----------
    session : database session
    observation_configuration : ccat.opsdb.observation_configuration
        Observation configuration with inpar and tiling to be associated
    inpar_parameters_list : list
        List of dictionary of inpar_parameters. Each one must have a
        ``"name"`` key; keys the ``ChaiInparParameter`` model does not have
        are ignored when a new record is created.
    tiling_parameters : list
        List of dictionary of each line in tiling. Each one must have an
        ``"inpar_file"`` key naming an existing inpar record.

    Returns
    -------
    observation_configuration : ccat.opsdb.observation_configuration
        Updated observation configuration

    Raises
    ------
    SystemExit
        With code 1 if more than one inpar record shares a name, or if a
        tiling line refers to an inpar record that does not exist.

    Notes
    -----
    Nothing is committed here; records are only added to ``session``.
    """
    log = logging.getLogger(__name__)

    if inpar_parameters_list is not None:
        for inpar_parameters in inpar_parameters_list:
            # check whether this inpar file already exists
            matches = (
                session.query(models.ChaiInparParameter)
                .filter_by(name=inpar_parameters["name"])
                .all()
            )
            if len(matches) > 1:
                log.error("Two inpar files with same name!")
                # ``exit`` is an interactive-shell helper; raise the
                # exception it wraps so this works without the site module.
                raise SystemExit(1)

            if not matches:
                # add as a new one
                new_values = {
                    key: inpar_parameters[key]
                    for key in inpar_parameters
                    if hasattr(models.ChaiInparParameter, key)
                }
                new_values["version"] = 1
                session.add(models.ChaiInparParameter(**new_values))
                continue

            # update with a warning
            existing = matches[0]
            history = existing.history
            if history is not None:
                history = json.loads(history)
                if not isinstance(history, list):
                    history = [history]

            changed = False
            # Iterate over a snapshot: attributes are assigned on
            # ``existing`` inside the loop.
            for column_name in list(vars(existing)):
                if column_name not in inpar_parameters:
                    continue
                old_value = getattr(existing, column_name)
                new_value = inpar_parameters[column_name]
                if old_value != new_value:
                    log.warning(
                        "inpar parameter %s in %s is updated from %s to %s",
                        column_name,
                        existing.name,
                        old_value,
                        new_value,
                    )
                    entry = {
                        "timestamp": datetime.now().isoformat(),
                        "key": column_name,
                        "old_value": old_value,
                    }
                    if history is None:
                        history = [entry]
                    else:
                        history.append(entry)
                    setattr(existing, column_name, new_value)
                    changed = True

            if changed:
                existing.history = json.dumps(history)
                # Treat a missing version as 0 instead of failing on None + 1.
                existing.version = (existing.version or 0) + 1

    if tiling_parameters is not None:
        for tiling in tiling_parameters:
            # Fill in the tiling values
            tiling_values = {
                key: tiling[key]
                for key in tiling
                if hasattr(models.ChaiTiling, key)
            }
            # associate to inpar file
            try:
                inpar_record = (
                    session.query(models.ChaiInparParameter)
                    .filter_by(name=tiling["inpar_file"])
                    .one()
                )
            except sqlalchemy.orm.exc.NoResultFound:
                log.error("inpar %s not found", tiling["inpar_file"])
                raise SystemExit(1) from None
            tiling_values["chai_inpar_parameter"] = inpar_record
            tiling_values["version"] = 1
            chai_tiling = models.ChaiTiling(**tiling_values)
            session.add(chai_tiling)
            observation_configuration.chai_tilings += [chai_tiling]

    return observation_configuration
def _history_with_entry(raw_history, entry):
    """Return the JSON history string with ``entry`` appended.

    ``raw_history`` is either ``None`` or a JSON string; a stored value that
    is not a JSON list is wrapped in a list first.
    """
    if raw_history is None:
        records = []
    else:
        records = json.loads(raw_history)
        if not isinstance(records, list):
            records = [records]
    records.append(entry)
    return json.dumps(records)


def _load_module_configuration(session, spec):
    """Fetch the module configuration described by ``spec``, creating it if absent."""
    config_type = spec["type"]
    if config_type not in (
        "chai_module_configuration",
        "prime_cam_module_configuration",
    ):
        # Previously an unknown type silently reused the configuration of
        # the preceding loop iteration (or failed on an unbound name).
        raise ValueError(f"Unknown module configuration type: {config_type!r}")

    instrument = (
        session.query(models.Instrument)
        .filter_by(name=spec["instrument"])
        .one()
    )
    instrument_module = (
        session.query(models.InstrumentModule)
        .filter_by(instrument=instrument, name=spec["instrument_module"])
        .one()
    )
    if config_type == "chai_module_configuration":
        # CHAI configurations are additionally keyed by the observed line.
        line = session.query(models.Line).filter_by(name=spec["line"]).one()
        model = models.ChaiModuleConfiguration
        criteria = {"instrument_module": instrument_module, "line": line}
    else:
        model = models.PrimeCamModuleConfiguration
        criteria = {"instrument_module": instrument_module}

    try:
        return session.query(model).filter_by(**criteria).one()
    except sqlalchemy.orm.exc.NoResultFound:
        module_configuration = model(**criteria)
        session.add(module_configuration)
        return module_configuration


def _update_existing_obs_unit(
    session,
    obs_unit,
    values,
    inpar_parameters_list,
    tiling_parameters,
    azimuth_range,
    mapping_parameters,
):
    """Apply ``values`` and configuration changes to an existing ObsUnit."""
    log = logging.getLogger(__name__)
    log.debug("ObsUnit found with name %s", values["name"])

    # Every update bumps the version and leaves a coarse history marker.
    obs_unit.version = (obs_unit.version or 0) + 1
    obs_unit.history = _history_with_entry(
        obs_unit.history,
        {
            "timestamp": datetime.now().isoformat(),
            "key": "all",
            "old_value": "not-tracked",
        },
    )

    observation_configuration = obs_unit.observation_configuration
    if observation_configuration:
        config_type = observation_configuration.type
        if config_type == "chai_observation_configuration":
            # Not replacing, just adding more inpars and tiles
            # because it is very difficult to relate which individual tile
            # lines corresponds to new tile lines.
            observation_configuration = add_inpar_and_tiling(
                session,
                observation_configuration,
                inpar_parameters_list=inpar_parameters_list,
                tiling_parameters=tiling_parameters,
            )
        elif (
            config_type == "prime_cam_observation_configuration"
            and mapping_parameters
        ):
            observation_configuration.version = (
                observation_configuration.version or 0
            ) + 1
            observation_configuration.history = _history_with_entry(
                observation_configuration.history,
                {
                    "timestamp": datetime.now().isoformat(),
                    "key": "mapping_parameters",
                    "old_value": observation_configuration.mapping_parameters,
                },
            )
            observation_configuration.mapping_parameters = mapping_parameters
        if azimuth_range is not None:
            observation_configuration.azimuth_range = azimuth_range

    for column in obs_unit.__table__.columns:
        column_name = column.name
        if column_name not in values:
            continue

        # One exclusive chain: a resolved related object must not be
        # overwritten by the raw string in the fallback branch.
        if column_name == "source":
            new_value = (
                session.query(models.Source)
                .filter_by(name=values["source"])
                .one()
            )
        elif column_name == "observing_program":
            new_value = (
                session.query(models.ObservingProgram)
                .filter_by(short_name=values["observing_program"])
                .one()
            )
        elif column_name == "sub_observing_program":
            if values["sub_observing_program"] == "None":
                new_value = None
            else:
                observing_program = (
                    session.query(models.ObservingProgram)
                    .filter_by(short_name=values["observing_program"])
                    .one()
                )
                try:
                    # NOTE(review): matched on ``name`` here but on
                    # ``short_name`` when creating - confirm which is right.
                    new_value = (
                        session.query(models.SubObservingProgram)
                        .filter_by(
                            name=values["sub_observing_program"],
                            observing_program=observing_program,
                        )
                        .one()
                    )
                except sqlalchemy.orm.exc.NoResultFound:
                    # TODO handle this situation properly
                    log.warning(
                        "SubObservingProgram not found %s",
                        values["sub_observing_program"],
                    )
                    new_value = None
        elif column_name == "obs_mode":
            # NOTE(review): matched on ``short_name`` here but on ``name``
            # when creating - confirm which is right.
            new_value = (
                session.query(models.ObsMode)
                .filter_by(short_name=values["obs_mode"])
                .one()
            )
        elif column_name == "observation_configuration":
            # Handled above; never overwritten from ``values``.
            log.debug("Updating observation_configuration")
            continue
        else:
            new_value = values[column_name]
        setattr(obs_unit, column_name, new_value)


def _create_obs_unit(
    session,
    values,
    obs_unit_type,
    inpar_parameters_list,
    tiling_parameters,
    instrument_module_configurations,
    azimuth_range,
    mapping_parameters,
):
    """Build a new ObsUnit with its observation and module configurations."""
    log = logging.getLogger(__name__)

    # The observation configuration class depends on the obs unit type.
    if obs_unit_type == "chai_obs_unit":
        observation_configuration = models.ChaiObservationConfiguration(
            type="chai_observation_configuration",
        )
        session.add(observation_configuration)
        # Add inpar and tiling parameters to the observation_configuration
        observation_configuration = add_inpar_and_tiling(
            session,
            observation_configuration,
            inpar_parameters_list=inpar_parameters_list,
            tiling_parameters=tiling_parameters,
        )
        # Add ntilelines
        observation_configuration.ntilelines = values["ntilelines"]
    elif obs_unit_type == "prime_cam_obs_unit":
        observation_configuration = models.PrimeCamObservationConfiguration(
            type="prime_cam_observation_configuration",
        )
        session.add(observation_configuration)
        if mapping_parameters:
            observation_configuration.mapping_parameters = mapping_parameters
        # Always start at version 1 so a later update can increment it.
        observation_configuration.version = 1
    else:
        raise ValueError(f"Unknown obs_unit_type: {obs_unit_type!r}")

    # add azimuth range
    if azimuth_range is not None:
        observation_configuration.azimuth_range = azimuth_range
    values["observation_configuration"] = observation_configuration

    # Create or load the module configurations. ``None`` means "no modules",
    # and the primary one stays None unless a spec is flagged primary.
    module_configurations = []
    primary_module_configuration = None
    for spec in instrument_module_configurations or ():
        module_configuration = _load_module_configuration(session, spec)
        if spec.get("primary", False):
            primary_module_configuration = module_configuration
        module_configurations.append(module_configuration)

    # Now we can create the obs_unit. Related records are looked up in the
    # database; keys the ObsUnit model does not have are dropped.
    filtered_values = {}
    for key in values:
        if not hasattr(models.ObsUnit, key):
            continue
        if key == "observing_program":
            try:
                observing_program = (
                    session.query(models.ObservingProgram)
                    .filter_by(short_name=values["observing_program"])
                    .one()
                )
            except sqlalchemy.orm.exc.NoResultFound:
                log.warning(
                    "ObservingProgram not found %s",
                    values["observing_program"],
                )
                observing_program = None
            filtered_values[key] = observing_program
        elif key == "source":
            filtered_values[key] = (
                session.query(models.Source)
                .filter_by(name=values["source"])
                .one()
            )
        elif key == "sub_observing_program":
            if values["sub_observing_program"] == "None":
                sub_observing_program = None
            else:
                log.debug(
                    "SubObservingProgram: %s", values["sub_observing_program"]
                )
                try:
                    observing_program = (
                        session.query(models.ObservingProgram)
                        .filter_by(short_name=values["observing_program"])
                        .one()
                    )
                except sqlalchemy.orm.exc.NoResultFound:
                    log.info(
                        "ObservingProgram: %s not found",
                        values["observing_program"],
                    )
                    # Without the parent program there is nothing to match.
                    sub_observing_program = None
                else:
                    sub_observing_program = (
                        session.query(models.SubObservingProgram)
                        .filter_by(
                            short_name=values["sub_observing_program"],
                            observing_program=observing_program,
                        )
                        .one()
                    )
            filtered_values["sub_observing_program"] = sub_observing_program
        elif key == "obs_mode":
            filtered_values[key] = (
                session.query(models.ObsMode)
                .filter_by(name=values["obs_mode"])
                .one()
            )
        elif key == "observation_configuration":
            log.debug("Updating observation_configuration")
            filtered_values["observation_configuration"] = (
                observation_configuration
            )
        else:
            filtered_values[key] = values[key]

    filtered_values["instrument_module_configurations"] = module_configurations
    filtered_values["primary_instrument_module_configuration"] = (
        primary_module_configuration
    )
    filtered_values["version"] = 1
    obs_unit = models.ObsUnit(**filtered_values)
    session.add(obs_unit)
    return obs_unit


def add_or_update_obs_unit(
    session,
    values,
    obs_unit_type="chai_obs_unit",
    inpar_parameters_list=None,
    tiling_parameters=None,
    instrument_module_configurations=None,
    azimuth_range=None,
    mapping_parameters=None,
):
    """Create the ObsUnit named ``values["name"]``, or update it if it exists.

    Parameters
    ----------
    session : database session
    values : dict
        ObsUnit attributes; ``"name"`` is required. When a new unit is
        created the dict gains an ``"observation_configuration"`` entry,
        and ``"ntilelines"`` is required for ``"chai_obs_unit"``.
    obs_unit_type : str
        ``"chai_obs_unit"`` or ``"prime_cam_obs_unit"``. Only used when a
        new unit is created.
    inpar_parameters_list, tiling_parameters : list, optional
        Passed to ``add_inpar_and_tiling`` for CHAI configurations.
    instrument_module_configurations : list of dict, optional
        Module configurations to attach to a new unit. Each needs
        ``"type"``, ``"instrument"`` and ``"instrument_module"``
        (plus ``"line"`` for CHAI); ``"primary"`` is optional.
    azimuth_range : optional
        Stored on the observation configuration when not ``None``.
    mapping_parameters : optional
        Stored on a Prime-Cam observation configuration when truthy.

    Raises
    ------
    ValueError
        If a new unit is requested with an unknown ``obs_unit_type`` or an
        unknown module configuration ``"type"``.

    Notes
    -----
    A database ``IntegrityError`` is logged and rolled back, not re-raised.
    """
    log = logging.getLogger(__name__)
    try:
        # Check if an obs unit with the given name already exists
        obs_unit = (
            session.query(models.ObsUnit)
            .filter_by(name=values["name"])
            .first()
        )
        if obs_unit:
            _update_existing_obs_unit(
                session,
                obs_unit,
                values,
                inpar_parameters_list,
                tiling_parameters,
                azimuth_range,
                mapping_parameters,
            )
        else:
            _create_obs_unit(
                session,
                values,
                obs_unit_type,
                inpar_parameters_list,
                tiling_parameters,
                instrument_module_configurations,
                azimuth_range,
                mapping_parameters,
            )
        # Commit the transaction
        session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        log.error(e)
def add_pre_scheduled_slot(
    session,
    values,
):
    """Create a PreScheduledSlot from ``values`` and commit it.

    Parameters
    ----------
    session : database session
    values : dict
        Slot attributes, passed to ``models.PreScheduledSlot`` as keyword
        arguments with these exceptions:

        * ``"obsunit_name"`` is replaced by an ``"obs_unit"`` entry holding
          the ObsUnit with that name;
        * ``"start_time"`` and ``"end_time"`` are parsed with
          ``datetime.fromisoformat`` unless they are already ``datetime``
          instances.

    Raises
    ------
    SystemExit
        With code 1 if no ObsUnit with the given name exists.
    """
    log = logging.getLogger(__name__)

    # add pre scheduled slot
    filtered_values = {}
    for key, value in values.items():
        if key == "obsunit_name":
            # check that the referred ObsUnit exists
            try:
                filtered_values["obs_unit"] = (
                    session.query(models.ObsUnit).filter_by(name=value).one()
                )
            except sqlalchemy.orm.exc.NoResultFound:
                log.error("No obsunit found with name %s", value)
                # ``exit`` is an interactive-shell helper; raise the
                # exception it wraps so this works without the site module.
                raise SystemExit(1) from None
        elif key in ("start_time", "end_time"):
            # Accept already-parsed datetimes as well as ISO 8601 strings.
            if isinstance(value, datetime):
                filtered_values[key] = value
            else:
                filtered_values[key] = datetime.fromisoformat(value)
        else:
            filtered_values[key] = value

    slot = models.PreScheduledSlot(**filtered_values)
    session.add(slot)

    # Commit the transaction
    session.commit()
    log.debug("Pre-scheduled slots added successfully.")