import logging
import os
import re
import yaml
import json
from .utils import (
add_or_update_source,
add_or_update_obs_unit,
add_pre_scheduled_slot,
)
# Get a logger for this module
logger = logging.getLogger(__name__)
def read_tiling_parameters(tiling_parameters_file):
    """Read a tiling parameter file into a list of dictionaries.

    Each non-comment, non-empty line contains the slash-separated fields
    tile_id / tile_offset_x / tile_offset_y / x_or_y / tile_unit_scaling_x /
    tile_unit_scaling_y / edge / inpar_file / goal_ncycle / status_ncycle.
    Lines starting with '#' are comments and empty lines are ignored.
    priority_in_tiling is the 1-based order of appearance in the file.
    """
    log = logging.getLogger(__name__)
    log.debug("Reading tiling parameters from %s", tiling_parameters_file)
    field_names = (
        "tile_id",
        "tile_offset_x",
        "tile_offset_y",
        "x_or_y",
        "tile_unit_scaling_x",
        "tile_unit_scaling_y",
        "edge",
        "inpar_file",
        "goal_ncycle",
        "status_ncycle",
    )
    with open(tiling_parameters_file, "r") as f:
        raw_lines = f.readlines()
    # keep only lines carrying data
    data_lines = [raw for raw in raw_lines if raw.strip() and not raw.startswith("#")]
    tiling_parameters = []
    for priority, raw in enumerate(data_lines, start=1):
        parts = [part.strip() for part in raw.strip().split("/")]
        record = {"priority_in_tiling": priority}
        # explicit indexing so a malformed (short) line still raises IndexError
        for index, field in enumerate(field_names):
            record[field] = parts[index]
        tiling_parameters.append(record)
    return tiling_parameters
def read_inpar_parameters(inpar_parameters_file):
    """Read a KEYWORD=VALUE inpar parameter file into a dictionary.

    The file format is:
        # This is a comment
        KEYWORD1 = VALUE1   # optional trailing comment
    There can be any amount of whitespace around the '=' sign.
    Empty lines are ignored.

    Values that look like integers (with an optional leading minus sign)
    are converted to int, float-looking values to float; the literal
    value UNKNOWN is skipped entirely.

    Parameters
    ----------
    inpar_parameters_file : str
        Path to the inpar parameter file.

    Returns
    -------
    dict
        Keyword/value mapping, plus a "name" entry set to the file's
        basename (this overwrites any keyword literally called "name").
    """
    log = logging.getLogger(__name__)
    log.debug("Reading inpar parameters from %s", inpar_parameters_file)
    with open(inpar_parameters_file, "r") as f:
        lines = f.readlines()
    inpar_parameters = {}
    for line in lines:
        if line.startswith("#") or not line.strip():
            continue
        # split only on the first '=' so values may themselves contain '='
        key, value = line.strip().split("=", 1)
        key = key.strip()
        # drop inline comments after '#'
        value = value.split("#")[0].strip()
        if value == "UNKNOWN":
            # UNKNOWN marks a deliberately unset keyword; do not store it
            continue
        # numeric conversion, accepting one optional leading minus sign
        unsigned = value[1:] if value.startswith("-") else value
        if unsigned.isdigit():
            value = int(value)
        elif unsigned.replace(".", "", 1).isdigit():
            value = float(value)
        # TODO: converting Y/N flags to booleans would be cleaner, but it
        # needs to be consistent with the observing scripts first.
        inpar_parameters[key] = value
    inpar_parameters["name"] = os.path.basename(inpar_parameters_file)
    return inpar_parameters
def clean_line(line):
    """Normalize a comma-separated line by trimming whitespace around each field."""
    return ",".join(map(str.strip, line.split(",")))
def extract_from_json_list(tbl, obsunit_name):
    """
    Extract a json string for a given obsunit.

    Parameters
    ----------
    tbl : list of dict or None
        Each dict has the name of the obsunit and its parameters. For example,
        if it is table_azimuth, the combination of
        elevation, rising/setting, az_min/max and duration as a list;
        if it is table_mapping_params, then the mapping parameters.
        May be None when no table was loaded.
    obsunit_name : str
        Name of the obsunit to be searched in the list.

    Returns
    -------
    str or None
        A json string with the parameters of the given obsunit, or None
        when tbl is None or no entry matches.

    Raises
    ------
    ValueError
        If more than one entry matches obsunit_name.
    """
    if tbl is None:
        # no table loaded (e.g. the corresponding file was not provided)
        return None
    matching_dicts = [d for d in tbl if d.get("name") == obsunit_name]
    if not matching_dicts:
        return None
    if len(matching_dicts) == 1:
        return json.dumps(matching_dicts[0])
    raise ValueError(f"Multiple entries found with name '{obsunit_name}'")
