# Source code for ccat_ops_db.init_obs_units

import logging
import os
import re

import yaml
import json

from .utils import (
    add_or_update_source,
    add_or_update_obs_unit,
    add_pre_scheduled_slot,
)

# Module-level logger, named after this module
logger = logging.getLogger(__name__)


def read_tiling_parameters(tiling_parameters_file):
    """Read a tiling parameter file into a list of dictionaries.

    Each data line holds ten ``/``-separated fields, in this order::

        tile_id / tile_offset_x / tile_offset_y / x_or_y /
        tile_unit_scaling_x / tile_unit_scaling_y / edge / inpar_file /
        goal_ncycle / status_ncycle

    Lines whose first non-blank character is ``#`` are comments and blank
    lines are ignored. Fields after the tenth one are ignored.

    Parameters
    ----------
    tiling_parameters_file : str
        Path of the tiling parameter file.

    Returns
    -------
    list of dict
        One dictionary per data line, in file order. All field values are
        whitespace-stripped strings; ``priority_in_tiling`` is the 1-based
        position of the line among the data lines of the file.

    Raises
    ------
    ValueError
        If a data line has fewer than ten fields.
    """
    field_names = (
        "tile_id",
        "tile_offset_x",
        "tile_offset_y",
        "x_or_y",
        "tile_unit_scaling_x",
        "tile_unit_scaling_y",
        "edge",
        "inpar_file",
        "goal_ncycle",
        "status_ncycle",
    )

    logger.debug("Reading tiling parameters from %s", tiling_parameters_file)

    tiling_parameters = []
    with open(tiling_parameters_file, "r") as f:
        for line_number, line in enumerate(f, start=1):
            stripped = line.strip()
            # Strip before testing for "#", so indented comments are
            # recognized instead of being parsed as data.
            if not stripped or stripped.startswith("#"):
                continue

            values = [value.strip() for value in stripped.split("/")]
            if len(values) < len(field_names):
                raise ValueError(
                    f"{tiling_parameters_file}:{line_number}: expected "
                    f"{len(field_names)} '/'-separated fields, "
                    f"got {len(values)}: {stripped!r}"
                )

            # The priority is the running count of data lines, not the raw
            # line number: comments and blank lines do not consume a rank.
            tiling_parameter = {"priority_in_tiling": len(tiling_parameters) + 1}
            tiling_parameter.update(zip(field_names, values))
            tiling_parameters.append(tiling_parameter)
    return tiling_parameters


def read_inpar_parameters(inpar_parameters_file):
    """Read an inpar parameter file into a dictionary.

    The file is a text file of ``KEYWORD=VALUE`` lines::

        # This is a comment
        KEYWORD1=VALUE1
        KEYWORD2 = VALUE2   # trailing comment

    Any amount of whitespace is allowed around the first ``=``; everything
    after it (up to a ``#``) is the value, so values may contain ``=``.
    Lines whose first non-blank character is ``#`` and blank lines are
    ignored. Entries whose value is ``UNKNOWN`` are skipped.

    Values are converted as follows: optionally negative integers become
    ``int``, optionally negative decimal numbers become ``float``, anything
    else stays a string.

    Parameters
    ----------
    inpar_parameters_file : str
        Path of the inpar parameter file.

    Returns
    -------
    dict
        Keyword/value pairs, plus the key ``"name"`` holding the base name
        of ``inpar_parameters_file``.

    Raises
    ------
    ValueError
        If a data line contains no ``=``.
    """
    logger.debug("Reading inpar parameters from %s", inpar_parameters_file)

    inpar_parameters = {}
    with open(inpar_parameters_file, "r") as f:
        for line_number, line in enumerate(f, start=1):
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue

            # Split on the first "=" only, so the value may contain "=".
            key, separator, value = stripped.partition("=")
            if not separator:
                raise ValueError(
                    f"{inpar_parameters_file}:{line_number}: expected "
                    f"KEYWORD=VALUE, got {stripped!r}"
                )
            key = key.strip()
            value = value.split("#")[0].strip()

            # Full-match patterns, so malformed numbers such as "--5" stay
            # strings instead of failing inside int()/float().
            if re.fullmatch(r"-?\d+", value):
                value = int(value)
            elif re.fullmatch(r"-?(?:\d+\.\d*|\.\d+)", value):
                value = float(value)
            elif value == "UNKNOWN":
                continue
            # TODO: in principle changing all flags to Boolean is good,
            # but it needs to be consistent with the observing scripts
            inpar_parameters[key] = value

    inpar_parameters["name"] = os.path.basename(inpar_parameters_file)

    return inpar_parameters


