import hashlib
import logging
import os
import time
import click
import sqlalchemy
from sqlalchemy import exc
from . import models
from .ccat_ops_db import init_ccat_ops_db
from .init_obs_units import init_obs_units
from .models import (
Site,
DataLocation,
DiskDataLocation,
S3DataLocation,
TapeDataLocation,
LocationType,
StorageType,
)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()],
)
def unique_id():
    """Return a hex digest of the current time; note that hashing the clock
    is not guaranteed unique (uuid.uuid4() would be more robust)."""
    # Hash the stringified current time and return the hexadecimal digest
    current_time = str(time.time())
    hash_object = hashlib.sha1(current_time.encode())
    return hash_object.hexdigest()
# Marker file used to signal (and later check) a completed initialization
INIT_MARKER_PATH = "/app/state/db_initialized"
def successful_initialization():
    # Write a marker file signalling that initialization completed
    with open(INIT_MARKER_PATH, "w") as f:
        f.write("Database successfully initialized")
def check_successful_initialization():
    # Exit 0 if the initialization marker exists, 1 otherwise; usable as a
    # container healthcheck command
    if os.path.exists(INIT_MARKER_PATH):
        exit(0)
    else:
        exit(1)
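# Illustrative docker-compose healthcheck that polls the marker file written
# by successful_initialization (a sketch, not taken from this project's
# compose file):
#
#   healthcheck:
#     test: ["CMD", "test", "-f", "/app/state/db_initialized"]
#     interval: 10s
#     retries: 30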
def get_toml_files_in_folder(folder):
    """Recursively collect all .toml files under a folder.
    Args:
        folder (str): the folder to look for .toml files in
    Returns:
        list: paths of all the .toml files found under the folder
    """
toml_files = []
for root, dirs, files in os.walk(folder):
for file in files:
if file.endswith(".toml"):
toml_files.append(os.path.join(root, file))
logging.info("Found %s toml files in %s", len(toml_files), folder)
return toml_files
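# Illustrative usage of get_toml_files_in_folder (paths are hypothetical):
#   get_toml_files_in_folder("/tmp/data-center/observatory")
#   -> ["/tmp/data-center/observatory/ccat.toml", ...]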
def add_or_update_observatory(session, values):
try:
# Check if an observatory with the given name already exists
observatory = (
session.query(models.Observatory).filter_by(name=values["name"]).first()
)
if observatory:
# If it exists, update the description
observatory.description = values["description"]
else:
# If not, create a new observatory record
observatory = models.Observatory(
name=values["name"], description=values["description"]
)
session.add(observatory)
# Commit the transaction
session.commit()
except exc.IntegrityError:
# Handle the integrity error (e.g., duplicate name)
session.rollback()
print("A record with the name %s already exists.", values["name"])
# finally:
# session.close()
def add_or_update_telescope(session, values):
# get the dependencies for the telescope
observatory = (
session.query(models.Observatory).filter_by(name=values["observatory"]).one()
)
try:
        # Check if a telescope with the given name already exists
telescope = (
session.query(models.Telescope).filter_by(name=values["name"]).first()
)
if telescope:
            # If it exists, update its fields
telescope.description = values["description"]
telescope.lon_deg = values["lon_deg"]
telescope.lat_deg = values["lat_deg"]
telescope.alt_m = values["alt_m"]
telescope.observatory = observatory
else:
# If not, create a new telescope record
telescope = models.Telescope(
name=values["name"],
description=values["description"],
lon_deg=values["lon_deg"],
lat_deg=values["lat_deg"],
alt_m=values["alt_m"],
observatory=observatory,
)
session.add(telescope)
# Commit the transaction
session.commit()
except exc.IntegrityError:
# Handle the integrity error (e.g., duplicate name)
session.rollback()
print("A record with the name %s already exists.", values["name"])
finally:
session.close()
def add_or_update_module(session, values, instrument):
try:
        # Check if a module with the given name already exists
instrument_module = (
session.query(models.InstrumentModule)
.filter_by(
name=values["name"],
instrument=instrument,
)
.first()
)
if instrument_module:
            # If it exists, update its fields
instrument_module.description = values.get("description", "")
instrument_module.instrument = instrument
else:
# If not, create a new instrument_module record
instrument_module = models.InstrumentModule(
name=values["name"],
description=values.get("description", ""),
pixels=values.get("pixels", 1),
available=True, # start with all available
instrument=instrument,
)
session.add(instrument_module)
# Commit the transaction
session.commit()
except exc.IntegrityError:
# Handle the integrity error (e.g., duplicate name)
session.rollback()
print("A record with the name %s already exists.", values["name"])
finally:
session.close()
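# Illustrative instrument TOML shape for add_or_update_instrument below; the
# nested [*.module.*] tables feed the add_or_update_module loop. Table and
# field names are hypothetical:
#
#   [prime_cam]
#   name = "Prime-Cam"
#   description = "..."
#   telescope = "FYST"
#   [prime_cam.module.mod_280]
#   name = "mod_280"
#   pixels = 3456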
def add_or_update_instrument(session, values):
telescope = (
session.query(models.Telescope).filter_by(name=values["telescope"]).one()
)
values["telescope"] = telescope
try:
# Check if an instrument with the given name already exists
instrument = (
session.query(models.Instrument)
.filter_by(
name=values["name"],
telescope=telescope,
)
.first()
)
if instrument:
            # If it exists, update all columns present in values
for column in instrument.__table__.columns:
column_name = column.name
if column_name in values:
setattr(instrument, column_name, values[column_name])
else:
# If not, create a new instrument record
filtered_values = {}
for key in values:
if hasattr(models.Instrument, key):
filtered_values[key] = values[key]
filtered_values["available"] = True # start with all available
instrument = models.Instrument(
**filtered_values,
)
session.add(instrument)
        for module_values in values.get("module", {}).values():
            add_or_update_module(session, module_values, instrument)
# Commit the transaction
session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        logging.error("Integrity error for instrument %s: %s", values["name"], e)
finally:
session.close()
def add_or_update_role(session, values):
try:
        # Check if a role with the given name already exists
role = (
session.query(models.Role)
.filter_by(
name=values["name"],
)
.first()
)
if role:
            # If it exists, update all columns present in values
for column in role.__table__.columns:
column_name = column.name
if column_name in values:
setattr(role, column_name, values[column_name])
else:
            # If not, create a new role record
filtered_values = {}
for key in values:
if hasattr(models.Role, key):
filtered_values[key] = values[key]
role = models.Role(
**filtered_values,
)
session.add(role)
# Commit the transaction
session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        logging.error("Integrity error for role %s: %s", values["name"], e)
finally:
session.close()
def add_or_update_user(session, values):
try:
        # Check if a user with the given name already exists
user = (
session.query(models.User)
.filter_by(
first_name=values["first_name"],
last_name=values["last_name"],
username=values["username"],
)
.first()
)
roles = []
for role in values["roles"]:
try:
roles += [session.query(models.Role).filter_by(name=role).one()]
except sqlalchemy.orm.exc.NoResultFound:
print("Role not found", role)
if user:
            # If it exists, update all columns present in values
            for column in user.__table__.columns:
                column_name = column.name
                if column_name in values:
                    setattr(user, column_name, values[column_name])
            user.roles = roles
else:
            # If not, create a new user record
filtered_values = {}
for key in values:
if hasattr(models.User, key):
filtered_values[key] = values[key]
filtered_values["roles"] = roles
user = models.User(
**filtered_values,
)
session.add(user)
# Commit the transaction
session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        logging.error("Integrity error for user %s: %s", values["username"], e)
finally:
session.close()
def add_or_update_sub_observing_program(session, values, observing_program):
try:
        # Check if a sub-observing program with the given name already exists
sub_observing_program = (
session.query(models.SubObservingProgram)
.filter_by(
name=values["name"],
observing_program=observing_program,
)
.first()
)
if sub_observing_program:
            # If it exists, update its fields
sub_observing_program.short_name = values.get("short_name", "")
sub_observing_program.observing_program = observing_program
else:
            # If not, create a new sub_observing_program record
sub_observing_program = models.SubObservingProgram(
name=values["name"],
short_name=values.get("short_name", ""),
observing_program=observing_program,
)
session.add(sub_observing_program)
# Commit the transaction
session.commit()
except exc.IntegrityError:
# Handle the integrity error (e.g., duplicate name)
session.rollback()
print("A record with the name %s already exists.", values["name"])
finally:
session.close()
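# Illustrative observing_program TOML shape for the function below
# (hypothetical names; "instruments" must match existing Instrument records,
# "lead"/"deputy_lead" must match existing User.username values):
#
#   [wide_survey]
#   name = "Wide Survey"
#   instruments = ["Prime-Cam"]
#   lead = "jdoe"
#   [wide_survey.sub_observing_program.deep_field]
#   name = "Deep Field"
#   short_name = "DF"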
def add_or_update_observing_program(session, values):
instruments = []
for instrument_name in values["instruments"]:
instruments += [
session.query(models.Instrument).filter_by(name=instrument_name).one()
]
values["instruments"] = instruments
try:
        # Check if an observing program with the given name already exists
observing_program = (
session.query(models.ObservingProgram)
.filter_by(
name=values["name"],
)
.first()
)
if observing_program:
            # If it exists, update all columns present in values
for column in observing_program.__table__.columns:
column_name = column.name
if column_name in values:
                    if column_name in ("lead", "deputy_lead"):
                        # Resolve usernames to User objects, mirroring the
                        # create branch below
                        logging.info("Updating %s", column_name)
                        user = (
                            session.query(models.User)
                            .filter_by(username=values[column_name])
                            .one()
                        )
                        setattr(observing_program, column_name, user)
                    else:
                        setattr(observing_program, column_name, values[column_name])
else:
# If not, create a new observing_program record
filtered_values = {}
for key in values:
if hasattr(models.ObservingProgram, key):
if key == "lead":
try:
user = (
session.query(models.User)
.filter_by(username=values["lead"])
.one()
)
except sqlalchemy.orm.exc.NoResultFound:
print("User not found", values["lead"])
user = None
filtered_values[key] = user
elif key == "deputy_lead":
try:
user = (
session.query(models.User)
.filter_by(username=values["deputy_lead"])
.one()
)
except sqlalchemy.orm.exc.NoResultFound:
logging.error("User not found", values["deputy_lead"])
user = None
filtered_values[key] = user
else:
filtered_values[key] = values[key]
observing_program = models.ObservingProgram(
**filtered_values,
)
session.add(observing_program)
        try:
            for sub_values in values["sub_observing_program"].values():
                add_or_update_sub_observing_program(
                    session, sub_values, observing_program
                )
        except KeyError:
            logging.info(
                "No sub_observing_programs found for observing_program %s",
                values["name"],
            )
# Commit the transaction
session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        logging.error(
            "Integrity error for observing program %s: %s", values["name"], e
        )
finally:
session.close()
def add_or_update_line(session, key, values):
values["name"] = key
try:
        # Check if a line with the given name already exists
try:
line = (
session.query(models.Line)
.filter_by(
name=key,
)
.one()
)
except sqlalchemy.orm.exc.NoResultFound:
line = None
if line:
            # If it exists, update all columns present in values
for column in line.__table__.columns:
column_name = column.name
if column_name in values:
setattr(line, column_name, values[column_name])
else:
            # If not, create a new line record
            filtered_values = {}
            for k in values:
                if hasattr(models.Line, k):
                    filtered_values[k] = values[k]
filtered_values["available"] = True # start with all available
line = models.Line(
**filtered_values,
)
session.add(line)
# Commit the transaction
session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        logging.error("Integrity error for line %s: %s", values["name"], e)
finally:
session.close()
def add_or_update_obs_mode(session, key, values):
values["name"] = key
try:
        # Check if an obs_mode with the given name already exists
obs_mode = (
session.query(models.ObsMode)
.filter_by(
name=key,
)
.first()
)
if obs_mode:
            # If it exists, update all columns present in values
for column in obs_mode.__table__.columns:
column_name = column.name
if column_name in values:
setattr(obs_mode, column_name, values[column_name])
else:
            # If not, create a new obs_mode record
            filtered_values = {}
            for k in values:
                if hasattr(models.ObsMode, k):
                    filtered_values[k] = values[k]
obs_mode = models.ObsMode(
**filtered_values,
)
session.add(obs_mode)
# Commit the transaction
session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        logging.error("Integrity error for obs_mode %s: %s", values["name"], e)
finally:
session.close()
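# Illustrative `values` shape for add_or_update_data_transfer_routes below
# (hypothetical names; the short names must match existing DataLocation
# records):
#
#   {"name": "site_a_to_site_b",
#    "origin_data_archive_short_name": "site_a",
#    "destination_data_archive_short_name": "site_b",
#    "data_transfer_method": "rsync",
#    "primary": True}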
def add_or_update_data_transfer_routes(session, values):
try:
        # Check if a data transfer route with the given name already exists
data_transfer_route = (
session.query(models.DataTransferRoute)
.filter_by(
name=values["name"],
)
.first()
)
        # Resolve the origin and destination locations once; both branches
        # need them
        origin = (
            session.query(models.DataLocation)
            .filter_by(name=values["origin_data_archive_short_name"])
            .one()
        )
        destination = (
            session.query(models.DataLocation)
            .filter_by(name=values["destination_data_archive_short_name"])
            .one()
        )
        if data_transfer_route:
            # If it exists, update its fields, using the same attribute names
            # as the constructor in the create branch
            data_transfer_route.name = values["name"]
            data_transfer_route.origin_data_location = origin
            data_transfer_route.destination_data_location = destination
            data_transfer_route.data_transfer_method = values["data_transfer_method"]
            data_transfer_route.primary = values["primary"]
        else:
            # If not, create a new data_transfer_route record
            data_transfer_route = models.DataTransferRoute(
                name=values["name"],
                origin_data_location=origin,
                destination_data_location=destination,
                data_transfer_method=values["data_transfer_method"],
                primary=values["primary"],
            )
            session.add(data_transfer_route)
# Commit the transaction
session.commit()
logging.info("Added data_transfer_route %s", values["name"])
except exc.IntegrityError as e:
# Handle the integrity error (e.g., duplicate name)
session.rollback()
print(f"An error occurred: {e}")
finally:
session.close()
def add_or_update_site_and_locations(session, site_key, values):
"""Add or update a site and its associated data locations.
Args:
session: SQLAlchemy session
site_key: The key from the TOML file (e.g. "CologneStaging")
values: Dictionary containing site and location information
"""
# Extract site information
site_values = {
"name": values["name"],
"short_name": values["short_name"],
"site_location": values["site_location"],
}
# Add or update site
site = session.query(Site).filter_by(short_name=site_values["short_name"]).first()
if site is None:
site = Site(**site_values)
session.add(site)
session.flush() # Flush to get the site ID
else:
for key, value in site_values.items():
setattr(site, key, value)
# Process locations
for key, value in values.items():
logging.info("Processing key %s", key)
if key == "location":
# Process each location under the site
logging.info("Processing locations %s", value.items())
for location_name, location_data in value.items():
logging.info("Processing location %s", location_name)
# Create location values dict
location_values = {
"name": location_data["name"],
"location_type": LocationType(location_data["location_type"]),
"site_id": site.id,
"storage_type": StorageType(location_data["storage_type"]),
"active": location_data.get("active", True),
"priority": location_data.get("priority", 0),
}
# Add storage-specific fields
if location_data["storage_type"] == "disk":
location_values.update(
{
"path": location_data["path"],
"host": location_data["host"],
"user": location_data["user"],
}
)
elif location_data["storage_type"] == "s3":
location_values.update(
{
"bucket_name": location_data["bucket_name"],
"region": location_data.get("region"),
"endpoint_url": location_data.get("endpoint_url"),
}
)
elif location_data["storage_type"] == "tape":
location_values.update(
{
"library_name": location_data.get("library_name"),
"mount_path": location_data.get("mount_path"),
}
)
# Add or update location
location = (
session.query(DataLocation)
.filter_by(site_id=site.id, name=location_values["name"])
.first()
)
if location is None:
if location_data["storage_type"] == "disk":
location = DiskDataLocation(**location_values)
elif location_data["storage_type"] == "s3":
location = S3DataLocation(**location_values)
elif location_data["storage_type"] == "tape":
location = TapeDataLocation(**location_values)
session.add(location)
                else:
                    for attr, attr_value in location_values.items():
                        setattr(location, attr, attr_value)
                logging.info(
                    "Added or updated location %s (id: %s, site_id: %s, type: %s, storage: %s, active: %s, priority: %s)",
location.name,
location.id,
location.site_id,
location.location_type,
location.storage_type,
location.active,
location.priority,
)
# Commit all changes
session.commit()
def opsdb_initialization(
verbose=False,
drop=False,
local_url=None,
docker_compose=False,
database=None,
data_archive_mode="development",
):
"""This function provisions the ccat_obs_db for testing purposes"""
    import shutil
    import toml
    from git import Repo
if verbose:
logging.getLogger().setLevel(logging.DEBUG)
session, engine = init_ccat_ops_db(drop=drop, database=database)
repo_url = "git@github.com:ccatobs/data-center.git"
if not local_url:
logging.info("Cloning data-center repository from %s", repo_url)
target_dir = "/tmp/data-center"
os.system("rm -rf /tmp/data-center")
Repo.clone_from(repo_url, target_dir, branch="main")
else:
target_dir = local_url
# First we setup the observatory reading the defined observatories in the tomls
# files in the observatory folder of the data-center repository
observatory_folder = target_dir + "/observatory"
toml_files = get_toml_files_in_folder(observatory_folder)
logging.debug("Adding observatories from %s", observatory_folder)
for toml_file in toml_files:
# here we read the toml file and for every entry match the defined fields to the
# fields in the Observatory sqlalchemy model
with open(toml_file, "r") as f:
data = toml.load(f)
for key, values in data.items():
add_or_update_observatory(session, values)
# Now the same for telescope
telescope_folder = target_dir + "/telescope"
toml_files = get_toml_files_in_folder(telescope_folder)
logging.debug("Adding telescopes from %s", telescope_folder)
for toml_file in toml_files:
with open(toml_file, "r") as f:
data = toml.load(f)
for key, values in data.items():
add_or_update_telescope(session, values)
# Now the same for instruments
instrument_folder = target_dir + "/instruments"
toml_files = get_toml_files_in_folder(instrument_folder)
logging.debug("Adding instruments from %s", instrument_folder)
for toml_file in toml_files:
with open(toml_file, "r") as f:
data = toml.load(f)
for key, values in data.items():
add_or_update_instrument(session, values)
# roles_folder = target_dir + "/roles"
# toml_files = get_toml_files_in_folder(roles_folder)
# logging.debug("Adding roles from %s", roles_folder)
# for toml_file in toml_files:
# with open(toml_file, "r") as f:
# data = toml.load(f)
# for key, values in data.items():
# add_or_update_role(session, values)
# Now the same for users
# logging.debug("Adding users from %s", target_dir + "/users")
# users_folder = target_dir + "/users"
# toml_files = get_toml_files_in_folder(users_folder)
# for toml_file in toml_files:
# with open(toml_file, "r") as f:
# data = toml.load(f)
# for key, values in data.items():
# add_or_update_user(session, values)
# Now the same for observing_programs
observing_programs_folder = target_dir + "/observing_programs"
toml_files = get_toml_files_in_folder(observing_programs_folder)
logging.debug("Adding observing_programs from %s", observing_programs_folder)
for toml_file in toml_files:
with open(toml_file, "r") as f:
data = toml.load(f)
for key, values in data.items():
add_or_update_observing_program(session, values)
# Now the same for lines
lines_folder = target_dir + "/lines"
toml_files = get_toml_files_in_folder(lines_folder)
logging.debug("Adding observing_programs from %s", lines_folder)
for toml_file in toml_files:
with open(toml_file, "r") as f:
data = toml.load(f)
for key, values in data.items():
add_or_update_line(session, key, values)
# Now the same for obs_modes
obs_modes_folder = target_dir + "/obs_modes"
toml_files = get_toml_files_in_folder(obs_modes_folder)
logging.debug("Adding obs_modes from %s", obs_modes_folder)
for toml_file in toml_files:
with open(toml_file, "r") as f:
data = toml.load(f)
for key, values in data.items():
add_or_update_obs_mode(session, key, values)
    # Now the sites and their data locations
sites_folder = target_dir + "/sites"
toml_files = get_toml_files_in_folder(sites_folder)
logging.debug("Adding sites and locations from %s", sites_folder)
for toml_file in toml_files:
# Only process the file matching the specified data_archive_mode
if f"sites.{data_archive_mode}.toml" in toml_file:
with open(toml_file, "r") as f:
data = toml.load(f)
for key, values in data.items():
# Skip location entries as they'll be handled by add_or_update_site_and_locations
if not key.endswith(".location"):
logging.info("Adding site %s", values["name"])
add_or_update_site_and_locations(session, key, values)
# Now the same for primary data transfer routes
data_transfer_routes_folder = target_dir + "/data_transfer_routes"
toml_files = get_toml_files_in_folder(data_transfer_routes_folder)
logging.debug(
"Adding primary data transfer routes from %s",
data_transfer_routes_folder,
)
# for toml_file in toml_files:
# if f"data_transfer_routes.{data_archive_mode}.toml" in toml_file:
# with open(toml_file, "r") as f:
# data = toml.load(f)
# for key, values in data.items():
# # add logging
# logging.info(
# "Adding primary data transfer route %s", values["name"]
# )
# add_or_update_data_transfer_routes(session, values)
    # Generate sources and obs_units
input_folder = target_dir + "/obs_units"
init_obs_units(
installation_path=input_folder,
input_yaml_file="input_file_obs_units_to_db.yaml",
session=session,
)
# Seed development tokens if in development mode
if data_archive_mode == "development":
try:
from .seed_dev_tokens import seed_dev_tokens
seed_dev_tokens(session, data_archive_mode)
except ImportError as e:
logging.warning(f"Could not import seed_dev_tokens: {e}")
except Exception as e:
logging.warning(f"Error seeding development tokens: {e}")
# Don't fail initialization if seeding fails
    if docker_compose:
        successful_initialization()
        # Keep the process alive for ~50 s before exiting
        for _ in range(5):
            time.sleep(10)
return session, engine
# Click entry point; mirrors opsdb_initialization and adds a --drop option to
# drop the database tables before recreating them
@click.command()
@click.option(
"-v", "--verbose", is_flag=True, help="Enable verbose output (debug level)."
)
@click.option("--drop", is_flag=True, help="Drop all tables before creating them")
@click.option(
    "--local_url",
    default=None,
    help="Path to a local data-center checkout (skips the git clone)",
)
@click.option("--docker_compose", is_flag=True, default=None)
@click.option("--database", default=None, help="Database URL")
@click.option("--data_archive_mode", default="development", help="Data archive mode")
def opsdb_init(
verbose=False,
drop=False,
local_url=None,
docker_compose=False,
database=None,
    data_archive_mode="development",
):
opsdb_initialization(
verbose,
drop,
local_url,
docker_compose,
database,
data_archive_mode,
)
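# Example invocation (assuming the package exposes this command as a console
# script named `opsdb_init`; paths are illustrative):
#
#   opsdb_init --drop --local_url /path/to/data-center --data_archive_mode development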