def read_obs_units(
    obsunits_list,
    obs_unit_type="chai_obs_unit",
    inpar_parameters_folder=None,
    tiling_parameters_folder=None,
    table_azimuth_file=None,
    table_mapping_params_file=None,
    session=None,
):
    """Read a CSV obs-unit list and add each obs unit to the database.

    The first line of the CSV file is the header; each following line is one
    obs unit. Empty fields are dropped so optional columns behave as
    "not provided".

    Parameters
    ----------
    obsunits_list : str
        Path to the CSV file with the obs units.
    obs_unit_type : str
        "chai_obs_unit" or "prime_cam_obs_unit".
    inpar_parameters_folder : str or None
        Folder with inpar_* files, matched by obs mode/source/instrument/line.
        NOTE(review): the filename pattern uses the CHAI primary line, so this
        option assumes chai obs units — confirm before using with prime_cam.
    tiling_parameters_folder : str or None
        Folder with tiling_* files (exactly one must match per obs unit).
        Same CHAI assumption as inpar_parameters_folder.
    table_azimuth_file : str or None
        JSON file with per-obs-unit azimuth constraints.
    table_mapping_params_file : str or None
        JSON file with prime-cam mapping parameters.
    session : database session

    Returns
    -------
    list of dict
        The obs units parsed from the CSV file.

    Raises
    ------
    SystemExit
        If the tiling folder contains zero or more than one matching file
        for an obs unit.
    """
    log = logging.getLogger(__name__)
    with open(obsunits_list, "r") as f:
        lines = f.readlines()
    header = lines[0].strip().split(",")
    obs_units = []
    for line in lines[1:]:
        values = clean_line(line).split(",")
        obs_unit = dict(zip(header, values))
        # drop empty fields so membership checks mean "column provided"
        obs_unit = {k: v for k, v in obs_unit.items() if v}
        obs_units.append(obs_unit)
    table_azimuth = None
    if table_azimuth_file is not None:
        with open(table_azimuth_file, "r") as f:
            table_azimuth = json.load(f)
    table_mapping_params = None
    if table_mapping_params_file is not None:
        with open(table_mapping_params_file, "r") as f:
            table_mapping_params = json.load(f)
    for obs_unit in obs_units:
        source = obs_unit["source"]
        instrument = obs_unit["instrument"]
        # inpar/tiling_parameters_folder have the structure
        # inpar_parameters_folder/line/inpar_...
        if obs_unit_type == "chai_obs_unit":
            primary_line = obs_unit["p_line"]
        if obs_unit_type == "prime_cam_obs_unit":
            module = obs_unit["module"]
        # convert CSV strings to booleans
        obs_unit["available"] = obs_unit["available"] == "True"
        obs_unit["pre_scheduled_basis"] = obs_unit["pre_scheduled_basis"] == "True"
        # azimuth range; guard against a missing azimuth table so we do not
        # iterate over None inside extract_from_json_list
        azimuth_range = None
        if table_azimuth is not None:
            azimuth_range = extract_from_json_list(table_azimuth, obs_unit["name"])
        # prime cam mapping parameters
        mapping_parameters = None
        if table_mapping_params is not None:
            mapping_parameters = extract_from_json_list(
                table_mapping_params, obs_unit["name"]
            )
        # Inpar files follow the naming pattern
        # inpar_<obs_mode>_<source>_<instrument>_<primary_line>...;
        # match them by prefix and skip editor backup files ending in "~".
        inpar_parameters = None
        if inpar_parameters_folder is not None:
            inpar_prefix = (
                f"inpar_{obs_unit['obs_mode']}_{source}_{instrument}_{primary_line}"
            )
            inpar_parameter_files_used = [
                file
                for file in os.listdir(inpar_parameters_folder)
                if file.startswith(inpar_prefix) and file[-1] != "~"
            ]
            obs_unit["inpar_parameters"] = inpar_parameter_files_used
            inpar_parameters = [
                read_inpar_parameters(
                    os.path.join(inpar_parameters_folder, inpar_file)
                )
                for inpar_file in inpar_parameter_files_used
            ]
        tiling_parameters = None
        if tiling_parameters_folder is not None:
            tiling_prefix = (
                f"tiling_{obs_unit['obs_mode']}_{source}_{instrument}_{primary_line}"
            )
            tiling_parameters_files = [
                file
                for file in os.listdir(tiling_parameters_folder)
                if file.startswith(tiling_prefix) and file[-1] != "~"
            ]
            if len(tiling_parameters_files) == 0:
                log.error(
                    f"No associated tiling file for ObsUnit {obs_unit['name']} "
                    f"(start with {tiling_prefix})"
                )
                raise SystemExit(1)
            elif len(tiling_parameters_files) > 1:
                log.error(
                    f"More than one tiling input file {tiling_parameters_files} for an ObsUnit is not foreseen"
                )
                raise SystemExit(1)
            obs_unit["tiling_parameters"] = tiling_parameters_files[0]
            tiling_parameters = read_tiling_parameters(
                os.path.join(tiling_parameters_folder, tiling_parameters_files[0])
            )
        if obs_unit_type == "chai_obs_unit":
            instrument_module_configurations = [
                {
                    "type": "chai_module_configuration",
                    "instrument": obs_unit["instrument"],
                    "instrument_module": obs_unit["p_array"],
                    "primary": True,
                    "line": primary_line,
                }
            ]
            # optional secondary array/line
            if "s_array" in obs_unit:
                instrument_module_configurations.append(
                    {
                        "type": "chai_module_configuration",
                        "instrument": obs_unit["instrument"],
                        "instrument_module": obs_unit["s_array"],
                        "primary": False,
                        "line": obs_unit["s_line"],
                    }
                )
        elif obs_unit_type == "prime_cam_obs_unit":
            instrument_module_configurations = [
                {
                    "type": "prime_cam_module_configuration",
                    "instrument": obs_unit["instrument"],
                    "instrument_module": module,
                    "primary": True,
                },
            ]
        add_or_update_obs_unit(
            session,
            obs_unit,
            instrument_module_configurations=instrument_module_configurations,
            obs_unit_type=obs_unit_type,
            inpar_parameters_list=inpar_parameters,
            tiling_parameters=tiling_parameters,
            azimuth_range=azimuth_range,
            mapping_parameters=mapping_parameters,
        )
    return obs_units
