Source code for lib.head_functions

import sys, inquirer, os, yaml, ast

import numpy as np
from typing import Optional
from rich import print as rprint

from srcs.utils import get_project_root
from .io_functions import (
    check_key,
    read_input_file,
    read_yaml_file,
    cuts_info2dict,
)

root = get_project_root()


[docs]def get_flag_dict(): """This function returns a dictionary with the available flags for the macro. :params None: :returns: flag_dict (dict) -- Dictionary with the available flags for the macro. :rtype: dict """ flag_dict = { ("-c", "--channels"): "channels \t(optional)", ("-d", "--debug"): "debug \t(True/False)", ("-f", "--filter"): "filter \t(optional)", ("-g", "--group"): "group \t(True/False)", ("-i", "--input_file"): "input_file", ("-k", "--key"): "key \t(AnaADC, RawADC, etc.)", ("-pl", "--preset_load"): "preset_load \t(RAW, ANA, etc.)", ("-ps", "--preset_save"): "preset_save \t(RAW, ANA, etc.)", ("-v", "--variables"): "variables \t(ChargeAveRange, ChargeRange0, etc.)", ("-r", "--runs"): "runs \t(optional)", ("-s", "--save"): "save \t(optional)", } return flag_dict
[docs]def initialize_macro( macro, input_list: Optional[list] = ["input_file", "debug"], default_dict: Optional[dict] = None, debug: bool = False ): """This function initializes the macro by reading the input file and the user input. :param macro: Name of the macro to be executed. :type macro: str :param input_list: List with the keys of the user input that need to be updated, defaults to ["input_file", "debug"] :type input_list: list, optional :param default_dict: Dictionary with the default values for the user input, defaults to None :type default_dict: dict, optional :param debug: Debug mode, defaults to False :type debug: bool, optional :return: user_input, info -- Dictionary with the user input and dictionary with the information from the input file. :rtype: tuple """ flag_dict = get_flag_dict() user_input = dict() if default_dict is None: default_dict = dict() else: if check_key(default_dict, "input_file") == True: user_input["input_file"] = default_dict["input_file"] print_header() if len(sys.argv) > 1: for arg in sys.argv: if arg == "-h" or arg == "--help": for flag in flag_dict: print_macro_info(macro) rprint("[white,bold]Usage: python3 %s.py -i config_file *--flags[/whitecyan,bold]" % macro) rprint("[cyan]Available Flags:[/cyan]") for flag in flag_dict: rprint("[cyan]%s: %s[/cyan]" % (flag[0], flag_dict[flag])) exit() for flag in flag_dict: if arg == flag[0] or arg == flag[1]: try: user_input[flag[1].split("--")[1]] = sys.argv[ sys.argv.index(arg) + 1 ].split(",") rprint( "[cyan]Using %s from command line %s[/cyan]" % ( flag_dict[flag], sys.argv[sys.argv.index(arg) + 1].split(","), ) ) except IndexError: rprint("Provide argument for flag %s" % flag_dict[flag]) exit() if check_key(user_input, "input_file") == False: user_input = select_input_file(user_input, debug=debug) user_input["input_file"] = user_input["input_file"] info = read_input_file(user_input["input_file"][0], debug=debug) user_input, info = update_user_input(user_input, input_list, info, debug=debug) for flag in ["group", "save", "debug"]: try: user_input[flag] = user_input[flag][0].lower() in [ "true", "1", "t", "y", "yes", ] except KeyError: rprint("WARNING: No %s flag provided --> Using False as default." % flag) user_input = use_default_input(user_input, default_dict, info, debug=debug) if debug: rprint("[cyan]\nUser input:[/cyan]") rprint(user_input) return user_input, info
[docs]def update_user_input(user_input, new_input_list, info, debug=False): """This function updates the user input by asking the user to provide the missing information. :param user_input: Dictionary with the user input. :type user_input: dict :param new_input_list: List with the keys of the user input that need to be updated. :type new_input_list: list :param info: Dictionary with the information from the input file. :type info: dict :param debug: Debug mode, defaults to False :type debug: bool, optional :return: new_user_input, info -- Dictionary with the updated user input and dictionary with the information from the input file. :rtype: tuple """ new_user_input = user_input.copy() defaults = { "preset_load": "ANA", "preset_save": "ANA", "key": "AnaADC", "variables": "AnaChargeAveRange", "runs": "1", "channels": "0", "group": "n", "save": "y", "debug": "y", } flags = { "preset_load": "-pl", "preset_save": "-ps", "key": "-k", "variables": "-v", "runs": "-r", "channels": "-c", "group": "-g", "save": "-s", "debug": "-d", } for key_label in new_input_list: if check_key(user_input, key_label) == False: if key_label != "filter": q = [ inquirer.Text( key_label, message=" select %s [flag: %s]" % (key_label, flags[key_label]), default=defaults[key_label], ) ] new_user_input[key_label] = inquirer.prompt(q)[key_label].split(",") # rprint("Using %s from user input %s"%(key_label,new_user_input[key_label]),"WARNING") else: new_user_input["filter"] = apply_cuts(user_input, info, debug=debug) rprint( f"[yellow][WARNING] Using {key_label} from user input {new_user_input[key_label]}[/yellow]" ) else: pass return new_user_input, info
[docs]def select_input_file(user_input, debug=False): """This function asks the user to select the input file. :param user_input: Dictionary with the user input. :type user_input: dict :param debug: Debug mode, defaults to False :type debug: bool, optional :return: new_user_input -- Dictionary with the updated user input. :rtype: dict """ new_user_input = user_input.copy() if check_key(user_input, "input_file") == False: file_names = [ file_name.split(".")[0] for file_name in os.listdir(f"{root}/config/input") ] q = [ inquirer.List( "input_file", message=" select input file [flag: -i]", choices=file_names, default="TUTORIAL", ) ] new_user_input["input_file"] = [inquirer.prompt(q)["input_file"]] if debug: rprint("[cyan]Using input file %s[/cyan]" % new_user_input["input_file"][0]) return new_user_input
[docs]def use_default_input(user_input, default_dict, info, debug=False): """This function updates the user input by asking the user to provide the missing information. :param user_input: Dictionary with the user input. :type user_input: dict :param default_dict: Dictionary with the default values for the user input. :type default_dict: dict :param info: Dictionary with the information from the input file. :type info: dict :param debug: Debug mode, defaults to False :type debug: bool, optional :return: new_user_input -- Dictionary with the updated user input. :rtype: dict """ new_user_input = user_input.copy() for default_key in default_dict: if check_key(new_user_input, default_key) == False: runs = [] if type(default_dict[default_key]): for key in default_dict[default_key]: if check_key(info, key): for run in info[key]: if run not in runs: runs.append(run) new_user_input[default_key] = runs if type(default_dict[default_key]) == dict: for key in default_dict[default_key]: if check_key(info, key): value = info[key][default_dict[default_key][key]] new_user_input[default_key] = [value] if new_user_input["debug"]: rprint( "[yellow]No %s provided. Using all %s from input file. %s[/yellow]" % (default_key, default_key, runs) ) return new_user_input
[docs]def apply_cuts(user_input, info, debug=False): """This function asks the user to select the cuts to be apply to your events. :param user_input: Dictionary with the user input. :type user_input: dict :param info: Dictionary with the information from the input file. :type info: dict :param debug: Debug mode, defaults to False :type debug: bool, optional :return: cut_dict -- Dictionary with the cuts to be applied to your events. :rtype: dict """ cuts_choices = ["cut_df", "cut_lin_rel", "cut_peak_finder"] cut_dict = cuts_info2dict(user_input, info, debug=True) for cut in cuts_choices: if cut_dict[cut][0] == True: # if debug: rprint("[cyan]Using cuts options %s[/cyan]"%cut_dict) return cut_dict if cut_dict[cut][0] == False: ask4cuts = True q = [ inquirer.Checkbox( "filter", message=" select the cuts you want to apply", choices=cuts_choices, ) ] my_cuts = [inquirer.prompt(q)["filter"]][0] for cut in cuts_choices: if cut in my_cuts: if cut == "cut_df": while ask4cuts: if cut_dict[cut][0] != True: cut_dict[cut] = [True, []] channels = [ inquirer.Text( "channels", message="Select channels for applying **%s**" % cut, default="0,1", ) ] key = [ inquirer.Text( "key", message="Select key for applying **%s**" % cut, default="AnaPedSTD", ) ] logic = [ inquirer.Text( "logic", message="Select logic for applying **%s**" % cut, default="bigger", ) ] value = [ inquirer.Text( "value", message="Select value for applying **%s**" % cut, default="1", ) ] inclusive = [ inquirer.Text( "inclusive", message="Select inclusive for applying **%s**" % cut, default="False", ) ] cut_dict[cut][1].append( [ inquirer.prompt(channels)["channels"].split(","), inquirer.prompt(key)["key"], inquirer.prompt(logic)["logic"], tuple( float(x) for x in inquirer.prompt(value)["value"].split( "," ) ), inquirer.prompt(inclusive)["inclusive"].lower() in ["true", "1", "t", "y", "yes"], ] ) ask4cuts = input( "\nDo you want to add another cut? (y/n) " ).lower() in ["true", "1", "t", "y", "yes"] if cut == "cut_lin_rel": while ask4cuts: if cut_dict[cut][0] != True: cut_dict[cut] = [True, []] key = [ inquirer.Text( "key", message="Select 2 keys for applying **%s**" % cut, default="AnaPeakAmp,AnaChargeAveRange", ) ] compare = [ inquirer.Text( "compare", message="NONE, RUNS, CHANNELS to decide the histogram to use", default="NONE", ) ] cut_dict[cut][1].append( [ inquirer.prompt(key)["key"].split(","), inquirer.prompt(compare)["compare"], ] ) ask4cuts = input( "\nDo you want to add another cut? (y/n) " ).lower() in ["true", "1", "t", "y", "yes"] if cut == "cut_peak_finder": while ask4cuts: if cut_dict[cut][0] != True: cut_dict[cut] = [True, []] n_peaks = [ inquirer.Text( "n_peaks", message="Select number of peaks for applying **%s**" % cut, default="1", ) ] cut_dict[cut][1].append( [inquirer.prompt(n_peaks)["n_peaks"]] ) ask4cuts = input( "\nDo you want to add another cut? (y/n) " ).lower() in ["true", "1", "t", "y", "yes"] if debug: rprint("[cyan]Using cuts options %s[/cyan]" % cut_dict) return cut_dict
[docs]def opt_selector(filename: str = "options", arguments: Optional[list] = None, debug: bool = False): """This function reads the options from a YAML file and allows the user to select the options to be used. :param filename: Name of the YAML file, defaults to "options" :type filename: str, optional :param arguments: List with the arguments to be used, defaults to None :type arguments: list, optional :param debug: Debug mode, defaults to False :type debug: bool, optional :return: updated_opt -- Dictionary with the updated options. :rtype: dict """ my_opt = read_yaml_file(filename, path=f"{root}/config/", debug=debug) if arguments is None: new_opt = my_opt.copy() rprint(my_opt) elif isinstance(arguments, list) and len(arguments) == 0: rprint(f"[cyan][INFO] No arguments provided. Returning all options from {filename}.yml[/cyan]") return my_opt else: new_opt = dict() for arg in arguments: if arg in my_opt.keys(): new_opt[arg] = my_opt[arg] rprint(new_opt) q = [ inquirer.List( "change", message="Do you want to change a line? (yes/no)", choices=["yes", "no"], default="no", ) ] change_line = inquirer.prompt(q)["change"].strip().lower() if change_line in ["yes", "y", "true", "1"]: q = [ inquirer.Checkbox( "lines", message="Choose the lines to change", choices=new_opt.keys() ) ] line_label = inquirer.prompt(q)["lines"] for line in line_label: options = "" if type(new_opt[line]) == bool: options = "(True/False)" if type(new_opt[line]) == str: options = "" if type(new_opt[line]) == list: options = "(comma separated list)" new_text = input(f"Enter new entry for line {line} {options}: ") new_opt[line] = new_text update_yaml_file(f"{root}/config/{filename}.yml", new_opt) updated_opt = read_yaml_file(filename, path=f"{root}/config/", debug=debug) return updated_opt
[docs]def convert_str_to_type(value, debug=False): """This function converts a string to its corresponding type. :param value: Value to be converted. :type value: str :param debug: Debug mode, defaults to False :type debug: bool, optional :return: value -- Converted value. :rtype: any """ try: value = ast.literal_eval(value) # If value is numpy array, convert it to list if type(value) == np.ndarray: value = value.tolist() if type(value) == tuple: value = list(value) return value except (ValueError, SyntaxError): return value
[docs]def update_yaml_file( file_path: str, new_data: dict, convert: bool = True, debug: bool = False ): """This function updates a YAML file with new data. :param file_path: Path to the YAML file. :type file_path: str :param new_data: Dictionary with the new data to be added. :type new_data: dict :param convert: Convert string to type, defaults to True :type convert: bool, optional :param debug: Debug mode, defaults to False :type debug: bool, optional :return: None """ # If file path doesn't exist, create it. Take into account that the file name is included in the path. if not os.path.exists(file_path): rprint(f"YAML file '{file_path}' doesn't exist. Creating it...") os.makedirs(os.path.dirname(file_path), mode=0o777, exist_ok=True) # Update folder permissions os.system(f"chmod -R 770 {os.path.dirname(file_path)}") with open(file_path, "w") as file: yaml.dump(new_data, file, default_flow_style=False, sort_keys=False) rprint(f"YAML file '{file_path}' successfully created.") return # Update YAML file try: with open(file_path, "r") as file: existing_data = yaml.safe_load(file) if convert: new_data = { key: convert_str_to_type(value) for key, value in new_data.items() } existing_data.update(new_data) with open(file_path, "w") as file: yaml.dump(existing_data, file, default_flow_style=False, sort_keys=False) rprint(f"YAML file '{file_path}' successfully updated.") except Exception as e: rprint(f"Error updating YAML file: {e}")