Source code for lib.group_functions

import numpy as np
import inquirer
from itertools import product
from rich import print as rprint


[docs]def group_selector(data, remove=False, operation="add", debug=False): """Ask the user to select runs and/or chs to group and generate a combination dictionary :param data: dictionary containing the data :type data: dict :param remove: flag to remove the runs/chs from the data dictionary, defaults to False :type remove: bool, optional :param operation: operation to perform on the data, defaults to "add" :type operation: str, optional :param debug: flag to print debug information, defaults to False :type debug: bool, optional :return: comb_data :rtype: dict """ # Ask the user to select runs and/or chs to group rprint("Select runs and/or chs to group (Type 'done' when finished)") combination_dict = {"NRun": [], "NChannel": []} for selector in ["NRun", "NChannel"]: while True: if type(data[selector]) == np.ndarray: data[selector] = data[selector].tolist() q1 = [ inquirer.Checkbox( selector, message=f"Choose the {selector} to group", choices=data[selector], ) ] user_input = inquirer.prompt(q1)[selector] # Check if user input is empty if len(user_input) > 1: user_input = tuple(user_input) combination_dict[selector].append(user_input) rprint(combination_dict) if len(user_input) == 1: rprint( "[yellow]WARINNG: You must select at least two runs/chs to group[yellow]" ) if len(user_input) == 0: rprint("No runs/chs selected") break # group the data comb_data = group_data( data, combination_dict, remove=remove, operation=operation, debug=debug ) return comb_data
[docs]def group_data(data, combination_dict, remove=False, operation="add", debug=False): """Group runs and/or chs in the data dictionary. :param data: dictionary containing the data :type data: dict :param combination_dict: dictionary containing the runs and chs to group :type combination_dict: dict :param remove: flag to remove the runs/chs from the data dictionary, defaults to False :type remove: bool, optional :param operation: operation to perform on the data, defaults to "add" :type operation: str, optional :param debug: flag to print debug information, defaults to False :type debug: bool, optional :return: data :rtype: dict """ # First check that the combination dictionary has the correct structure if not all(key in ["NRun", "NChannel"] for key in combination_dict.keys()): raise ValueError( "The combination dictionary should have the structure combination_dict = {'runs':[(run_i,run_j,...),...],'chns':[(ch_i,ch_j,...),...]}" ) # Check if tuples contain strings or integers and convert to strings combination_dict["NRun"] = [ tuple(np.asarray(run).astype(str)) for run in combination_dict["NRun"] ] combination_dict["NChannel"] = [ tuple(np.asarray(chn).astype(str)) for chn in combination_dict["NChannel"] ] # group data for runs in combination_dict["NRun"]: data = group_runs(data, runs, remove=remove, operation=operation, debug=debug) for chns in combination_dict["NChannel"]: data = group_chns(data, chns, remove=remove, operation=operation, debug=debug) return data
[docs]def group_runs(data, runs: tuple, operation, remove=True, debug=False): """Group runs in the data dictionary :param data: dictionary containing the data :type data: dict :param runs: tuple containing the runs to group :type runs: tuple :param operation: operation to perform on the data :type operation: str :param remove: flag to remove the runs from the data dictionary, defaults to True :type remove: bool, optional :param debug: flag to print debug information, defaults to False :type debug: bool, optional :return: grouped_runs :rtype: dict """ # First check if the runs exist in the data dictionary if not all(run in data["NRun"] for run in runs): raise ValueError("Runs not found in data dictionary") # Make a new data dictionary grouped_runs = {} grouped_runs["NRun"] = [] grouped_runs["NChannel"] = data["NChannel"] new_run = "+".join(np.asarray(runs).astype(str)) grouped_runs[new_run] = {} for run, ch, var in product( runs, data["NChannel"], data[data["NRun"][0]][data["NChannel"][0]].keys() ): if ch not in grouped_runs[new_run].keys(): grouped_runs[new_run][ch] = {} elif var not in grouped_runs[new_run][ch].keys(): grouped_runs[new_run][ch][var] = data[run][ch][var] else: grouped_runs = group_vars( operation, data, grouped_runs, new_run, ch, var, run, debug=debug ) grouped_runs["NRun"].append(new_run) for run, ch, var in product( data["NRun"], data["NChannel"], data[data["NRun"][0]][data["NChannel"][0]].keys(), ): # rprint(run, runs, remove) if run in runs and remove: # if debug: rprint("Skipping: run %s - ch %s", run, ch) continue # Check if the run already exists in the grouped_runs dictionary if run not in grouped_runs.keys(): grouped_runs[run] = {} if ch not in grouped_runs[run].keys(): grouped_runs[run][ch] = {} try: grouped_runs[run][ch][var] = data[run][ch][var] except KeyError: grouped_runs[run][ch][var] = [] if debug: rprint(f"Could not find {var} in run{run}_{ch}") if run not in grouped_runs["NRun"]: grouped_runs["NRun"].append(run) return grouped_runs
[docs]def group_chns(data, chs: tuple, operation, remove=True, debug=False): """Group chs in the data dictionary :param data: dictionary containing the data :type data: dict :param chs: tuple containing the chs to group :type chs: tuple :param operation: operation to perform on the data :type operation: str :param remove: flag to remove the chs from the data dictionary, defaults to True :type remove: bool, optional :param debug: flag to print debug information, defaults to False :type debug: bool, optional :return: grouped_chns :rtype: dict """ # First check if the chs exist in the data dictionary if not all(ch in data["NChannel"] for ch in chs): raise ValueError("chs not found in data dictionary") # Make a new data dictionary grouped_chns = {} grouped_chns["NRun"] = data["NRun"] grouped_chns["NChannel"] = [] new_ch = "+".join(np.asarray(chs).astype(str)) grouped_chns["NChannel"].append(new_ch) for run in data["NRun"]: appended_vars = [] grouped_chns[run] = {} grouped_chns[run][new_ch] = {} for ch, var in product(chs, data[data["NRun"][0]][data["NChannel"][0]].keys()): if var not in appended_vars: try: grouped_chns[run][new_ch][var] = data[run][ch][var] except KeyError: grouped_chns[run][new_ch][var] = [] if debug: rprint(f"Could not find {var} in run{run}_{ch}") appended_vars.append(var) else: grouped_chns = group_vars( operation, data, grouped_chns, run, new_ch, var, ch, debug=debug ) for run, ch, var in product( data["NRun"], data["NChannel"], data[data["NRun"][0]][data["NChannel"][0]].keys(), ): # rprint(ch, chs, remove) if ch in chs and remove: # if debug: rprint("Skipping: run %s - ch %s", run, ch) continue # Check if the ch already exists in the grouped_chns dictionary if ch not in grouped_chns[run].keys(): grouped_chns[run][ch] = {} try: grouped_chns[run][ch][var] = data[run][ch][var] except KeyError: grouped_chns[run][ch][var] = [] if debug: rprint(f"Could not find {var} in run{run}_{ch}") if ch not in grouped_chns["NChannel"]: grouped_chns["NChannel"].append(ch) return grouped_chns
[docs]def group_vars(operation, data, grouped_chns, run, new_ch, var, ch, debug=False): """Group variables in the data dictionary :param operation: operation to perform on the data :type operation: str :param data: dictionary containing the data :type data: dict :param grouped_chns: dictionary containing the grouped data :type grouped_chns: dict :param run: run number :type run: str :param new_ch: new channel :type new_ch: str :param var: variable :type var: str :param ch: channel :type ch: str :param debug: flag to print debug information, defaults to False :type debug: bool, optional :return: grouped_chns :rtype: dict """ try: if operation == "append": grouped_chns[run][new_ch][var] = np.concatenate( (grouped_chns[run][new_ch][var], data[run][ch][var]) ) elif operation == "add": grouped_chns[run][new_ch][var] = ( grouped_chns[run][new_ch][var] + data[run][ch][var] ) elif operation == "multiply": grouped_chns[run][new_ch][var] = ( grouped_chns[run][new_ch][var] * data[run][ch][var] ) except ValueError: rprint(f"Could not {operation} {var} arrays") except TypeError: rprint(f"Could not {operation} {var} arrays") except KeyError: grouped_chns[run][new_ch][var] = [] if debug: rprint(f"Could not find {var} in run{run}_{ch}") return grouped_chns