import logging
from datetime import datetime
import sqlalchemy
from astropy import units as u
from astropy.coordinates import SkyCoord
from sqlalchemy import exc
import json
from . import models
def add_or_update_source(session, values, source_type=None):
    """Insert a new source record, or update the existing one of the same name.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        Open database session; committed (or rolled back) before returning.
    values : dict
        Column values keyed by column name. Must contain "name"; fixed
        sources additionally need "frame", "slam" and "sbet"; constant
        elevation sources need "slam_min"/"sbet_min"/"slam_max"/"sbet_max".
    source_type : str or None, optional
        "fixed_source" (also the meaning of the default ``None``),
        "solar_system_object" or "constant_elevation_source"; selects the
        model class used for the record.
    """
    log = logging.getLogger(__name__)
    log.debug("Adding or updating source: %s", values)
    if source_type is None or source_type == 'fixed_source':
        values["type"] = "fixed_source"
        source_model = models.FixedSource
        if values["frame"] == "icrs":
            # slam/sbet are RA (hourangle) and Dec (deg); store as degrees.
            coord = SkyCoord(
                values["slam"],
                values["sbet"],
                unit=(u.hourangle, u.deg),
                frame=values["frame"],
            )
            values["ra_deg"] = float(coord.ra.deg)
            values["dec_deg"] = float(coord.dec.deg)
        if values["frame"] == "galactic":
            # Galactic l/b in degrees; convert to equatorial before storing.
            coord = SkyCoord(
                values["slam"],
                values["sbet"],
                unit=(u.deg, u.deg),
                frame=values["frame"],
            )
            equatorial = coord.transform_to("icrs")
            values["ra_deg"] = float(equatorial.ra.deg)
            values["dec_deg"] = float(equatorial.dec.deg)
    elif source_type == "solar_system_object":
        values["type"] = source_type
        source_model = models.SolarSystemObject
    elif source_type == "constant_elevation_source":
        values["type"] = source_type
        source_model = models.ConstantElevationSource
        # The scan is bounded by a min and max coordinate pair.
        coord_min = SkyCoord(
            values["slam_min"],
            values["sbet_min"],
            unit=(u.hourangle, u.deg),
            frame=values["frame"]
        )
        values["ra_deg_min"] = float(coord_min.ra.deg)
        values["dec_deg_min"] = float(coord_min.dec.deg)
        coord_max = SkyCoord(
            values["slam_max"],
            values["sbet_max"],
            unit=(u.hourangle, u.deg),
            frame=values["frame"]
        )
        values["ra_deg_max"] = float(coord_max.ra.deg)
        values["dec_deg_max"] = float(coord_max.dec.deg)
    try:
        # Check if a source with the given name already exists
        try:
            source = (
                session.query(source_model)
                .filter_by(
                    name=values["name"],
                    type=values["type"],
                )
                .one()
            )
        except sqlalchemy.orm.exc.NoResultFound:
            log.debug("No source found with name %s", values["name"])
            source = None
        if source is not None:
            log.info("Source found with name %s", values["name"])
            for column in source.__table__.columns:
                column_name = column.name
                if column_name not in values:
                    continue
                # BUGFIX: compare against values["type"] (always set above)
                # rather than the raw source_type argument, which is None
                # for the default fixed-source case and therefore never
                # matched, silently skipping the change warnings below.
                if source.type == values["type"]:
                    old_value = getattr(source, column_name)
                    new_value = values[column_name]
                    updated_column = False
                    if isinstance(old_value, str) and isinstance(new_value, str):
                        if old_value.strip() != new_value.strip():
                            updated_column = True
                    elif isinstance(old_value, float) and isinstance(
                        new_value, str
                    ):
                        new_value = float(new_value)
                        if abs(old_value - new_value) > 1e-6:
                            updated_column = True
                    elif isinstance(old_value, float) and isinstance(
                        new_value, float
                    ):
                        if abs(old_value - new_value) > 1e-6:
                            updated_column = True
                    elif old_value != new_value:
                        updated_column = True
                    setattr(source, column_name, new_value)
                    if updated_column:
                        log.warning(
                            "Value for column %s has changed from %s to %s",
                            column_name,
                            old_value,
                            new_value,
                        )
                else:
                    setattr(source, column_name, values[column_name])
        else:
            # If not, create a new source record, keeping only keys that
            # correspond to attributes of the model class.
            filtered_values = {
                key.lower(): values[key]
                for key in values
                if hasattr(source_model, key.lower())
            }
            source = source_model(
                **filtered_values,
            )
            session.add(source)
        # Commit the transaction
        session.commit()
        log.debug("Source added or updated successfully.")
    except exc.IntegrityError as e:
        log.error(e)
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        # BUGFIX: report the source name; "key" was only bound if the
        # creation branch above had run and could raise NameError here.
        log.warning("A record with the name %s already exists.", values["name"])