def clean_line(line):
    """Strip surrounding whitespace from each comma-separated field of a line.

    The stripped fields are joined again with bare commas, so a trailing
    newline and any padding around the separators are removed.
    """
    fields = line.split(",")
    return ",".join(map(str.strip, fields))


def extract_from_json_list(tbl, obsunit_name):
    """Return the table entry of a given obs unit, serialized as JSON.

    Parameters
    ----------
    tbl : list of dict or None
        Table entries as loaded from a JSON file. Each entry carries the
        obs unit name under the key ``"name"`` plus its parameters. For
        example, for table_azimuth the combination of elevation,
        rising/setting, az_min/max and duration as a list; for
        table_mapping_params the mapping parameters.
        ``None`` or an empty list means that no table is available.
    obsunit_name : str
        Name of the obs unit to look up.

    Returns
    -------
    str or None
        The matching entry serialized with ``json.dumps``, or ``None`` if
        there is no table or no entry with that name.

    Raises
    ------
    ValueError
        If more than one entry has the requested name.
    """
    # No table at all (e.g. no azimuth file configured) is "nothing found".
    if not tbl:
        return None

    matching_dicts = [d for d in tbl if d.get("name") == obsunit_name]

    if not matching_dicts:
        return None
    if len(matching_dicts) > 1:
        raise ValueError(f"Multiple entry found with name '{obsunit_name}'")
    return json.dumps(matching_dicts[0])


