Source code for ccat_ops_db.opsdb_init

import hashlib
import logging
import os
import time

import click
import sqlalchemy
from sqlalchemy import exc

from . import models
from .ccat_ops_db import init_ccat_ops_db
from .init_obs_units import init_obs_units
from .models import (
    Site,
    DataLocation,
    DiskDataLocation,
    S3DataLocation,
    TapeDataLocation,
    LocationType,
    StorageType,
)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()],
)


def unique_id():
    # Get the current time as a float
    current_time = str(time.time())
    # Create a hash object
    hash_object = hashlib.sha1(current_time.encode())
    # Return the hexadecimal digest of the hash
    return hash_object.hexdigest()


def successfull_initialization():
    with open("/app/state/db_initialized", "w") as f:
        f.write("Database successfully initialized")


def check_successfull_initialization():
    if os.path.exists("/tmp/opsdb_init"):
        exit(0)
    else:
        exit(1)


def get_toml_files_in_folder(folder):
    """Walk a folder and return a list of all the .toml files found in it.

    Args:
        folder (str): the folder to look for .toml files in

    Returns:
        list: a list of all the .toml files in the folder
    """
    toml_files = []
    for root, dirs, files in os.walk(folder):
        for file in files:
            if file.endswith(".toml"):
                toml_files.append(os.path.join(root, file))
    logging.info("Found %s toml files in %s", len(toml_files), folder)
    return toml_files
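# Usage sketch (the paths below are hypothetical examples):
#   get_toml_files_in_folder("/tmp/data-center/observatory")
#   -> ["/tmp/data-center/observatory/fyst.toml", ...]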


def add_or_update_observatory(session, values):
    try:
        # Check if an observatory with the given name already exists
        observatory = (
            session.query(models.Observatory).filter_by(name=values["name"]).first()
        )

        if observatory:
            # If it exists, update the description
            observatory.description = values["description"]
        else:
            # If not, create a new observatory record
            observatory = models.Observatory(
                name=values["name"], description=values["description"]
            )
            session.add(observatory)

        # Commit the transaction
        session.commit()
    except exc.IntegrityError:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        print("A record with the name %s already exists.", values["name"])
    # finally:
    #     session.close()


def add_or_update_telescope(session, values):
    # get the dependencies for the telescope
    observatory = (
        session.query(models.Observatory).filter_by(name=values["observatory"]).one()
    )
    try:
        # Check if a telescope with the given name already exists
        telescope = (
            session.query(models.Telescope).filter_by(name=values["name"]).first()
        )

        if telescope:
            # If it exists, update its fields and parent observatory
            telescope.description = values["description"]
            telescope.lon_deg = values["lon_deg"]
            telescope.lat_deg = values["lat_deg"]
            telescope.alt_m = values["alt_m"]
            telescope.observatory = observatory
        else:
            # If not, create a new telescope record
            telescope = models.Telescope(
                name=values["name"],
                description=values["description"],
                lon_deg=values["lon_deg"],
                lat_deg=values["lat_deg"],
                alt_m=values["alt_m"],
                observatory=observatory,
            )
            session.add(telescope)

        # Commit the transaction
        session.commit()
    except exc.IntegrityError:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        print("A record with the name %s already exists.", values["name"])
    finally:
        session.close()
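# Sketch of a telescope TOML entry as read above; the concrete values are
# placeholders, the keys ("name", "description", "lon_deg", "lat_deg", "alt_m",
# "observatory") are the ones this function accesses, where "observatory" must
# match an existing Observatory name:
#
#   [FYST]
#   name = "FYST"
#   description = "Example telescope entry"
#   lon_deg = -67.74
#   lat_deg = -22.99
#   alt_m = 5600
#   observatory = "FYST Observatory"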


def add_or_update_module(session, values, instrument):
    try:
        # Check if a module with the given name already exists
        instrument_module = (
            session.query(models.InstrumentModule)
            .filter_by(
                name=values["name"],
                instrument=instrument,
            )
            .first()
        )

        if instrument_module:
            # If it exists, update its description and parent instrument
            instrument_module.description = values.get("description", "")
            instrument_module.instrument = instrument
        else:
            # If not, create a new instrument_module record
            instrument_module = models.InstrumentModule(
                name=values["name"],
                description=values.get("description", ""),
                pixels=values.get("pixels", 1),
                available=True,  # start with all available
                instrument=instrument,
            )
            session.add(instrument_module)

        # Commit the transaction
        session.commit()
    except exc.IntegrityError:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        print("A record with the name %s already exists.", values["name"])
    finally:
        session.close()


def add_or_update_instrument(session, values):
    telescope = (
        session.query(models.Telescope).filter_by(name=values["telescope"]).one()
    )
    values["telescope"] = telescope
    try:
        # Check if an instrument with the given name already exists
        instrument = (
            session.query(models.Instrument)
            .filter_by(
                name=values["name"],
                telescope=telescope,
            )
            .first()
        )

        if instrument:
            # If it exists, update all matching columns
            for column in instrument.__table__.columns:
                column_name = column.name
                if column_name in values:
                    setattr(instrument, column_name, values[column_name])

        else:
            # If not, create a new instrument record
            filtered_values = {}
            for key in values:
                if hasattr(models.Instrument, key):
                    filtered_values[key] = values[key]
            filtered_values["available"] = True  # start with all available
            instrument = models.Instrument(
                **filtered_values,
            )
            session.add(instrument)
        for key, values in values["module"].items():
            add_or_update_module(session, values, instrument)

        # Commit the transaction
        session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        logging.error(
            "A record with the name %s already exists: %s", values["name"], e
        )
    finally:
        session.close()
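# Sketch of an instrument TOML entry as consumed above; "telescope" must name an
# existing Telescope, other keys are forwarded to the Instrument model only if a
# matching column exists, and nested "module" tables define InstrumentModules.
# All concrete names and values below are assumptions:
#
#   [PrimeCam]
#   name = "Prime-Cam"
#   description = "Example instrument entry"
#   telescope = "FYST"
#
#   [PrimeCam.module.Module1]
#   name = "Module1"
#   description = "Example module"
#   pixels = 1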


def add_or_update_role(session, values):
    try:
        # Check if a role with the given name already exists
        role = (
            session.query(models.Role)
            .filter_by(
                name=values["name"],
            )
            .first()
        )

        if role:
            # If it exists, update all matching columns

            for column in role.__table__.columns:
                column_name = column.name
                if column_name in values:
                    setattr(role, column_name, values[column_name])

        else:
            # If not, create a new role record
            filtered_values = {}
            for key in values:
                if hasattr(models.Role, key):
                    filtered_values[key] = values[key]
            role = models.Role(
                **filtered_values,
            )
            session.add(role)

        # Commit the transaction
        session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        print(e)
    finally:
        session.close()


def add_or_update_user(session, values):
    try:
        # Check if a user with the given name already exists
        user = (
            session.query(models.User)
            .filter_by(
                first_name=values["first_name"],
                last_name=values["last_name"],
                username=values["username"],
            )
            .first()
        )
        roles = []
        for role in values["roles"]:
            try:
                roles += [session.query(models.Role).filter_by(name=role).one()]
            except sqlalchemy.orm.exc.NoResultFound:
                print("Role not found", role)
        if user:
            # If it exists, update all matching columns and roles

            for column in user.__table__.columns:
                column_name = column.name
                if column_name in values:
                    setattr(user, column_name, values[column_name])
            setattr(user, "roles", roles)
        else:
            # If not, create a new user record
            filtered_values = {}
            for key in values:
                if hasattr(models.User, key):
                    filtered_values[key] = values[key]
            filtered_values["roles"] = roles
            user = models.User(
                **filtered_values,
            )
            session.add(user)

        # Commit the transaction
        session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        print(e)
    finally:
        session.close()
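# Sketch of a user TOML entry matching the keys read above ("roles" must name
# existing Role records); the table key and concrete values are placeholders:
#
#   [jdoe]
#   first_name = "Jane"
#   last_name = "Doe"
#   username = "jdoe"
#   roles = ["observer"]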


def add_or_update_sub_observing_program(session, values, observing_program):
    try:
        # Check if a sub-observing program with the given name already exists
        sub_observing_program = (
            session.query(models.SubObservingProgram)
            .filter_by(
                name=values["name"],
                observing_program=observing_program,
            )
            .first()
        )
        if sub_observing_program:
            # If it exists, update its fields
            sub_observing_program.short_name = values.get("short_name", "")
            sub_observing_program.observing_program = observing_program
        else:
            # If not, create a new sub_observing_program record
            sub_observing_program = models.SubObservingProgram(
                name=values["name"],
                short_name=values.get("short_name", ""),
                observing_program=observing_program,
            )
            session.add(sub_observing_program)

        # Commit the transaction
        session.commit()
    except exc.IntegrityError:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        print("A record with the name %s already exists.", values["name"])
    finally:
        session.close()


def add_or_update_observing_program(session, values):
    instruments = []

    for instrument_name in values["instruments"]:
        instruments += [
            session.query(models.Instrument).filter_by(name=instrument_name).one()
        ]

    values["instruments"] = instruments
    try:
        # Check if an observing program with the given name already exists
        observing_program = (
            session.query(models.ObservingProgram)
            .filter_by(
                name=values["name"],
            )
            .first()
        )

        if observing_program:
            # If it exists, update all matching columns
            for column in observing_program.__table__.columns:
                column_name = column.name
                if column_name in values:
                    if column_name == "lead":
                        logging.info("Updating lead")
                        user = (
                            session.query(models.User)
                            .filter_by(username=values["lead"])
                            .one()
                        )
                        setattr(observing_program, column_name, user)
                    else:
                        setattr(observing_program, column_name, values[column_name])

        else:
            # If not, create a new observing_program record
            filtered_values = {}
            for key in values:
                if hasattr(models.ObservingProgram, key):
                    if key == "lead":
                        try:
                            user = (
                                session.query(models.User)
                                .filter_by(username=values["lead"])
                                .one()
                            )
                        except sqlalchemy.orm.exc.NoResultFound:
                            print("User not found", values["lead"])
                            user = None
                        filtered_values[key] = user
                    elif key == "deputy_lead":
                        try:
                            user = (
                                session.query(models.User)
                                .filter_by(username=values["deputy_lead"])
                                .one()
                            )
                        except sqlalchemy.orm.exc.NoResultFound:
                            logging.error("User not found", values["deputy_lead"])
                            user = None
                        filtered_values[key] = user
                    else:
                        filtered_values[key] = values[key]
            observing_program = models.ObservingProgram(
                **filtered_values,
            )
            session.add(observing_program)
        try:
            for sub_key, sub_values in values["sub_observing_program"].items():
                add_or_update_sub_observing_program(
                    session, sub_values, observing_program
                )
        except KeyError:
            logging.info(
                "No sub_observing_programs found for observing_program %s",
                values["name"],
            )
        # Commit the transaction
        session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        logging.error(
            "A record with the name %s already exists: %s", values["name"], e
        )
    finally:
        session.close()
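# Sketch of an observing_program TOML entry as consumed above ("instruments"
# must name existing Instrument records, "lead"/"deputy_lead" existing User
# usernames); the table key and concrete values below are assumptions:
#
#   [ExampleSurvey]
#   name = "Example Survey"
#   instruments = ["Prime-Cam"]
#   lead = "jdoe"
#   deputy_lead = "asmith"
#
#   [ExampleSurvey.sub_observing_program.Deep]
#   name = "Deep Field"
#   short_name = "deep"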


def add_or_update_line(session, key, values):
    values["name"] = key
    try:
        # Check if a line with the given name already exists
        try:
            line = (
                session.query(models.Line)
                .filter_by(
                    name=key,
                )
                .one()
            )
        except sqlalchemy.orm.exc.NoResultFound:
            line = None

        if line:
            # If it exists, update all matching columns

            for column in line.__table__.columns:
                column_name = column.name
                if column_name in values:
                    setattr(line, column_name, values[column_name])
        else:
            # If not, create a new line record
            filtered_values = {}
            for key in values:
                if hasattr(models.Line, key):
                    filtered_values[key] = values[key]
            filtered_values["available"] = True  # start with all available
            line = models.Line(
                **filtered_values,
            )
            session.add(line)

        # Commit the transaction
        session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        print(e)
    finally:
        session.close()


def add_or_update_obs_mode(session, key, values):
    values["name"] = key
    try:
        # Check if an obs_mode with the given name already exists
        obs_mode = (
            session.query(models.ObsMode)
            .filter_by(
                name=key,
            )
            .first()
        )
        if obs_mode:
            # If it exists, update all matching columns

            for column in obs_mode.__table__.columns:
                column_name = column.name
                if column_name in values:
                    setattr(obs_mode, column_name, values[column_name])
        else:
            # If not, create a new obs_mode record
            filtered_values = {}
            for key in values:
                if hasattr(models.ObsMode, key):
                    filtered_values[key] = values[key]
            obs_mode = models.ObsMode(
                **filtered_values,
            )
            session.add(obs_mode)

        # Commit the transaction
        session.commit()
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        print(e)
    finally:
        session.close()


def add_or_update_data_transfer_routes(session, values):
    try:
        # Check if a data transfer route with the given name already exists
        data_transfer_route = (
            session.query(models.DataTransferRoute)
            .filter_by(
                name=values["name"],
            )
            .first()
        )
        if data_transfer_route:
            # If it exists, update its fields
            origin = (
                session.query(models.DataLocation)
                .filter_by(name=values["origin_data_archive_short_name"])
                .one()
            )
            destination = (
                session.query(models.DataLocation)
                .filter_by(name=values["destination_data_archive_short_name"])
                .one()
            )
            data_transfer_route.name = values["name"]
            data_transfer_route.origin = origin
            data_transfer_route.data_transfer_method = values["data_transfer_method"]
            data_transfer_route.primary = values["primary"]
            data_transfer_route.destination = destination

        else:
            # If not, create a new data_transfer_route record
            origin = (
                session.query(models.DataLocation)
                .filter_by(name=values["origin_data_archive_short_name"])
                .one()
            )
            destination = (
                session.query(models.DataLocation)
                .filter_by(name=values["destination_data_archive_short_name"])
                .one()
            )
            data_transfer_route = models.DataTransferRoute(
                name=values["name"],
                origin_data_location=origin,
                destination_data_location=destination,
                data_transfer_method=values["data_transfer_method"],
                primary=values["primary"],
            )
            session.add(data_transfer_route)

        # Commit the transaction
        session.commit()
        logging.info("Added data_transfer_route %s", values["name"])
    except exc.IntegrityError as e:
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        print(f"An error occurred: {e}")
    finally:
        session.close()


def add_or_update_site_and_locations(session, site_key, values):
    """Add or update a site and its associated data locations.

    Args:
        session: SQLAlchemy session
        site_key: The key from the TOML file (e.g. "CologneStaging")
        values: Dictionary containing site and location information
    """
    # Extract site information
    site_values = {
        "name": values["name"],
        "short_name": values["short_name"],
        "site_location": values["site_location"],
    }

    # Add or update site
    site = session.query(Site).filter_by(short_name=site_values["short_name"]).first()
    if site is None:
        site = Site(**site_values)
        session.add(site)
        session.flush()  # Flush to get the site ID
    else:
        for key, value in site_values.items():
            setattr(site, key, value)

    # Process locations
    for key, value in values.items():
        logging.info("Processing key %s", key)
        if key == "location":
            # Process each location under the site
            logging.info("Processing locations %s", value.items())
            for location_name, location_data in value.items():
                logging.info("Processing location %s", location_name)

                # Create location values dict
                location_values = {
                    "name": location_data["name"],
                    "location_type": LocationType(location_data["location_type"]),
                    "site_id": site.id,
                    "storage_type": StorageType(location_data["storage_type"]),
                    "active": location_data.get("active", True),
                    "priority": location_data.get("priority", 0),
                }

                # Add storage-specific fields
                if location_data["storage_type"] == "disk":
                    location_values.update(
                        {
                            "path": location_data["path"],
                            "host": location_data["host"],
                            "user": location_data["user"],
                        }
                    )
                elif location_data["storage_type"] == "s3":
                    location_values.update(
                        {
                            "bucket_name": location_data["bucket_name"],
                            "region": location_data.get("region"),
                            "endpoint_url": location_data.get("endpoint_url"),
                        }
                    )
                elif location_data["storage_type"] == "tape":
                    location_values.update(
                        {
                            "library_name": location_data.get("library_name"),
                            "mount_path": location_data.get("mount_path"),
                        }
                    )

                # Add or update location
                location = (
                    session.query(DataLocation)
                    .filter_by(site_id=site.id, name=location_values["name"])
                    .first()
                )

                if location is None:
                    if location_data["storage_type"] == "disk":
                        location = DiskDataLocation(**location_values)
                    elif location_data["storage_type"] == "s3":
                        location = S3DataLocation(**location_values)
                    elif location_data["storage_type"] == "tape":
                        location = TapeDataLocation(**location_values)
                    session.add(location)
                else:
                    for loc_key, loc_value in location_values.items():
                        setattr(location, loc_key, loc_value)

                logging.info(
                    "Added location %s (id: %s, site_id: %s, type: %s, storage: %s, active: %s, priority: %s)",
                    location.name,
                    location.id,
                    location.site_id,
                    location.location_type,
                    location.storage_type,
                    location.active,
                    location.priority,
                )

    # Commit all changes
    session.commit()

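# Sketch of a sites TOML file as consumed by add_or_update_site_and_locations.
# The field names are the ones the function reads ("disk" locations additionally
# need path/host/user, "s3" ones bucket_name/region/endpoint_url, "tape" ones
# library_name/mount_path); the concrete values and enum members are assumptions:
#
#   [CologneStaging]
#   name = "Cologne Staging"
#   short_name = "cologne"
#   site_location = "Cologne"
#
#   [CologneStaging.location.staging_disk]
#   name = "staging_disk"
#   location_type = "staging"
#   storage_type = "disk"
#   path = "/data/staging"
#   host = "staging.example.org"
#   user = "ccat"
#   active = true
#   priority = 1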

def opsdb_initialization(
    verbose=False,
    drop=False,
    local_url=None,
    docker_compose=False,
    database=None,
    data_archive_mode="development",
):
    """This function provisions the ccat_obs_db for testing purposes"""
    import toml
    from git import Repo

    if verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    session, engine = init_ccat_ops_db(drop=drop, database=database)

    repo_url = "git@github.com:ccatobs/data-center.git"
    if not local_url:
        logging.info("Cloning data-center repository from %s", repo_url)
        target_dir = "/tmp/data-center"
        os.system("rm -rf /tmp/data-center")
        Repo.clone_from(repo_url, target_dir, branch="main")
    else:
        target_dir = local_url

    # First we set up the observatories, reading the ones defined in the .toml
    # files in the observatory folder of the data-center repository
    observatory_folder = target_dir + "/observatory"
    toml_files = get_toml_files_in_folder(observatory_folder)
    logging.debug("Adding observatories from %s", observatory_folder)
    for toml_file in toml_files:
        # here we read the toml file and for every entry match the defined fields to the
        # fields in the Observatory sqlalchemy model
        with open(toml_file, "r") as f:
            data = toml.load(f)
            for key, values in data.items():
                add_or_update_observatory(session, values)

    # Now the same for telescopes
    telescope_folder = target_dir + "/telescope"
    toml_files = get_toml_files_in_folder(telescope_folder)
    logging.debug("Adding telescopes from %s", telescope_folder)
    for toml_file in toml_files:
        with open(toml_file, "r") as f:
            data = toml.load(f)
            for key, values in data.items():
                add_or_update_telescope(session, values)

    # Now the same for instruments
    instrument_folder = target_dir + "/instruments"
    toml_files = get_toml_files_in_folder(instrument_folder)
    logging.debug("Adding instruments from %s", instrument_folder)
    for toml_file in toml_files:
        with open(toml_file, "r") as f:
            data = toml.load(f)
            for key, values in data.items():
                add_or_update_instrument(session, values)

    # roles_folder = target_dir + "/roles"
    # toml_files = get_toml_files_in_folder(roles_folder)
    # logging.debug("Adding roles from %s", roles_folder)
    # for toml_file in toml_files:
    #     with open(toml_file, "r") as f:
    #         data = toml.load(f)
    #         for key, values in data.items():
    #             add_or_update_role(session, values)

    # Now the same for users
    # logging.debug("Adding users from %s", target_dir + "/users")
    # users_folder = target_dir + "/users"
    # toml_files = get_toml_files_in_folder(users_folder)
    # for toml_file in toml_files:
    #     with open(toml_file, "r") as f:
    #         data = toml.load(f)
    #         for key, values in data.items():
    #             add_or_update_user(session, values)

    # Now the same for observing_programs
    observing_programs_folder = target_dir + "/observing_programs"
    toml_files = get_toml_files_in_folder(observing_programs_folder)
    logging.debug("Adding observing_programs from %s", observing_programs_folder)
    for toml_file in toml_files:
        with open(toml_file, "r") as f:
            data = toml.load(f)
            for key, values in data.items():
                add_or_update_observing_program(session, values)

    # Now the same for lines
    lines_folder = target_dir + "/lines"
    toml_files = get_toml_files_in_folder(lines_folder)
    logging.debug("Adding lines from %s", lines_folder)
    for toml_file in toml_files:
        with open(toml_file, "r") as f:
            data = toml.load(f)
            for key, values in data.items():
                add_or_update_line(session, key, values)

    # Now the same for obs_modes
    obs_modes_folder = target_dir + "/obs_modes"
    toml_files = get_toml_files_in_folder(obs_modes_folder)
    logging.debug("Adding obs_modes from %s", obs_modes_folder)
    for toml_file in toml_files:
        with open(toml_file, "r") as f:
            data = toml.load(f)
            for key, values in data.items():
                add_or_update_obs_mode(session, key, values)

    # Replace the data archives section with sites
    sites_folder = target_dir + "/sites"
    toml_files = get_toml_files_in_folder(sites_folder)
    logging.debug("Adding sites and locations from %s", sites_folder)
    for toml_file in toml_files:
        # Only process the file matching the specified data_archive_mode
        if f"sites.{data_archive_mode}.toml" in toml_file:
            with open(toml_file, "r") as f:
                data = toml.load(f)
                for key, values in data.items():
                    # Skip location entries as they'll be handled by add_or_update_site_and_locations
                    if not key.endswith(".location"):
                        logging.info("Adding site %s", values["name"])
                        add_or_update_site_and_locations(session, key, values)

    # Now the same for primary data transfer routes
    data_transfer_routes_folder = target_dir + "/data_transfer_routes"
    toml_files = get_toml_files_in_folder(data_transfer_routes_folder)
    logging.debug(
        "Adding primary data transfer routes from %s",
        data_transfer_routes_folder,
    )
    # for toml_file in toml_files:
    #     if f"data_transfer_routes.{data_archive_mode}.toml" in toml_file:
    #         with open(toml_file, "r") as f:
    #             data = toml.load(f)
    #             for key, values in data.items():
    #                 # add logging
    #                 logging.info(
    #                     "Adding primary data transfer route %s", values["name"]
    #                 )
    #                 add_or_update_data_transfer_routes(session, values)

    # Generate sources and obs_units
    input_folder = target_dir + "/obs_units"
    init_obs_units(
        installation_path=input_folder,
        input_yaml_file="input_file_obs_units_to_db.yaml",
        session=session,
    )

    # Seed development tokens if in development mode
    if data_archive_mode == "development":
        try:
            from .seed_dev_tokens import seed_dev_tokens

            seed_dev_tokens(session, data_archive_mode)
        except ImportError as e:
            logging.warning(f"Could not import seed_dev_tokens: {e}")
        except Exception as e:
            logging.warning(f"Error seeding development tokens: {e}")
            # Don't fail initialization if seeding fails

    if docker_compose:
        successfull_initialization()
        for _ in range(5):
            time.sleep(10)

    return session, engine

# this function is a click entry point and has a click command argument to drop the
# database before creating it
@click.command()
@click.option(
    "-v", "--verbose", is_flag=True, help="Enable verbose output (debug level)."
)
@click.option("--drop", is_flag=True, help="Drop all tables before creating them")
# add option to read local_url from command line
@click.option("--local_url", default=None, help="Local url to use for git clone")
@click.option("--docker_compose", is_flag=True, default=None)
@click.option("--database", default=None, help="Database URL")
@click.option("--data_archive_mode", default="development", help="Data archive mode")
def opsdb_init(
    verbose=False,
    drop=False,
    local_url=None,
    docker_compose=False,
    database=None,
    data_archive_mode=None,
):
    opsdb_initialization(
        verbose,
        drop,
        local_url,
        docker_compose,
        database,
        data_archive_mode,
    )
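# Example invocation, assuming the package exposes opsdb_init as a console
# script (the entry-point name here is an assumption):
#
#   opsdb_init --drop --local_url /path/to/data-center --data_archive_mode development
#
# Without --local_url the data-center repository is cloned from GitHub over SSH,
# so suitable credentials must be available in the environment.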