def add_inpar_and_tiling(
    session,
    observation_configuration,
    inpar_parameters_list=None,
    tiling_parameters=None,
):
    """
    Add inpar and tiling parameters to the observation_configuration

    Parameters
    ----------
    session : database session
    observation_configuration : ccat.opsdb.observation_configuration
        Observation configuration with inpar and tiling to be associated
    inpar_parameters_list : list
        List of dictionary of inpar_parameters
    tiling_parameters : list
        List of dictionary of each line in tiling

    Returns
    -------
    observation_configuration : ccat.opsdb.observation_configuration
        Updated observation configuration

    Raises
    ------
    SystemExit
        If duplicate inpar names exist in the database, or a tiling refers
        to an inpar file that cannot be found.
    """
    log = logging.getLogger(__name__)
    if inpar_parameters_list is not None:
        for inpar_parameters in inpar_parameters_list:
            # check whether this inpar file already exists
            inpar_entry = (
                session.query(models.ChaiInparParameter)
                .filter_by(name=inpar_parameters["name"])
                .all()
            )
            if len(inpar_entry) == 0:
                # add as a new one; keep only keys the model actually has
                filtered_inpar_parameter_values = {
                    key: inpar_parameters[key]
                    for key in inpar_parameters
                    if hasattr(models.ChaiInparParameter, key)
                }
                filtered_inpar_parameter_values["version"] = 1
                chai_inpar_parameter = models.ChaiInparParameter(
                    **filtered_inpar_parameter_values,
                )
                session.add(chai_inpar_parameter)
            elif len(inpar_entry) == 1:
                # Update the existing entry, warning about every changed
                # column and appending the old value to the JSON history.
                entry = inpar_entry[0]
                history = entry.history
                if history is not None:
                    history = json.loads(history)
                    if not isinstance(history, list):
                        history = [history]
                updated = False
                for column_name in entry.__dict__:
                    if column_name not in inpar_parameters:
                        continue
                    old_value = getattr(entry, column_name)
                    if old_value != inpar_parameters[column_name]:
                        log.warning(
                            f"inpar parameter {column_name} in {entry.name} is updated from {old_value} to {inpar_parameters[column_name]}")
                        newhistory = {
                            "timestamp": datetime.now().isoformat(),
                            "key": column_name,
                            "old_value": old_value,
                        }
                        if history is None:
                            history = [newhistory]
                        else:
                            history.append(newhistory)
                        setattr(
                            entry,
                            column_name,
                            inpar_parameters[column_name],
                        )
                        updated = True
                if updated:
                    # Persist the history and bump the version only if
                    # something actually changed.
                    entry.history = json.dumps(history)
                    entry.version = entry.version + 1
            else:
                # The name is expected to be unique; abort rather than guess.
                # BUGFIX: raise SystemExit directly instead of calling the
                # site-module exit() builtin (same observable behavior).
                log.error("Two inpar files with same name!")
                raise SystemExit(1)
    if tiling_parameters is not None:
        for tiling in tiling_parameters:
            # Fill in the tiling values
            filtered_tiling_values = {
                key: tiling[key]
                for key in tiling
                if hasattr(models.ChaiTiling, key)
            }
            # associate to inpar file
            try:
                inpar_parameters = (
                    session.query(models.ChaiInparParameter)
                    .filter_by(name=tiling["inpar_file"])
                    .one()
                )
            except sqlalchemy.orm.exc.NoResultFound:
                # BUGFIX: report through the module logger instead of print()
                log.error("inpar %s not found", tiling["inpar_file"])
                raise SystemExit(1)
            filtered_tiling_values["chai_inpar_parameter"] = inpar_parameters
            filtered_tiling_values["version"] = 1
            chai_tiling = models.ChaiTiling(
                **filtered_tiling_values,
            )
            session.add(chai_tiling)
            observation_configuration.chai_tilings += [chai_tiling]
    return observation_configuration