def read_obs_units(
    obsunits_list,
    obs_unit_type="chai_obs_unit",
    inpar_parameters_folder=None,
    tiling_parameters_folder=None,
    table_azimuth_file=None,
    table_mapping_params_file=None,
    session=None,
):
    """Read obs units from a CSV file and add or update them in the database.

    The first line of ``obsunits_list`` is the header; each following line
    describes one obs unit. Fields are whitespace-stripped, empty fields are
    dropped, and lines without any non-empty field are skipped.

    Parameters
    ----------
    obsunits_list : str
        Path of the CSV file listing the obs units.
    obs_unit_type : str
        ``"chai_obs_unit"`` or ``"prime_cam_obs_unit"``.
    inpar_parameters_folder : str, optional
        Folder searched for files starting with
        ``inpar_<obs_mode>_<source>_<instrument>_<p_line>``; all matches are
        read with ``read_inpar_parameters``.
    tiling_parameters_folder : str, optional
        Folder searched for the single file starting with
        ``tiling_<obs_mode>_<source>_<instrument>_<p_line>``; it is read
        with ``read_tiling_parameters``.
    table_azimuth_file : str, optional
        JSON file with the azimuth table entries, looked up by obs unit name.
    table_mapping_params_file : str, optional
        JSON file with the mapping parameter entries, looked up by obs unit
        name.
    session : database session
        Passed through to ``add_or_update_obs_unit``.

    Returns
    -------
    list of dict
        The obs unit dictionaries as passed to ``add_or_update_obs_unit``.

    Raises
    ------
    ValueError
        If ``obs_unit_type`` is not supported, or if a parameter folder is
        given for an obs unit type other than ``"chai_obs_unit"``.
    SystemExit
        If ``tiling_parameters_folder`` is given and not exactly one tiling
        file matches an obs unit.
    """
    log = logging.getLogger(__name__)

    with open(obsunits_list, "r") as f:
        lines = f.readlines()

    header = clean_line(lines[0]).split(",")
    obs_units = []
    for line in lines[1:]:
        values = clean_line(line).split(",")
        obs_unit = dict(zip(header, values))
        obs_unit = {k: v for k, v in obs_unit.items() if v}
        # Blank lines (e.g. a trailing newline) carry no obs unit.
        if obs_unit:
            obs_units.append(obs_unit)

    table_azimuth = None
    if table_azimuth_file is not None:
        with open(table_azimuth_file, "r") as f:
            table_azimuth = json.load(f)

    table_mapping_params = None
    if table_mapping_params_file is not None:
        with open(table_mapping_params_file, "r") as f:
            table_mapping_params = json.load(f)

    for obs_unit in obs_units:
        source = obs_unit["source"]
        instrument = obs_unit["instrument"]

        if obs_unit_type == "chai_obs_unit":
            primary_line = obs_unit["p_line"]
            instrument_module_configurations = [
                {
                    "type": "chai_module_configuration",
                    "instrument": instrument,
                    "instrument_module": obs_unit["p_array"],
                    "primary": True,
                    "line": primary_line,
                }
            ]
            if "s_array" in obs_unit:
                instrument_module_configurations.append(
                    {
                        "type": "chai_module_configuration",
                        "instrument": instrument,
                        "instrument_module": obs_unit["s_array"],
                        "primary": False,
                        "line": obs_unit["s_line"],
                    }
                )
        elif obs_unit_type == "prime_cam_obs_unit":
            # The parameter files are matched by primary line, which only
            # CHAI obs units have.
            if (
                inpar_parameters_folder is not None
                or tiling_parameters_folder is not None
            ):
                raise ValueError(
                    "inpar/tiling parameter folders are only supported for "
                    "obs_unit_type 'chai_obs_unit'"
                )
            instrument_module_configurations = [
                {
                    "type": "prime_cam_module_configuration",
                    "instrument": instrument,
                    "instrument_module": obs_unit["module"],
                    "primary": True,
                },
            ]
        else:
            raise ValueError(f"Unsupported obs_unit_type '{obs_unit_type}'")

        # convert string to boolean
        obs_unit["available"] = obs_unit["available"] == "True"
        obs_unit["pre_scheduled_basis"] = obs_unit["pre_scheduled_basis"] == "True"

        # azimuth range (only available when an azimuth table was given)
        azimuth_range = None
        if table_azimuth is not None:
            azimuth_range = extract_from_json_list(table_azimuth, obs_unit["name"])

        # prime cam mapping parameters
        mapping_parameters = None
        if table_mapping_params is not None:
            mapping_parameters = extract_from_json_list(
                table_mapping_params, obs_unit["name"]
            )

        # Parameter files are selected by file name prefix. Editor backup
        # files (ending in "~") are ignored. Listings are sorted so the
        # result does not depend on the directory order.
        inpar_parameters = None
        if inpar_parameters_folder is not None:
            inpar_prefix = (
                f"inpar_{obs_unit['obs_mode']}_{source}_{instrument}_{primary_line}"
            )
            inpar_parameter_files_used = [
                file
                for file in sorted(os.listdir(inpar_parameters_folder))
                if file.startswith(inpar_prefix) and not file.endswith("~")
            ]
            obs_unit["inpar_parameters"] = inpar_parameter_files_used
            inpar_parameters = [
                read_inpar_parameters(os.path.join(inpar_parameters_folder, file))
                for file in inpar_parameter_files_used
            ]

        tiling_parameters = None
        if tiling_parameters_folder is not None:
            tiling_prefix = (
                f"tiling_{obs_unit['obs_mode']}_{source}_{instrument}_{primary_line}"
            )
            tiling_parameters_files = [
                file
                for file in sorted(os.listdir(tiling_parameters_folder))
                if file.startswith(tiling_prefix) and not file.endswith("~")
            ]
            if not tiling_parameters_files:
                log.error(
                    f"No associated tiling file for ObsUnit {obs_unit['name']} "
                    f"(start with {tiling_prefix})"
                )
                # SystemExit directly: the exit() builtin is only provided
                # by the site module.
                raise SystemExit(1)
            if len(tiling_parameters_files) > 1:
                log.error(
                    f"More than one tiling input file {tiling_parameters_files} for an ObsUnit is not foreseen"
                )
                raise SystemExit(1)
            obs_unit["tiling_parameters"] = tiling_parameters_files[0]
            tiling_parameters = read_tiling_parameters(
                os.path.join(tiling_parameters_folder, tiling_parameters_files[0])
            )

        add_or_update_obs_unit(
            session,
            obs_unit,
            instrument_module_configurations=instrument_module_configurations,
            obs_unit_type=obs_unit_type,
            inpar_parameters_list=inpar_parameters,
            tiling_parameters=tiling_parameters,
            azimuth_range=azimuth_range,
            mapping_parameters=mapping_parameters,
        )
    return obs_units


def match_kosma_source_patterns(source_string):
    """Extract the SNAM, SLAM, SBET and VLSR values from a KOSMA source line.

    The line must contain ``SNAM=...;``, ``SLAM=...;``, ``SBET=...;`` and
    ``VLSR=...;`` in this order. Whitespace around ``=`` and before ``;`` is
    ignored, and an optional ``s`` directly after the SLAM value is dropped.

    Parameters
    ----------
    source_string : str
        One line of a KOSMA source file.

    Returns
    -------
    tuple of str or None
        ``(source_name, slam, sbet, vlsr)``, or ``None`` if the line does
        not match the expected pattern.
    """
    # "\s*" before each ";" keeps trailing padding out of the captured
    # values (and lets the optional "s" match when padding follows it).
    pattern = (
        r"SNAM\s*=\s*(.*?)\s*;"
        r".*?SLAM\s*=\s*(.*?)(?:s)?\s*;"
        r".*?SBET\s*=\s*(.*?)\s*;"
        r".*?VLSR\s*=\s*(.*?)\s*;"
    )
    match = re.search(pattern, source_string)
    if match is None:
        return None
    source_name, slam, sbet, vlsr = match.groups()
    return source_name, slam, sbet, vlsr