def match_kosma_source_patterns(source_string):
    """Parse a KOSMA catalog SNAM line.

    Expects a line of the form
    "SNAM = NAME; ... SLAM = VALUE[s]; ... SBET = VALUE; ... VLSR = VALUE;"
    with optional whitespace around each '='; a trailing 's' on the SLAM
    value is dropped.

    Parameters
    ----------
    source_string : str
        One line from a KOSMA source file.

    Returns
    -------
    tuple of str or None
        (source_name, slam, sbet, vlsr), or None when the line does not
        match the expected pattern.
    """
    pattern = (
        r"SNAM\s*=\s*(.*?);.*?SLAM\s*=\s*(.*?)(?:s)?;"
        r".*?SBET\s*=\s*(.*?);.*?VLSR\s*=\s*(.*?);"
    )
    match = re.search(pattern, source_string)
    if match is None:
        return None
    return match.groups()
def read_sources(source_file, file_format, source_type, session):
    """
    Read a source file and add sources to the database session.

    Parameters
    ----------
    source_file : str
        Input file name with source name and coordinates.
    file_format : str
        kosma (kosma_software source file format) or csv.
        If csv, the coordinate system is assumed to be icrs.
    source_type : str or None
        None is a fixed target with coordinates. Other names should be
        compatible with the objects in models.
    session : database session

    Raises
    ------
    ValueError
        If a kosma SNAM line appears before any COMMON line defining the
        coordinate frame, or if an SNAM line cannot be parsed.
    """
    log = logging.getLogger(__name__)
    log.debug("Reading sources from file %s", source_file)
    if file_format == "kosma":
        # the frame is stateful: set by a COMMON= line, then applied to
        # every following SNAM line
        frame = None
        with open(source_file, "r") as f:
            source_lines = f.readlines()
        for source_string in source_lines:
            log.debug("Processing line: %s", source_string)
            if source_string.startswith("COMMON"):
                sbas = source_string.split("=")[1].strip()
                if sbas == "-1":
                    frame = "icrs"
                    log.info("Selected frame: icrs")
                elif sbas == "0":
                    frame = "galactic"
                    log.info("Selected frame: galactic")
            elif source_string.startswith("SNAM"):
                if frame is None:
                    raise ValueError(
                        "SNAM line found before any COMMON line defining the frame"
                    )
                log.debug(
                    "Adding or updating source: %s with the frame %s",
                    source_string,
                    frame,
                )
                parsed = match_kosma_source_patterns(source_string)
                if parsed is None:
                    raise ValueError(
                        f"Could not parse KOSMA source line: {source_string}"
                    )
                source_name, slam, sbet, vlsr = parsed
                values = {
                    "name": source_name,
                    "slam": slam,
                    "sbet": sbet,
                    "vlsr": vlsr,
                    "frame": frame,
                }
                add_or_update_source(session, values, source_type=source_type)
            elif source_string.startswith("#"):
                log.info("Skipping comment: %s", source_string)
    else:
        # comma separated values with a header line; '#' lines are comments
        with open(source_file, "r") as f:
            sources_lines = f.readlines()
        header = sources_lines[0].strip().split(",")
        sources = []
        for line in sources_lines[1:]:
            if line.startswith("#"):
                continue
            row = dict(zip(header, line.strip().split(",")))
            if source_type is None or source_type == "fixed_source":
                row["frame"] = "icrs"
                key_mapping = {"RA": "slam", "DEC": "sbet"}
                sources.append({key_mapping.get(k, k): v for k, v in row.items()})
            elif source_type == "solar_system_object":
                sources.append(row)
            elif source_type == "constant_elevation_source":
                row["frame"] = "icrs"
                key_mapping = {
                    "RAmin": "slam_min",
                    "RAmax": "slam_max",
                    "DECmin": "sbet_min",
                    "DECmax": "sbet_max",
                }
                sources.append({key_mapping.get(k, k): v for k, v in row.items()})
        for source in sources:
            add_or_update_source(
                session,
                source,
                source_type=source_type,
            )