def add_or_update_obs_unit(
    session,
    values,
    obs_unit_type="chai_obs_unit",
    inpar_parameters_list=None,
    tiling_parameters=None,
    instrument_module_configurations=None,
    azimuth_range=None,
    mapping_parameters=None,
):
    """Create a new ObsUnit or update the existing one with the same name.

    Parameters
    ----------
    session : database session
    values : dict
        Column values for the obs unit, keyed by column name. Related
        records ("source", "observing_program", "sub_observing_program",
        "obs_mode") are given by name and resolved from the database.
    obs_unit_type : str
        "chai_obs_unit" or "prime_cam_obs_unit"; selects the type of
        observation configuration created for a new obs unit.
    inpar_parameters_list : list, optional
        CHAI inpar parameter dictionaries (see add_inpar_and_tiling).
    tiling_parameters : list, optional
        CHAI tiling dictionaries (see add_inpar_and_tiling).
    instrument_module_configurations : list, optional
        Module configuration dictionaries; used only when creating a new
        obs unit.
    azimuth_range : optional
        Azimuth range stored on the observation configuration.
    mapping_parameters : optional
        Prime-Cam mapping parameters stored on the configuration.
    """
    log = logging.getLogger(__name__)
    try:
        # Check if an obs unit with the given name already exists
        obs_unit = (
            session.query(models.ObsUnit)
            .filter_by(
                name=values["name"],
            )
            .first()
        )
        if obs_unit:
            log.debug("ObsUnit found with name %s", values["name"])
            # Bump the version and record the (untracked) update in the
            # JSON history column.
            obs_unit.version = obs_unit.version + 1
            history = obs_unit.history
            newhistory = {
                "timestamp": datetime.now().isoformat(),
                "key": "all",
                "old_value": "not-tracked"}
            if history is None:
                history = [newhistory]
            else:
                history = json.loads(history)
                if not isinstance(history, list):
                    history = [history]
                history.append(newhistory)
            obs_unit.history = json.dumps(history)
            observation_configuration = obs_unit.observation_configuration
            if observation_configuration:
                if observation_configuration.type == "chai_observation_configuration":
                    # Not replacing, just adding more inpars and tiles
                    # because it is very difficult to relate which individual
                    # tile lines corresponds to new tile lines.
                    observation_configuration = add_inpar_and_tiling(
                        session,
                        observation_configuration,
                        inpar_parameters_list=inpar_parameters_list,
                        tiling_parameters=tiling_parameters,
                    )
                elif (
                    observation_configuration.type
                    == "prime_cam_observation_configuration"
                ):
                    if mapping_parameters:
                        # Record the old mapping parameters in the history
                        # before overwriting them, and bump the version.
                        observation_configuration.version = (
                            observation_configuration.version + 1
                        )
                        history = observation_configuration.history
                        newhistory = {
                            "timestamp": datetime.now().isoformat(),
                            "key": "mapping_parameters",
                            "old_value": observation_configuration.mapping_parameters,
                        }
                        if history is None:
                            history = [newhistory]
                        else:
                            history = json.loads(history)
                            if not isinstance(history, list):
                                history = [history]
                            history.append(newhistory)
                        observation_configuration.history = json.dumps(history)
                        observation_configuration.mapping_parameters = (
                            mapping_parameters
                        )
                # NOTE(review): applied for both configuration types —
                # confirm azimuth_range is not Prime-Cam-specific here.
                if azimuth_range is not None:
                    observation_configuration.azimuth_range = azimuth_range
            for column in obs_unit.__table__.columns:
                column_name = column.name
                if column_name not in values:
                    continue
                if column_name == "source":
                    # BUGFIX: this case is now part of the elif chain below;
                    # previously it was a separate "if", so the trailing
                    # "else" immediately clobbered the resolved Source
                    # object with the raw name string.
                    source = (
                        session.query(models.Source)
                        .filter_by(name=values["source"])
                        .one()
                    )
                    setattr(obs_unit, column_name, source)
                elif column_name == "observing_program":
                    observing_program = (
                        session.query(models.ObservingProgram)
                        .filter_by(short_name=values["observing_program"])
                        .one()
                    )
                    setattr(obs_unit, column_name, observing_program)
                elif column_name == "sub_observing_program":
                    if values["sub_observing_program"] == "None":
                        sub_observing_program = None
                    else:
                        observing_program = (
                            session.query(models.ObservingProgram)
                            .filter_by(short_name=values["observing_program"])
                            .one()
                        )
                        try:
                            sub_observing_program = (
                                session.query(models.SubObservingProgram)
                                .filter_by(
                                    name=values["sub_observing_program"],
                                    observing_program=observing_program,
                                )
                                .one()
                            )
                        except sqlalchemy.orm.exc.NoResultFound:
                            # TODO handle this situation properly
                            # BUGFIX: log instead of print()
                            log.warning(
                                "SubObservingProgram not found %s",
                                values["sub_observing_program"],
                            )
                            sub_observing_program = None
                    setattr(obs_unit, column_name, sub_observing_program)
                elif column_name == "obs_mode":
                    obs_mode = (
                        session.query(models.ObsMode)
                        .filter_by(short_name=values["obs_mode"])
                        .one()
                    )
                    setattr(obs_unit, column_name, obs_mode)
                elif column_name == "observation_configuration":
                    # Already handled above; never overwrite from values.
                    # BUGFIX: use the module logger, not the root logger.
                    log.debug("Updating observation_configuration")
                else:
                    setattr(obs_unit, column_name, values[column_name])
        else:
            # First we need to check if the observation_configuration is a
            # chai_observation_configuration or a
            # prime_cam_observation_configuration and create the
            # appropriate object.
            # NOTE(review): remains None for an unrecognized obs_unit_type.
            observation_configuration = None
            if obs_unit_type == "chai_obs_unit":
                # For CHAI we also need the inpar parameters and tiling.
                observation_configuration = models.ChaiObservationConfiguration(
                    type="chai_observation_configuration",
                )
                session.add(observation_configuration)
                # Add inpar and tiling parameters to the configuration
                observation_configuration = add_inpar_and_tiling(
                    session,
                    observation_configuration,
                    inpar_parameters_list=inpar_parameters_list,
                    tiling_parameters=tiling_parameters,
                )
                # Add ntilelines
                observation_configuration.ntilelines = values["ntilelines"]
            elif obs_unit_type == "prime_cam_obs_unit":
                observation_configuration = models.PrimeCamObservationConfiguration(
                    type="prime_cam_observation_configuration",
                )
                session.add(observation_configuration)
                if mapping_parameters:
                    observation_configuration.mapping_parameters = (
                        mapping_parameters
                    )
                observation_configuration.version = 1
            # add azimuth range
            if azimuth_range is not None:
                observation_configuration.azimuth_range = azimuth_range
            values["observation_configuration"] = observation_configuration
            # Resolve the per-module configurations for the new obs unit.
            # BUGFIX: primary_instrument_module_configuration was unbound
            # (NameError below) when no configuration was flagged primary;
            # iterating the None default also crashed.
            primary_instrument_module_configuration = None
            instrument_module_configuration_list = []
            for instrument_module_configuration in (
                instrument_module_configurations or []
            ):
                # Here we have to create or load the module configuration
                # and log them in the database
                primary = instrument_module_configuration.get("primary", False)
                if (
                    instrument_module_configuration["type"]
                    == "chai_module_configuration"
                ):
                    instrument = (
                        session.query(models.Instrument).filter_by(
                            name=instrument_module_configuration["instrument"]).one()
                    )
                    instrument_module = (
                        session.query(models.InstrumentModule)
                        .filter_by(
                            instrument=instrument,
                            name=instrument_module_configuration["instrument_module"],
                        )
                        .one()
                    )
                    line = (
                        session.query(models.Line)
                        .filter_by(name=instrument_module_configuration["line"])
                        .one()
                    )
                    try:
                        instrument_module_configuration_db = (
                            session.query(models.ChaiModuleConfiguration)
                            .filter_by(
                                instrument_module=instrument_module,
                                line=line,
                            )
                            .one()
                        )
                    except sqlalchemy.orm.exc.NoResultFound:
                        instrument_module_configuration_db = (
                            models.ChaiModuleConfiguration(
                                instrument_module=instrument_module,
                                line=line,
                            )
                        )
                        session.add(instrument_module_configuration_db)
                elif (
                    instrument_module_configuration["type"]
                    == "prime_cam_module_configuration"
                ):
                    instrument = (
                        session.query(models.Instrument)
                        .filter_by(name=instrument_module_configuration["instrument"])
                        .one()
                    )
                    instrument_module = (
                        session.query(models.InstrumentModule)
                        .filter_by(
                            instrument=instrument,
                            name=instrument_module_configuration["instrument_module"],
                        )
                        .one()
                    )
                    try:
                        instrument_module_configuration_db = (
                            session.query(models.PrimeCamModuleConfiguration)
                            .filter_by(
                                instrument_module=instrument_module,
                            )
                            .one()
                        )
                    except sqlalchemy.orm.exc.NoResultFound:
                        instrument_module_configuration_db = (
                            models.PrimeCamModuleConfiguration(
                                instrument_module=instrument_module,
                            )
                        )
                        session.add(instrument_module_configuration_db)
                if primary:
                    primary_instrument_module_configuration = (
                        instrument_module_configuration_db
                    )
                instrument_module_configuration_list += [
                    instrument_module_configuration_db
                ]
            # Now we can create the obs_unit
            filtered_values = {}
            for key in values:
                if not hasattr(models.ObsUnit, key):
                    continue
                # We need to look up the objects for the source,
                # observing_program and sub_observing_program from the
                # database and add them to the filtered_values.
                if key == "observing_program":
                    try:
                        observing_program = (
                            session.query(models.ObservingProgram)
                            .filter_by(short_name=values["observing_program"])
                            .one()
                        )
                    except sqlalchemy.orm.exc.NoResultFound:
                        # BUGFIX: log instead of print()
                        log.warning(
                            "ObservingProgram not found %s",
                            values["observing_program"],
                        )
                        observing_program = None
                    filtered_values[key] = observing_program
                elif key == "source":
                    source = (
                        session.query(models.Source)
                        .filter_by(name=values["source"])
                        .one()
                    )
                    filtered_values[key] = source
                elif key == "sub_observing_program":
                    if values["sub_observing_program"] == "None":
                        sub_observing_program = None
                    else:
                        try:
                            observing_program = (
                                session.query(models.ObservingProgram)
                                .filter_by(short_name=values["observing_program"])
                                .one()
                            )
                        except sqlalchemy.orm.exc.NoResultFound:
                            # BUGFIX: was sqlalchemy.exc.NoResultFound, which
                            # is absent before SQLAlchemy 1.4 and inconsistent
                            # with every other handler here; also bind
                            # observing_program so the lookup below fails
                            # with NoResultFound instead of NameError.
                            log.info(
                                "ObservingProgram: %s not found",
                                values["observing_program"])
                            observing_program = None
                        log.debug("SubObservingProgram: %s",
                                  values["sub_observing_program"])
                        # NOTE(review): this lookup filters on short_name
                        # while the update branch above filters on name —
                        # confirm which column SubObservingProgram defines.
                        sub_observing_program = (
                            session.query(models.SubObservingProgram)
                            .filter_by(
                                short_name=values["sub_observing_program"],
                                observing_program=observing_program
                            )
                            .one()
                        )
                    filtered_values[
                        "sub_observing_program"] = sub_observing_program
                elif key == "obs_mode":
                    obs_mode = (
                        session.query(models.ObsMode)
                        .filter_by(name=values["obs_mode"])
                        .one()
                    )
                    filtered_values[key] = obs_mode
                elif key == "observation_configuration":
                    # BUGFIX: use the module logger, not the root logger.
                    log.debug("Updating observation_configuration")
                    filtered_values["observation_configuration"] = (
                        observation_configuration
                    )
                else:
                    filtered_values[key] = values[key]
            filtered_values["instrument_module_configurations"] = (
                instrument_module_configuration_list
            )
            filtered_values["primary_instrument_module_configuration"] = (
                primary_instrument_module_configuration
            )
            filtered_values["version"] = 1
            obs_unit = models.ObsUnit(
                **filtered_values,
            )
            session.add(obs_unit)
        # Commit the transaction
        session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        # BUGFIX: log the error instead of print()
        log.error(e)