def read_sources(source_file, file_format, source_type, session):
    """Read a source file and add its sources to the database session.

    Parameters
    ----------
    source_file : str
        Input file name with source names and coordinates.
    file_format : str
        ``"kosma"`` (kosma_software source file format); any other value is
        read as CSV with a header line. For CSV input the coordinate frame
        is set to icrs, except for solar system objects, which get no frame.
    source_type : str or None
        ``None`` or ``"fixed_source"`` for fixed targets with coordinates,
        ``"solar_system_object"`` or ``"constant_elevation_source"``. The
        value is passed on to ``add_or_update_source``.
    session : database session
        Passed through to ``add_or_update_source``.

    Raises
    ------
    ValueError
        For kosma files, if a ``SNAM`` line appears before a ``COMMON`` line
        selected a frame, or if a ``SNAM`` line cannot be parsed.
    """
    logger.debug("Reading sources from file %s", source_file)

    with open(source_file, "r") as f:
        lines = f.readlines()

    if file_format == "kosma":
        # The frame is selected by the most recent recognized COMMON line.
        frame = None
        for source_string in lines:
            logger.debug("Processing line: %s", source_string)
            if source_string.startswith("COMMON"):
                # The value is the text between the first and second "=".
                sbas = source_string.partition("=")[2].split("=")[0].strip()
                if sbas == "-1":
                    frame = "icrs"
                    logger.info("Selected frame: icrs")
                elif sbas == "0":
                    frame = "galactic"
                    logger.info("Selected frame: galactic")
                else:
                    logger.warning(
                        "Unrecognized frame setting %r in line: %s",
                        sbas,
                        source_string,
                    )
            elif source_string.startswith("SNAM"):
                if frame is None:
                    raise ValueError(
                        f"{source_file}: source line found before a COMMON "
                        f"line selected a frame: {source_string!r}"
                    )
                logger.debug(
                    "Adding or updating source: %s with the frame %s",
                    source_string,
                    frame,
                )
                parsed = match_kosma_source_patterns(source_string)
                if parsed is None:
                    raise ValueError(
                        f"{source_file}: cannot parse source line: "
                        f"{source_string!r}"
                    )
                source_name, slam, sbet, vlsr = parsed
                values = {
                    "name": source_name,
                    "slam": slam,
                    "sbet": sbet,
                    "vlsr": vlsr,
                    "frame": frame,
                }
                add_or_update_source(session, values, source_type=source_type)
            elif source_string.startswith("#"):
                logger.info("Skipping comment: %s", source_string)
        return

    # Comma separated values: pick the frame and the column renaming that
    # belong to the source type.
    if source_type is None or source_type == "fixed_source":
        frame = "icrs"
        key_mapping = {"RA": "slam", "DEC": "sbet"}
    elif source_type == "solar_system_object":
        frame = None
        key_mapping = {}
    elif source_type == "constant_elevation_source":
        frame = "icrs"
        key_mapping = {
            "RAmin": "slam_min",
            "RAmax": "slam_max",
            "DECmin": "sbet_min",
            "DECmax": "sbet_max",
        }
    else:
        logger.warning(
            "Unknown source type %s, no sources read from %s",
            source_type,
            source_file,
        )
        return

    header = [name.strip() for name in lines[0].split(",")]
    sources = []
    for line in lines[1:]:
        stripped = line.strip()
        # Blank lines would otherwise become sources without a name.
        if not stripped or stripped.startswith("#"):
            continue
        values = [value.strip() for value in stripped.split(",")]
        source = {key_mapping.get(k, k): v for k, v in zip(header, values)}
        if frame is not None:
            source["frame"] = frame
        sources.append(source)

    # Parse the whole file before touching the database.
    for source in sources:
        add_or_update_source(
            session,
            source,
            source_type=source_type,
        )


