Source code for lib.head_functions
import sys, inquirer, os, yaml, ast
import numpy as np
from typing import Optional
from rich import print as rprint
from srcs.utils import get_project_root
from .io_functions import (
check_key,
read_input_file,
read_yaml_file,
cuts_info2dict,
)
root = get_project_root()
[docs]def get_flag_dict():
"""This function returns a dictionary with the available flags for the macro.
:params None:
:returns: flag_dict (dict) -- Dictionary with the available flags for the macro.
:rtype: dict
"""
flag_dict = {
("-c", "--channels"): "channels \t(optional)",
("-d", "--debug"): "debug \t(True/False)",
("-f", "--filter"): "filter \t(optional)",
("-g", "--group"): "group \t(True/False)",
("-i", "--input_file"): "input_file",
("-k", "--key"): "key \t(AnaADC, RawADC, etc.)",
("-pl", "--preset_load"): "preset_load \t(RAW, ANA, etc.)",
("-ps", "--preset_save"): "preset_save \t(RAW, ANA, etc.)",
("-v", "--variables"): "variables \t(ChargeAveRange, ChargeRange0, etc.)",
("-r", "--runs"): "runs \t(optional)",
("-s", "--save"): "save \t(optional)",
}
return flag_dict
[docs]def initialize_macro(
macro, input_list: Optional[list] = ["input_file", "debug"], default_dict: Optional[dict] = None, debug: bool = False
):
"""This function initializes the macro by reading the input file and the user input.
:param macro: Name of the macro to be executed.
:type macro: str
:param input_list: List with the keys of the user input that need to be updated, defaults to ["input_file", "debug"]
:type input_list: list, optional
:param default_dict: Dictionary with the default values for the user input, defaults to None
:type default_dict: dict, optional
:param debug: Debug mode, defaults to False
:type debug: bool, optional
:return: user_input, info -- Dictionary with the user input and dictionary with the information from the input file.
:rtype: tuple
"""
flag_dict = get_flag_dict()
user_input = dict()
if default_dict is None:
default_dict = dict()
else:
if check_key(default_dict, "input_file") == True:
user_input["input_file"] = default_dict["input_file"]
print_header()
if len(sys.argv) > 1:
for arg in sys.argv:
if arg == "-h" or arg == "--help":
for flag in flag_dict:
print_macro_info(macro)
rprint("[white,bold]Usage: python3 %s.py -i config_file *--flags[/whitecyan,bold]" % macro)
rprint("[cyan]Available Flags:[/cyan]")
for flag in flag_dict:
rprint("[cyan]%s: %s[/cyan]" % (flag[0], flag_dict[flag]))
exit()
for flag in flag_dict:
if arg == flag[0] or arg == flag[1]:
try:
user_input[flag[1].split("--")[1]] = sys.argv[
sys.argv.index(arg) + 1
].split(",")
rprint(
"[cyan]Using %s from command line %s[/cyan]"
% (
flag_dict[flag],
sys.argv[sys.argv.index(arg) + 1].split(","),
)
)
except IndexError:
rprint("Provide argument for flag %s" % flag_dict[flag])
exit()
if check_key(user_input, "input_file") == False:
user_input = select_input_file(user_input, debug=debug)
user_input["input_file"] = user_input["input_file"]
info = read_input_file(user_input["input_file"][0], debug=debug)
user_input, info = update_user_input(user_input, input_list, info, debug=debug)
for flag in ["group", "save", "debug"]:
try:
user_input[flag] = user_input[flag][0].lower() in [
"true",
"1",
"t",
"y",
"yes",
]
except KeyError:
rprint("WARNING: No %s flag provided --> Using False as default." % flag)
user_input = use_default_input(user_input, default_dict, info, debug=debug)
if debug:
rprint("[cyan]\nUser input:[/cyan]")
rprint(user_input)
return user_input, info
[docs]def update_user_input(user_input, new_input_list, info, debug=False):
"""This function updates the user input by asking the user to provide the missing information.
:param user_input: Dictionary with the user input.
:type user_input: dict
:param new_input_list: List with the keys of the user input that need to be updated.
:type new_input_list: list
:param info: Dictionary with the information from the input file.
:type info: dict
:param debug: Debug mode, defaults to False
:type debug: bool, optional
:return: new_user_input, info -- Dictionary with the updated user input and dictionary with the information from the input file.
:rtype: tuple
"""
new_user_input = user_input.copy()
defaults = {
"preset_load": "ANA",
"preset_save": "ANA",
"key": "AnaADC",
"variables": "AnaChargeAveRange",
"runs": "1",
"channels": "0",
"group": "n",
"save": "y",
"debug": "y",
}
flags = {
"preset_load": "-pl",
"preset_save": "-ps",
"key": "-k",
"variables": "-v",
"runs": "-r",
"channels": "-c",
"group": "-g",
"save": "-s",
"debug": "-d",
}
for key_label in new_input_list:
if check_key(user_input, key_label) == False:
if key_label != "filter":
q = [
inquirer.Text(
key_label,
message=" select %s [flag: %s]" % (key_label, flags[key_label]),
default=defaults[key_label],
)
]
new_user_input[key_label] = inquirer.prompt(q)[key_label].split(",")
# rprint("Using %s from user input %s"%(key_label,new_user_input[key_label]),"WARNING")
else:
new_user_input["filter"] = apply_cuts(user_input, info, debug=debug)
rprint(
f"[yellow][WARNING] Using {key_label} from user input {new_user_input[key_label]}[/yellow]"
)
else:
pass
return new_user_input, info
[docs]def select_input_file(user_input, debug=False):
"""This function asks the user to select the input file.
:param user_input: Dictionary with the user input.
:type user_input: dict
:param debug: Debug mode, defaults to False
:type debug: bool, optional
:return: new_user_input -- Dictionary with the updated user input.
:rtype: dict
"""
new_user_input = user_input.copy()
if check_key(user_input, "input_file") == False:
file_names = [
file_name.split(".")[0] for file_name in os.listdir(f"{root}/config/input")
]
q = [
inquirer.List(
"input_file",
message=" select input file [flag: -i]",
choices=file_names,
default="TUTORIAL",
)
]
new_user_input["input_file"] = [inquirer.prompt(q)["input_file"]]
if debug:
rprint("[cyan]Using input file %s[/cyan]" % new_user_input["input_file"][0])
return new_user_input
[docs]def use_default_input(user_input, default_dict, info, debug=False):
"""This function updates the user input by asking the user to provide the missing information.
:param user_input: Dictionary with the user input.
:type user_input: dict
:param default_dict: Dictionary with the default values for the user input.
:type default_dict: dict
:param info: Dictionary with the information from the input file.
:type info: dict
:param debug: Debug mode, defaults to False
:type debug: bool, optional
:return: new_user_input -- Dictionary with the updated user input.
:rtype: dict
"""
new_user_input = user_input.copy()
for default_key in default_dict:
if check_key(new_user_input, default_key) == False:
runs = []
if type(default_dict[default_key]):
for key in default_dict[default_key]:
if check_key(info, key):
for run in info[key]:
if run not in runs:
runs.append(run)
new_user_input[default_key] = runs
if type(default_dict[default_key]) == dict:
for key in default_dict[default_key]:
if check_key(info, key):
value = info[key][default_dict[default_key][key]]
new_user_input[default_key] = [value]
if new_user_input["debug"]:
rprint(
"[yellow]No %s provided. Using all %s from input file. %s[/yellow]"
% (default_key, default_key, runs)
)
return new_user_input
[docs]def print_macro_info(macro, debug=False):
"""This function prints the information about the macro.
:param macro: Name of the macro to be executed
:type macro: str
:param debug: Debug mode, defaults to False
:type debug: bool, optional
"""
f = open("info/" + macro + ".txt", "r")
file_contents = f.read()
rprint(file_contents + "\n")
f.close
if debug:
rprint("[magenta]....... Debug Mode Activated! .......[/magenta]")
[docs]def print_header():
"""This function prints the header of the macro.
"""
f = open("info/header.txt", "r")
file_contents = f.read()
rprint(file_contents)
f.close
rprint(f"\n[cyan]....... Starting Macro .......[/cyan]")
[docs]def apply_cuts(user_input, info, debug=False):
"""This function asks the user to select the cuts to be apply to your events.
:param user_input: Dictionary with the user input.
:type user_input: dict
:param info: Dictionary with the information from the input file.
:type info: dict
:param debug: Debug mode, defaults to False
:type debug: bool, optional
:return: cut_dict -- Dictionary with the cuts to be applied to your events.
:rtype: dict
"""
cuts_choices = ["cut_df", "cut_lin_rel", "cut_peak_finder"]
cut_dict = cuts_info2dict(user_input, info, debug=True)
for cut in cuts_choices:
if cut_dict[cut][0] == True:
# if debug: rprint("[cyan]Using cuts options %s[/cyan]"%cut_dict)
return cut_dict
if cut_dict[cut][0] == False:
ask4cuts = True
q = [
inquirer.Checkbox(
"filter",
message=" select the cuts you want to apply",
choices=cuts_choices,
)
]
my_cuts = [inquirer.prompt(q)["filter"]][0]
for cut in cuts_choices:
if cut in my_cuts:
if cut == "cut_df":
while ask4cuts:
if cut_dict[cut][0] != True:
cut_dict[cut] = [True, []]
channels = [
inquirer.Text(
"channels",
message="Select channels for applying **%s**" % cut,
default="0,1",
)
]
key = [
inquirer.Text(
"key",
message="Select key for applying **%s**" % cut,
default="AnaPedSTD",
)
]
logic = [
inquirer.Text(
"logic",
message="Select logic for applying **%s**" % cut,
default="bigger",
)
]
value = [
inquirer.Text(
"value",
message="Select value for applying **%s**" % cut,
default="1",
)
]
inclusive = [
inquirer.Text(
"inclusive",
message="Select inclusive for applying **%s**"
% cut,
default="False",
)
]
cut_dict[cut][1].append(
[
inquirer.prompt(channels)["channels"].split(","),
inquirer.prompt(key)["key"],
inquirer.prompt(logic)["logic"],
tuple(
float(x)
for x in inquirer.prompt(value)["value"].split(
","
)
),
inquirer.prompt(inclusive)["inclusive"].lower()
in ["true", "1", "t", "y", "yes"],
]
)
ask4cuts = input(
"\nDo you want to add another cut? (y/n) "
).lower() in ["true", "1", "t", "y", "yes"]
if cut == "cut_lin_rel":
while ask4cuts:
if cut_dict[cut][0] != True:
cut_dict[cut] = [True, []]
key = [
inquirer.Text(
"key",
message="Select 2 keys for applying **%s**" % cut,
default="AnaPeakAmp,AnaChargeAveRange",
)
]
compare = [
inquirer.Text(
"compare",
message="NONE, RUNS, CHANNELS to decide the histogram to use",
default="NONE",
)
]
cut_dict[cut][1].append(
[
inquirer.prompt(key)["key"].split(","),
inquirer.prompt(compare)["compare"],
]
)
ask4cuts = input(
"\nDo you want to add another cut? (y/n) "
).lower() in ["true", "1", "t", "y", "yes"]
if cut == "cut_peak_finder":
while ask4cuts:
if cut_dict[cut][0] != True:
cut_dict[cut] = [True, []]
n_peaks = [
inquirer.Text(
"n_peaks",
message="Select number of peaks for applying **%s**"
% cut,
default="1",
)
]
cut_dict[cut][1].append(
[inquirer.prompt(n_peaks)["n_peaks"]]
)
ask4cuts = input(
"\nDo you want to add another cut? (y/n) "
).lower() in ["true", "1", "t", "y", "yes"]
if debug:
rprint("[cyan]Using cuts options %s[/cyan]" % cut_dict)
return cut_dict
[docs]def opt_selector(filename: str = "options", arguments: Optional[list] = None, debug: bool = False):
"""This function reads the options from a YAML file and allows the user to select the options to be used.
:param filename: Name of the YAML file, defaults to "options"
:type filename: str, optional
:param arguments: List with the arguments to be used, defaults to None
:type arguments: list, optional
:param debug: Debug mode, defaults to False
:type debug: bool, optional
:return: updated_opt -- Dictionary with the updated options.
:rtype: dict
"""
my_opt = read_yaml_file(filename, path=f"{root}/config/", debug=debug)
if arguments is None:
new_opt = my_opt.copy()
rprint(my_opt)
elif isinstance(arguments, list) and len(arguments) == 0:
rprint(f"[cyan][INFO] No arguments provided. Returning all options from {filename}.yml[/cyan]")
return my_opt
else:
new_opt = dict()
for arg in arguments:
if arg in my_opt.keys():
new_opt[arg] = my_opt[arg]
rprint(new_opt)
q = [
inquirer.List(
"change",
message="Do you want to change a line? (yes/no)",
choices=["yes", "no"],
default="no",
)
]
change_line = inquirer.prompt(q)["change"].strip().lower()
if change_line in ["yes", "y", "true", "1"]:
q = [
inquirer.Checkbox(
"lines", message="Choose the lines to change", choices=new_opt.keys()
)
]
line_label = inquirer.prompt(q)["lines"]
for line in line_label:
options = ""
if type(new_opt[line]) == bool:
options = "(True/False)"
if type(new_opt[line]) == str:
options = ""
if type(new_opt[line]) == list:
options = "(comma separated list)"
new_text = input(f"Enter new entry for line {line} {options}: ")
new_opt[line] = new_text
update_yaml_file(f"{root}/config/{filename}.yml", new_opt)
updated_opt = read_yaml_file(filename, path=f"{root}/config/", debug=debug)
return updated_opt
[docs]def convert_str_to_type(value, debug=False):
"""This function converts a string to its corresponding type.
:param value: Value to be converted.
:type value: str
:param debug: Debug mode, defaults to False
:type debug: bool, optional
:return: value -- Converted value.
:rtype: any
"""
try:
value = ast.literal_eval(value)
# If value is numpy array, convert it to list
if type(value) == np.ndarray:
value = value.tolist()
if type(value) == tuple:
value = list(value)
return value
except (ValueError, SyntaxError):
return value
[docs]def update_yaml_file(
file_path: str, new_data: dict, convert: bool = True, debug: bool = False
):
"""This function updates a YAML file with new data.
:param file_path: Path to the YAML file.
:type file_path: str
:param new_data: Dictionary with the new data to be added.
:type new_data: dict
:param convert: Convert string to type, defaults to True
:type convert: bool, optional
:param debug: Debug mode, defaults to False
:type debug: bool, optional
:return: None
"""
# If file path doesn't exist, create it. Take into account that the file name is included in the path.
if not os.path.exists(file_path):
rprint(f"YAML file '{file_path}' doesn't exist. Creating it...")
os.makedirs(os.path.dirname(file_path), mode=0o777, exist_ok=True)
# Update folder permissions
os.system(f"chmod -R 770 {os.path.dirname(file_path)}")
with open(file_path, "w") as file:
yaml.dump(new_data, file, default_flow_style=False, sort_keys=False)
rprint(f"YAML file '{file_path}' successfully created.")
return
# Update YAML file
try:
with open(file_path, "r") as file:
existing_data = yaml.safe_load(file)
if convert:
new_data = {
key: convert_str_to_type(value) for key, value in new_data.items()
}
existing_data.update(new_data)
with open(file_path, "w") as file:
yaml.dump(existing_data, file, default_flow_style=False, sort_keys=False)
rprint(f"YAML file '{file_path}' successfully updated.")
except Exception as e:
rprint(f"Error updating YAML file: {e}")