def add_pre_scheduled_slot(
    session,
    values,
):
    """Add a pre-scheduled slot record to the database.

    Parameters
    ----------
    session : database session
    values : dict
        Slot values; "obsunit_name" refers to an existing ObsUnit by name,
        and "start_time"/"end_time" are ISO-format timestamp strings.

    Raises
    ------
    SystemExit
        If the referred ObsUnit does not exist.
    """
    log = logging.getLogger(__name__)
    # add pre scheduled slot
    filtered_values = {}
    for key in values:
        if key == "obsunit_name":
            # check that the referred ObsUnit exists
            try:
                obs_unit = (
                    session.query(models.ObsUnit)
                    .filter_by(name=values["obsunit_name"])
                    .one()
                )
            except sqlalchemy.orm.exc.NoResultFound:
                log.error("No obsunit found with name %s", values["obsunit_name"])
                # BUGFIX: raise SystemExit directly instead of the
                # site-module exit() builtin (same observable behavior).
                raise SystemExit(1)
            filtered_values["obs_unit"] = obs_unit
        elif key in ("start_time", "end_time"):
            # Timestamps arrive as ISO strings; store them as datetimes.
            filtered_values[key] = datetime.fromisoformat(values[key])
        else:
            filtered_values[key] = values[key]
    slot = models.PreScheduledSlot(
        **filtered_values,
    )
    session.add(slot)
    # Commit the transaction
    session.commit()
    log.debug("Pre-scheduled slots added successfully.")