def read_pre_scheduled_slots(pre_scheduled_file, session):
    """Read pre-scheduled slots and add them to the database session.

    The first line of the file is the CSV header; each following line
    describes one slot. Fields are whitespace-stripped, empty fields are
    dropped, and lines without any non-empty field are skipped.

    Parameters
    ----------
    pre_scheduled_file : str
        Input file name with pre-scheduled obs units and times.
    session : database session
        Passed through to ``add_pre_scheduled_slot``.
    """
    with open(pre_scheduled_file, "r") as f:
        lines = f.readlines()

    header = clean_line(lines[0]).split(",")
    slots = []
    for line in lines[1:]:
        values = clean_line(line).split(",")
        slot = dict(zip(header, values))
        slot = {k: v for k, v in slot.items() if v}
        # Blank lines (e.g. a trailing newline) carry no slot.
        if slot:
            slots.append(slot)

    # Parse the whole file before touching the database.
    for slot in slots:
        add_pre_scheduled_slot(
            session,
            slot,
        )


def init_obs_units(installation_path, input_yaml_file, session):
    """Fill the database with sources, obs units and pre-scheduled slots.

    All file names, including ``input_yaml_file`` itself and the ones listed
    inside it, are resolved relative to ``installation_path``.

    The YAML file is expected to provide these keys:

    - ``sources``, ``sources_solar_system_objects`` and
      ``sources_constant_elevation``: mappings from file format to a source
      file name or a list of source file names
    - ``obsunits``: mapping from instrument (``chai`` or ``prime_cam``) to
      its settings; ``chai`` needs ``obsunits_list``,
      ``inpar_parameters_folder`` and ``tiling_parameters_folder``,
      ``prime_cam`` needs ``obsunits_list`` and ``table_mapping_params``
    - ``table_azimuth``: azimuth table file, used for all instruments
    - ``pre_scheduled_slots``: file with the pre-scheduled slots

    Parameters
    ----------
    installation_path : str
        Base directory for all input files.
    input_yaml_file : str
        Name of the YAML configuration file.
    session : database session
        Passed through to the reader functions.
    """
    input_yaml_file = os.path.join(installation_path, input_yaml_file)
    logger.info("Reading input yaml file %s", input_yaml_file)
    with open(input_yaml_file) as f:
        config_dict = yaml.safe_load(f)

    source_sections = (
        ("sources", "fixed_source"),
        ("sources_solar_system_objects", "solar_system_object"),
        ("sources_constant_elevation", "constant_elevation_source"),
    )
    for type_in_dict, source_type in source_sections:
        logger.info("Adding %s sources", source_type)
        for file_format, source_files in config_dict[type_in_dict].items():
            # A single file name is accepted in place of a list.
            if not isinstance(source_files, list):
                source_files = [source_files]
            for source_file in source_files:
                logger.info("Adding sources from file %s", source_file)
                logger.info("Installation path is %s", installation_path)
                logger.info("Source file is %s", source_file)
                logger.info("Source type is %s", source_type)
                read_sources(
                    os.path.join(installation_path, source_file),
                    file_format=file_format,
                    source_type=source_type,
                    session=session,
                )

    # Now fill the obs units
    for instrument, instrument_config in config_dict["obsunits"].items():
        logger.info("Adding obs units for instrument %s", instrument)
        table_azimuth_file = os.path.join(
            installation_path, config_dict["table_azimuth"]
        )
        if instrument == "chai":
            read_obs_units(
                obsunits_list=os.path.join(
                    installation_path, instrument_config["obsunits_list"]
                ),
                obs_unit_type="chai_obs_unit",
                inpar_parameters_folder=os.path.join(
                    installation_path, instrument_config["inpar_parameters_folder"]
                ),
                tiling_parameters_folder=os.path.join(
                    installation_path, instrument_config["tiling_parameters_folder"]
                ),
                table_azimuth_file=table_azimuth_file,
                session=session,
            )
        elif instrument == "prime_cam":
            read_obs_units(
                os.path.join(installation_path, instrument_config["obsunits_list"]),
                obs_unit_type="prime_cam_obs_unit",
                table_azimuth_file=table_azimuth_file,
                table_mapping_params_file=os.path.join(
                    installation_path, instrument_config["table_mapping_params"]
                ),
                session=session,
            )
        else:
            logger.warning(
                "No obs unit reader for instrument %s, entry skipped", instrument
            )

    # Fill PreScheduledSlots
    pre_scheduled_slots = os.path.join(
        installation_path, config_dict["pre_scheduled_slots"]
    )
    read_pre_scheduled_slots(pre_scheduled_slots, session=session)