def read_pre_scheduled_slots(pre_scheduled_file, session):
    """
    Read pre-scheduled slots and add them to the database session.

    Parameters
    ----------
    pre_scheduled_file : str
        Input file name (CSV with a header line) with pre-scheduled
        obsunits and times.
    session : database session
    """
    with open(pre_scheduled_file, "r") as f:
        raw_lines = f.readlines()
    column_names = raw_lines[0].strip().split(",")
    parsed_slots = []
    for raw in raw_lines[1:]:
        # trim whitespace around every field, then drop empty ones
        fields = [field.strip() for field in raw.split(",")]
        entry = {col: val for col, val in zip(column_names, fields) if val}
        parsed_slots.append(entry)
    for entry in parsed_slots:
        add_pre_scheduled_slot(
            session,
            entry,
        )
def init_obs_units(installation_path, input_yaml_file, session):
    """Populate sources, obs units and pre-scheduled slots from a YAML config.

    All file and folder paths inside the YAML file are interpreted relative
    to installation_path.
    """
    yaml_path = os.path.join(installation_path, input_yaml_file)
    logger.info("Reading input yaml file %s", yaml_path)
    with open(yaml_path) as fh:
        config_dict = yaml.safe_load(fh)
    # source sections of the config, paired with their model source type
    source_sections = (
        ("sources", "fixed_source"),
        ("sources_solar_system_objects", "solar_system_object"),
        ("sources_constant_elevation", "constant_elevation_source"),
    )
    for section_key, source_type in source_sections:
        logger.info("Adding %s sources", config_dict[section_key].items())
        for file_format, source_files in config_dict[section_key].items():
            # a section entry may be a single file or a list of files
            if not isinstance(source_files, list):
                source_files = [source_files]
            for source_file in source_files:
                logger.info("Adding sources from file %s", source_file)
                logger.info("Installation path is %s", installation_path)
                logger.info("Source file is %s", source_file)
                logger.info("Source type is %s", source_type)
                read_sources(
                    os.path.join(installation_path, source_file),
                    file_format=file_format,
                    source_type=source_type,
                    session=session,
                )
    # Now fill the obs units, one instrument at a time
    for instrument in config_dict["obsunits"]:
        logger.info("Adding obs units for instrument %s", instrument)
        table_azimuth_file = os.path.join(
            installation_path, config_dict["table_azimuth"]
        )
        if instrument == "chai":
            chai_cfg = config_dict["obsunits"]["chai"]
            read_obs_units(
                obsunits_list=os.path.join(
                    installation_path, chai_cfg["obsunits_list"]
                ),
                obs_unit_type="chai_obs_unit",
                inpar_parameters_folder=os.path.join(
                    installation_path, chai_cfg["inpar_parameters_folder"]
                ),
                tiling_parameters_folder=os.path.join(
                    installation_path, chai_cfg["tiling_parameters_folder"]
                ),
                table_azimuth_file=table_azimuth_file,
                session=session,
            )
        if instrument == "prime_cam":
            prime_cam_cfg = config_dict["obsunits"]["prime_cam"]
            read_obs_units(
                os.path.join(installation_path, prime_cam_cfg["obsunits_list"]),
                obs_unit_type="prime_cam_obs_unit",
                table_azimuth_file=table_azimuth_file,
                table_mapping_params_file=os.path.join(
                    installation_path, prime_cam_cfg["table_mapping_params"]
                ),
                session=session,
            )
    # Fill PreScheduledSlots
    read_pre_scheduled_slots(
        os.path.join(installation_path, config_dict["pre_scheduled_slots"]),
        session=session,
    )