Source code for lib.ana_functions

# ================================================================================================================================================#
# This library contains functions to compute variables from the raw data. They are mostky used in the *Process.py macros.                        #
# ================================================================================================================================================#

import os
import numba
import inquirer
import numpy as np
import matplotlib.pyplot as plt

from itertools import product
from scipy.signal import find_peaks
from rich import print as rprint

# Import from other libraries
from srcs.utils import get_project_root
from .io_functions import check_key
from .cut_functions import generate_cut_array
from .unit_functions import get_run_units
from .head_functions import update_yaml_file

root = get_project_root()

# ===========================================================================#
# ************************* PEAK + PEDESTAL *********************************#
# ===========================================================================#

[docs]def compute_ana_wvfs( my_runs: dict, info: dict, filter: bool = False, debug: bool = False ): """Computes the AnaADC wvfs from the RawADC and the baseline value computed from PED_KEY. :param my_runs: dictionary containing the data. :type my_runs: dict :param info: dictionary containing the info. :type info: dict :param filter: boolean to apply filter to the wvfs, defaults to False :type filter: bool, optional :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return: True if the computation was successful, False otherwise. :rtype: bool """ for run, ch in product( np.array(my_runs["NRun"]).astype(str), np.array(my_runs["NChannel"]).astype(str) ): if check_key(my_runs[run][ch], "RawADC") == False: rprint("[red]ERROR: RawADC not found! Please run 00Raw2Npy.py [/red]") return False if check_key(my_runs[run][ch], "Raw" + info["PED_KEY"][0]) == False: rprint( f"[red]ERROR: Raw {info['PED_KEY'][0]} not found! Please run 01PreProcess.py[/red]" ) return False my_runs[run][ch]["AnaADC"] = ( my_runs[run][ch]["PChannel"] * ( my_runs[run][ch]["RawADC"].T - my_runs[run][ch]["Raw" + info["PED_KEY"][0]] ).T ) if filter: # Apply filter to the wvfs filter_array = np.array( len(my_runs[run][ch]["AnaADC"]), len(my_runs[run][ch]["AnaADC"][0]), dtype=np.float32, ) for idx, wvf in enumerate(my_runs[run][ch]["AnaADC"]): filter_array[idx] = filter_wvf(wvf) my_runs[run][ch]["AnaADC"] = filter_array rprint("[green]--> Computed AnaADC Wvfs!!![/green]") return True
[docs]@numba.njit def filter_wvf(wvf): error = 0 filtered_array = np.zeros(len(wvf), dtype=np.float32) for jdx, j in enumerate(wvf): if jdx < 1: error = 0 filtered_array[jdx] = j else: error = 1 / 4 * (j - filtered_array[jdx - 1]) filtered_array[jdx] = filtered_array[jdx - 1] + error return filtered_array
[docs]def compute_fft_wvfs( my_runs: dict, info: dict, key: str, label: str, debug: bool = False ): """Computes the FFT wvfs from the given ADC key. :param my_runs: dictionary containing the data. :type my_runs: dict :param info: dictionary containing the info. :type info: dict :param key: key to be inserted. :type key: str :param label: label to be inserted. :type label: str :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional """ for run, ch in product( np.array(my_runs["NRun"]).astype(str), np.array(my_runs["NChannel"]).astype(str) ): my_runs[run][ch][label + "FFT"] = np.abs(np.fft.rfft(my_runs[run][ch][key])) my_runs[run][ch][label + "Freq"] = [ np.fft.rfftfreq( my_runs[run][ch][key][0].size, d=my_runs[run][ch]["Sampling"] ) ] my_runs[run][ch][label + "MeanFFT"] = [ np.mean(my_runs[run][ch][label + "FFT"], axis=0) ] rprint( f"[green]FFT wvfs have been computed for run {run} ch {ch}[/green]", ) rprint(f"[green]--> Computed AnaFFT Wvfs!!![/green]")
[docs]def compute_peak_variables( my_runs: dict, info: dict, key: str, label: str, buffer: int = 30, debug: bool = False, ): """Computes the peaktime and amplitude for a given ADC key. :param my_runs: dictionary containing the data. :type my_runs: dict :param info: dictionary containing the info. :type info: dict :param key: key to be inserted. :type key: str :param label: label to be inserted. :type label: str :param buffer: size in bins of the buffer to compute the valley amplitude, defaults to 30 :type buffer: int, optional :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional """ for run, ch in product(my_runs["NRun"], my_runs["NChannel"]): aux_ADC = my_runs[run][ch][key] if key == "RawADC" and label == "Raw": my_runs[run][ch][label + "PeakAmp"] = my_runs[run][ch]["PChannel"] * np.max( my_runs[run][ch]["PChannel"] * aux_ADC[:, :], axis=1 ) my_runs[run][ch][label + "PeakTime"] = np.argmax( my_runs[run][ch]["PChannel"] * aux_ADC[:, :], axis=1 ) # Compute valley amplitude in the buffer around the peak to avoid noise i_idx = my_runs[run][ch][label + "PeakTime"] i_idx[i_idx < 0] = 0 this_aux_ADC = shift_ADCs(aux_ADC, -i_idx, debug=debug) my_runs[run][ch][label + "ValleyAmp"] = my_runs[run][ch]["PChannel"] * ( np.min(my_runs[run][ch]["PChannel"] * this_aux_ADC[:, :buffer], axis=1) ) my_runs[run][ch][label + "ValleyTime"] = i_idx + np.argmin( my_runs[run][ch]["PChannel"] * this_aux_ADC[:, :buffer], axis=1 ) else: my_runs[run][ch][label + "PeakAmp"] = np.max(aux_ADC[:, :], axis=1) my_runs[run][ch][label + "PeakTime"] = np.argmax(aux_ADC[:, :], axis=1) # Compute valley amplitude in the buffer around the peak to avoid noise i_idx = my_runs[run][ch][label + "PeakTime"] i_idx[i_idx < 0] = 0 this_aux_ADC = shift_ADCs(aux_ADC, -i_idx, debug=debug) my_runs[run][ch][label + "ValleyAmp"] = np.min( this_aux_ADC[:, :buffer], axis=1 ) my_runs[run][ch][label + "ValleyTime"] = i_idx + np.argmin( this_aux_ADC[:, :buffer], axis=1 ) rprint("[green]--> Computed Peak Variables!!![/green]")
[docs]def compute_pedestal_variables( my_runs, info, key, label, ped_lim="", buffer=50, sliding=200, debug=False ): """Computes the pedestal variables of a collection of a run's collection in several windows. :param my_runs: dictionary containing the data. :type my_runs: dict :param info: dictionary containing the info. :type info: dict :param key: key to be inserted. :type key: str :param label: label to be inserted. Eg: label = Raw, variable = PedSTD --> RawPedSTD. :type label: str :param ped_lim: size in bins of the sliding window, defaults to "" :type ped_lim: str, optional :param buffer: size in bins of the buffer to compute the valley amplitude, defaults to 50 :type buffer: int, optional :param sliding: bins moved between shifts of the window, defaults to 200 :type sliding: int, optional :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional """ for run, ch in product(my_runs["NRun"], my_runs["NChannel"]): if type(ped_lim) != int: values, counts = np.unique( my_runs[run][ch][label + "PeakTime"], return_counts=True ) ped_lim = values[np.argmax(counts)] ped_mode = 0 if ped_lim <= buffer: ped_lim = int(len(my_runs[run][ch][key][0]) * 0.2) rprint( f"[yellow]WARNING: Peak time is smaller than {buffer}. Setting ped_lim = {ped_lim} (20% window length)[/yellow]" ) if key == "RawADC" and label == "Raw": wvf_length = len(my_runs[run][ch][key][0]) values, counts = np.unique( my_runs[run][ch][key][:, : int(0.18 * wvf_length)], return_counts=True, ) ped_mode = values[np.argmax(counts)] if ped_mode == 0: rprint( f"[yellow]WARNING: Pedestal mode is 0. Setting ped_mode = mean of the first 18% of the waveform.[/yellow]" ) ped_mode = np.mean( my_runs[run][ch][key][:, : int(0.18 * wvf_length)], axis=1 ) # Find the most likely rise time wrt the most likely peak time end = (my_runs[run][ch][key][:, ped_lim:] - ped_mode) < 0 start = (my_runs[run][ch][key][:, :ped_lim] - ped_mode) < 0 if label == "Raw" and my_runs[run][ch]["PChannel"] == -1: my_runs[run][ch][label + "SignalEnd"] = ped_lim + np.argmin(end, axis=1) my_runs[run][ch][label + "SignalStart"] = ped_lim - np.argmin( start[:, ::-1], axis=1 ) rprint( f"[cyan]INFO: Negative polarity. Finding signal start and end in negative values.[/cyan]" ) else: my_runs[run][ch][label + "SignalEnd"] = ped_lim + np.argmax(end, axis=1) my_runs[run][ch][label + "SignalStart"] = ped_lim - np.argmax( start[:, ::-1], axis=1 ) # Look for max counts in the signal start for values up to ped_lim - buffer min_signal_start = np.min(my_runs[run][ch][label + "SignalStart"]) max_signal_start = np.max(my_runs[run][ch][label + "SignalStart"]) hist, bins = np.histogram( my_runs[run][ch][label + "SignalStart"], bins=np.arange(min_signal_start, max_signal_start + 1), ) bins = bins[:-1] hist = hist / np.max(hist) # Apply peak finder to the histogram peaks, _ = find_peaks(hist, prominence=0.1, height=0.1) if len(peaks) == 0: rprint( f"[yellow]WARNING:[/yellow] No peak found in the 'SignalStart' histogram. Starting peak search with lower threshold." ) lower_thld = 0.1 while len(peaks) == 0 or lower_thld >= 0.01: lower_thld -= 0.001 peaks, _ = find_peaks( hist, prominence=lower_thld, height=lower_thld ) if lower_thld < 0: rprint(f"[red]ERROR:[/red] No peak found in the 'SignalStart' histogram for any threshold. Exiting.") break elif len(peaks) == 1: ped_lim = bins[peaks[0]] rprint(f"[cyan]INFO: Setting ped_lim = {ped_lim}[/cyan]") elif len(peaks) > 1: rprint( f"[yellow]WARNING: Found more than one peak in the signal start histogram.[/yellow]" ) # Plot the histogram and the peaks plt.ion() plt.plot(bins, hist) plt.plot(bins[peaks], hist[peaks], "x") plt.title(f"Signal Start Histogram for Run {run} Ch {ch}") plt.xlabel("Signal Start (ticks)") plt.ylabel("Norm.") plt.show() while not plt.waitforbuttonpress(-1): pass plt.close() plt.ioff() # Select in the terminal the peak to use (defailt is the first one) questions = [ inquirer.List( "peak", message="Select the peak to use as signal start", choices=[str(i) for i in bins[peaks]], ) ] answers = inquirer.prompt(questions) ped_lim = bins[int(answers["peak"])] if ped_lim <= buffer: ped_lim = int(len(my_runs[run][ch][key][0]) * 0.15) rprint( f"[yellow]WARNING: Peak time is smaller than {buffer}. Setting ped_lim = {ped_lim} (15% window length)[/yellow]" ) ADC_aux = my_runs[run][ch][key] my_runs[run][ch][label + "PreTriggerSTD"] = np.std(ADC_aux[:, :ped_lim], axis=1) my_runs[run][ch][label + "PreTriggerMean"] = np.mean( ADC_aux[:, :ped_lim], axis=1 ) my_runs[run][ch][label + "PreTriggerMax"] = np.max(ADC_aux[:, :ped_lim], axis=1) my_runs[run][ch][label + "PreTriggerMin"] = np.min(ADC_aux[:, :ped_lim], axis=1) ADC, start_window = compute_pedestal_sliding_windows( ADC_aux, ped_lim=ped_lim, sliding=sliding ) my_runs[run][ch][label + "PedLim"] = ped_lim my_runs[run][ch][label + "PedSTD"] = np.std(ADC[:, :sliding], axis=1) my_runs[run][ch][label + "PedMean"] = np.mean(ADC[:, :sliding], axis=1) my_runs[run][ch][label + "PedMax"] = np.max(ADC[:, :sliding], axis=1) my_runs[run][ch][label + "PedMin"] = np.min(ADC[:, :sliding], axis=1) my_runs[run][ch][label + "PedStart"] = start_window my_runs[run][ch][label + "PedEnd"] = start_window + sliding rprint("[green]--> Computed Pedestal Variables!!![/green]")
[docs]def compute_wvf_variables(my_runs, info, key, label, debug=False): """Computes the mean, std and rms of the given ADC key. :param my_runs: dictionary containing the data. :type my_runs: dict :param info: dictionary containing the info. :type info: dict :param key: key to be inserted. :type key: str :param label: label to be inserted. :type label: str :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional """ for run, ch in product(my_runs["NRun"], my_runs["NChannel"]): my_runs[run][ch][label + "Mean"] = np.mean(my_runs[run][ch][key], axis=1) my_runs[run][ch][label + "STD"] = np.std(my_runs[run][ch][key], axis=1) my_runs[run][ch][label + "RMS"] = np.sqrt( np.mean( np.power( (my_runs[run][ch][key].T - my_runs[run][ch][label + "Mean"]).T, 2 ), axis=1, ) ) rprint("[green]--> Computed Wvf Variables!!![/green]")
[docs]def compute_pedestal_sliding_windows(ADC, ped_lim, sliding=200, debug=False): """Taking the best between different windows in pretrigger. Same variables than "compute_pedestal_variables_sliding_window". It checks for the best window. :param ADC: array containing the ADCs. :type ADC: nparray :param ped_lim: size in bins of the sliding window. :type ped_lim: int :param sliding: bins moved between shifts of the window, defaults to 200 :type sliding: int, optional :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return: ADC_s, start_window -- shifted ADCs and start window. :rtype: nparray, nparray """ if ped_lim < sliding: ped_lim = sliding rprint( "[yellow]WARNING: Pedestal window is smaller than sliding window. Setting ped_lim = %s[/yellow]" % sliding ) slides = int(ped_lim / sliding) nwvfs = ADC.shape[0] aux = np.zeros((nwvfs, slides)) for i in range(slides): aux[:, i] = np.std(ADC[:, (i * sliding) : ((i + 1) * sliding)], axis=1) try: start_window = np.argmin(aux, axis=1) * sliding except ValueError: rprint("[red]ERROR: There is a problem with the pedestal computation. Check the data![/red]") start_window = np.zeros(nwvfs) ADC_s = shift_ADCs(ADC, (-1) * start_window) if debug: rprint( "[cyan]Calculating pedestal variables from sliding window of %i bins[/cyan]" % (sliding) ) return ADC_s, start_window
[docs]def compute_power_spec(ADC, timebin, debug=False): """Computes the power spectrum of the given events. It returns both axis. :param ADC: array containing the ADCs. :type ADC: nparray :param timebin: timebin of the data. :type timebin: float :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return: power spectrum and frequency axis. :rtype: nparray, nparray """ aux = [] aux_X = np.fft.rfftfreq(len(ADC[0]), timebin) for i in range(len(ADC)): aux.append(np.fft.rfft(ADC[i])) return np.absolute(np.mean(aux, axis=0)), np.absolute(aux_X)
[docs]@numba.njit def shift_ADCs(ADC, shift, debug=False): """Used for the sliding window. :param ADC: array containing the ADCs. :type ADC: nparray :param shift: array containing the shift values. :type shift: nparray :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return: aux_ADC -- shifted ADCs. :rtype: nparray """ N_wvfs = ADC.shape[0] aux_ADC = np.zeros(ADC.shape) for i in range(N_wvfs): aux_ADC[i] = shift4_numba(ADC[i], int(shift[i])) # Shift the wvfs return aux_ADC
# eficient shifter (c/fortran compiled); https://stackoverflow.com/questions/30399534/shift-elements-in-a-numpy-array
[docs]@numba.njit def shift4_numba( arr, num, fill_value=0, debug=False ): # default shifted value is 0, remember to always substract your pedestal first """Used for the sliding window. :param arr: array containing the ADCs. :type arr: nparray :param num: array containing the shift values. :type num: nparray :param fill_value: value to be inserted in the empty bins, defaults to 0 :type fill_value: int, optional :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return: arr -- shifted array. :rtype: nparray """ if num > 0: return np.concatenate((np.full(num, fill_value), arr[:-num])) elif num < 0: return np.concatenate((arr[-num:], np.full(-num, fill_value))) else: return arr
# ===========================================================================# # ********************** AVERAGING FUCNTIONS ********************************# # ===========================================================================#
[docs]def average_wvfs( my_runs, info, key, label, centering="NONE", threshold=0, cut_label="", OPT={}, debug=False, ): """This function calculates the average waveform of a run. :param my_runs: dictionary containing the data. :type my_runs: dict :param info: dictionary containing the info. :type info: dict :param key: key to be inserted. :type key: str :param label: label to be inserted. :type label: str :param centering: centering method (NONE, PEAK, THRESHOLD), defaults to "NONE" :type centering: str, optional :param threshold: threshold value, defaults to 0 :type threshold: int, optional :param cut_label: label for the cut, defaults to "" :type cut_label: str, optional :param OPT: dictionary containing the options, defaults to {} :type OPT: dict, optional :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional """ for run, ch in product(my_runs["NRun"], my_runs["NChannel"]): if check_key(my_runs[run][ch], "MyCuts") == False: generate_cut_array(my_runs) if check_key(my_runs[run][ch], "UnitsDict") == False: get_run_units(my_runs) buffer = 200 aux_ADC = my_runs[run][ch][key][my_runs[run][ch]["MyCuts"] == True] values, counts = np.unique( np.argmax(aux_ADC, axis=1), return_counts=True ) # using the mode peak as reference bin_ref_peak = values[np.argmax(counts)] if bin_ref_peak < buffer: bin_ref_peak = buffer # AveWvf: each event is added without centering. if centering == "NONE": my_runs[run][ch][label + "AveWvf" + cut_label] = np.asarray( [np.mean(aux_ADC, axis=0)] ) # It saves the average waveform as "AveWvf_*" # AveWvfPeak: each event is centered according to wvf argmax. if centering == "PEAK": bin_max_peak = np.argmax( aux_ADC[:, bin_ref_peak - buffer : bin_ref_peak + buffer], axis=1 ) bin_max_peak = bin_max_peak + bin_ref_peak - buffer for ii in range(len(aux_ADC)): aux_ADC[ii] = np.roll( aux_ADC[ii], bin_ref_peak - bin_max_peak[ii] ) # It centers the waveform using the peak my_runs[run][ch][label + "AveWvfPeak" + cut_label] = np.asarray( [np.mean(aux_ADC, axis=0)] ) # It saves the average waveform as "AveWvfPeak_*" # AveWvfThreshold: each event is centered according to first wvf entry exceding a threshold. if centering == "THRESHOLD": if threshold == 0: threshold = np.max(np.mean(aux_ADC, axis=0)) / 2 values, counts = np.unique( np.argmax(aux_ADC > threshold, axis=1), return_counts=True ) # using the mode peak as reference bin_ref_thld = values[np.argmax(counts)] # It is an int bin_max_thld = np.argmax( aux_ADC[:, bin_ref_peak - buffer : bin_ref_peak + buffer] > threshold, axis=1, ) bin_max_thld = bin_max_thld + bin_ref_thld - buffer for ii in range(len(aux_ADC)): aux_ADC[ii] = np.roll( aux_ADC[ii], bin_ref_thld - bin_max_thld[ii] ) # It centers the waveform using the threshold my_runs[run][ch][label + "AveWvfThreshold" + cut_label] = np.asarray( [np.mean(aux_ADC, axis=0)] ) # It saves the average waveform as "AveWvfThreshold_*" rprint( "[green]--> Computed Average Wvfs (centered wrt %s)!!![/green]" % centering )
[docs]def expo_average(my_run, alpha, debug=False): """This function calculates the exponential average with a given alpha. :param my_run: run we want to use. :type my_run: nparray :param alpha: alpha value. :type alpha: float :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return: v_averaged -- averaged run computed as average[i+1] = (1-alpha) * average[i] + alpha * my_run[i+1] :rtype: nparray """ v_averaged = np.zeros(len(my_run)) v_averaged[0] = my_run[0] for i in range(len(my_run) - 1): v_averaged[i + 1] = (1 - alpha) * v_averaged[i] + alpha * my_run[ i + 1 ] # e.g. alpha = 0.1 -> average[1] = 0.9 * average[0] + 0.1 * my_run[1] return v_averaged
[docs]def unweighted_average(my_run, debug=False): """This function calculates the unweighted average. :param my_run: run we want to use. :type my_run: nparray :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return: v_averaged -- averaged run computed as average[i+1] = (my_run[i] + my_run[i+1] + my_run[i+2]) / 3 :rtype: nparray """ v_averaged = np.zeros(len(my_run)) v_averaged[0] = my_run[0] v_averaged[-1] = my_run[-1] for i in range(len(my_run) - 2): v_averaged[i + 1] = ( my_run[i] + my_run[i + 1] + my_run[i + 2] ) / 3 # e.g. average[1] = (my_run[0] + my_run[1] + my_run[2]) / 3 return v_averaged
[docs]def smooth(my_run, alpha, debug=False): """This function calculates the exponential average and then the unweighted average. :param my_run: run we want to use. :type my_run: nparray :param alpha: alpha value. :type alpha: float :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return: my_run -- averaged run computed as average[i+1] = (my_run[i] + my_run[i+1] + my_run[i+2]) / 3 with my_run = (1-alpha) * average[i] + alpha * my_run[i+1] :rtype: nparray """ my_run = expo_average(my_run, alpha) my_run = unweighted_average(my_run) return my_run
# ===========================================================================# # ********************** INTEGRATION FUNCTIONS ******************************# # ===========================================================================#
[docs]def find_baseline_cuts(raw, debug=False): """It finds the cuts with the x-axis. It returns the index of both bins. :param raw: the np array that you want to analize. :type raw: nparray :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return: i_idx, f_idx -- initial and final index of the cuts. :rtype: int, int """ max = np.argmax(raw) i_idx = 0 f_idx = 0 for j in range(len(raw[max:])): # Before the peak if raw[max + j] < 0: f_idx = max + j break # Looks for the change of sign for j in range(len(raw[:max])): # After the peak if raw[max - j] < 0: i_idx = max - j + 1 break # Looks for the change of sign return i_idx, f_idx
[docs]def find_amp_decrease(raw, thrld, debug=False): """It finds bin where the amp has fallen above a certain threshold relative to the main peak. It returns the index of both bins. :param raw: the np array that you want to analize. :type raw: nparray :param thrld: the relative amp that you want to analize. :type thrld: float :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return: i_idx, f_idx -- initial and final index of the cuts. :rtype: int, int """ max = np.argmax(raw) i_idx = 0 f_idx = 0 for j in range(len(raw[max:])): # Before the peak if raw[max + j] < np.max(raw) * thrld: f_idx = max + j break # Looks for the change of sign (including thrld) for j in range(len(raw[:max])): # After the peak if raw[max - j] < np.max(raw) * thrld: i_idx = max - j + 1 break # Looks for the change of sign (including thrld) return i_idx, f_idx
[docs]def integrate_wvfs(my_runs, info, key, label, cut_label="", debug=False): """This function integrates each event waveform. There are several ways to do it and we choose it with the argument "types". :param my_runs: dictionary containing the data. :type my_runs: dict :param info: dictionary containing the info. :type info: dict :param key: key to be inserted (by default any ADC). :type key: str :param label: label to be inserted. :type label: str :param cut_label: label for the cut, defaults to "" :type cut_label: str, optional :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return my_runs: dictionary containing the data with the integrated values. :rtype: dict """ # If I_RANGE == -1 it fixes t0 to pedestal time and it integrates the time indicated in F_RANGE, e.g. I_RANGE = -1 F_RANGE = 6e-6 it integrates 6 microsecs from pedestal time. # If I_RANGE != -1 it integrates from the indicated time to the F_RANGE value, e.g. I_RANGE = 2.1e-6 F_RANGE = 4.3e-6 it integrates in that range. # I_RANGE must have same length than F_RANGE! integration_dict = {"ChargeAveRange": {}, "ChargePedRange": {}, "ChargeRange": {}} conversion_factor = ( info["DYNAMIC_RANGE"][0] / info["BITS"][0] ) # Amplification factor of the system ch_amp = dict( zip(info["CHAN_TOTAL"], info["CHAN_AMPLI"]) ) # Creates a dictionary with amplification factors according to each detector i_range = info["I_RANGE"] # Get initial time(s) to start the integration f_range = info["F_RANGE"] # Get final time(s) to finish the integration if debug: rprint( "[magenta]\n--- Integrating RUN %s CH %s TYPE %s, REF %s ---[/magenta]" % (my_runs["NRun"], my_runs["NChannel"], info["TYPE"], info["REF"]) ) for run, ch, typ, ref in product( my_runs["NRun"], my_runs["NChannel"], info["TYPE"], info["REF"] ): if check_key(my_runs[run][ch], "MyCuts") == False: generate_cut_array(my_runs) if check_key(my_runs[run][ch], "UnitsDict") == False: get_run_units(my_runs) # If there are no units, it calculates them if check_key(my_runs[run][ch], label + "ChargeRangeDict") == False: my_runs[run][ch][ label + "ChargeRangeDict" ] = {} # Creates a dictionary with ranges for each ChargeRange entry ave = my_runs[run][ch][ label + ref + cut_label ] # Load the reference average waveform aux_ADC = my_runs[run][ch][key] for i in range(len(ave)): if typ == "ChargeAveRange": # Integrated charge from the average waveform i_idx, f_idx = find_baseline_cuts(ave[i]) if f_idx - i_idx <= 0: rprint( "[yellow]WARNING: Invalid integration range for RUN %s CH %s TYPE %s, REF %s[/yellow]" % (run, ch, typ, label + ref) ) idx, f_idx = find_amp_decrease(ave[i], 1e-3) if debug: rprint( "[magenta]Using amp decrease instead: [%.2f, %.2f] \u03BCs[/magenta]" % ( idx * my_runs[run][ch]["Sampling"], f_idx * my_runs[run][ch]["Sampling"], ) ) charge_name = label + typ + ref.split("Wvf")[-1] + cut_label my_runs[run][ch][charge_name] = np.sum( aux_ADC[:, i_idx:f_idx], axis=1 ) # Integrated charge from the DECONVOLUTED average waveform rprint( "[green]--> Computed %s (according to type **%s** from %.2E to %.2E)!!![/green]" % ( charge_name, typ, i_idx * my_runs[run][ch]["Sampling"], f_idx * my_runs[run][ch]["Sampling"], ) ) integration_dict[typ][charge_name] = [int(i_idx), int(f_idx)] if typ == "ChargePedRange": for j in range(len(f_range)): i_idx = my_runs[run][ch][label + "PedLim"] f_idx = i_idx + int( np.round(f_range[j] * 1e-6 / my_runs[run][ch]["Sampling"]) ) charge_name = label + typ + str(j) + cut_label my_runs[run][ch][charge_name] = np.sum(aux_ADC[:, i_idx:f_idx], axis=1) rprint( "[green]--> Computed %s (according to type **%s** from %.2E to %.2E)!!![/green]" % ( charge_name, typ, i_idx * my_runs[run][ch]["Sampling"], f_idx * my_runs[run][ch]["Sampling"], ) ) integration_dict[typ][charge_name] = [int(i_idx), int(f_idx)] if typ == "ChargeRange": for k in range(len(f_range)): i_idx = int(np.round(i_range[k] * 1e-6 / my_runs[run][ch]["Sampling"])) f_idx = int(np.round(f_range[k] * 1e-6 / my_runs[run][ch]["Sampling"])) this_aux_ADC = shift_ADCs( aux_ADC, -np.asarray(my_runs[run][ch][label + "PeakTime"]) + i_idx, debug=debug, ) charge_name = label + typ + str(k) + cut_label my_runs[run][ch][charge_name] = np.sum(this_aux_ADC[:, :f_idx], axis=1) rprint( "[green]--> Computed %s (according to type **%s** from %.2E to %.2E)!!![/green]" % ( charge_name, typ, i_idx * my_runs[run][ch]["Sampling"], f_idx * my_runs[run][ch]["Sampling"], ) ) integration_dict[typ][charge_name] = [int(i_idx), int(f_idx)] # rprintintegration_dict) out_path = info["NPY_PATH"][0] out_path = os.path.expandvars(out_path) filename = f'{root}/{info["NPY_PATH"][0]}/run{run.zfill(2)}/ch{ch}/ChargeDict.yml' update_yaml_file(filename, integration_dict, convert=False, debug=debug) return my_runs
[docs]def compute_peak_RMS(my_runs, info, key, label, debug=False): """This function uses the calculated average wvf for a given run and computes the RMS of the peak in the given buffer. :param my_runs: dictionary containing the data. :type my_runs: dict :param info: dictionary containing the info. :type info: dict :param key: key to be inserted. :type key: str :param label: label to be inserted. :type label: str :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return my_runs: dictionary containing the data with the RMS values. :rtype: dict """ for run, ch in product(my_runs["NRun"], my_runs["NChannel"]): ref = np.asarray(my_runs[run][ch][label + "AveWvf"][0]) i_idx, f_idx = find_baseline_cuts(ref) if f_idx - i_idx <= 0: rprint( "[yellow]WARNING: Invalid integration range for RUN %s CH %s, REF %s[/yellow]" % (run, ch, label + "AveWvf") ) idx, f_idx = find_amp_decrease(ref, 1e-3) if debug: rprint( "[magenta]Using amp decrease instead: [%.2E, %.2E] \u03BCs[/magenta]" % ( idx * my_runs[run][ch]["Sampling"], f_idx * my_runs[run][ch]["Sampling"], ) ) if f_idx - i_idx <= 0: rprint( "[red]ERROR: Invalid integration range for RUN %s CH %s, REF %s[/red]" % (run, ch, label + "AveWvf")) idx, f_idx = ( my_runs[run][ch][label + "PedLim"], my_runs[run][ch][label + "PedLim"] + 1000, ) if debug: rprint( "[magenta]Using pedlim instead: [%.2E, %.2E] \u03BCs[/magenta]" % ( idx * my_runs[run][ch]["Sampling"], f_idx * my_runs[run][ch]["Sampling"], ) ) pulse_peak = np.argmax(ref) pulse_max = np.max(ref) pulse_width = f_idx - i_idx shift_idx = my_runs[run][ch][label + "PeakTime"] - (pulse_peak - i_idx) shift_idx[shift_idx < 0] = 0 aux_ADC = shift_ADCs(my_runs[run][ch][key], -shift_idx, debug=debug) aux_ADC = np.asarray(aux_ADC) my_runs[run][ch][label + "RMS"] = np.sqrt( np.mean( np.power( aux_ADC[:, :pulse_width].T - np.tile(ref[i_idx:f_idx], (len(aux_ADC), 1)).T * np.max(aux_ADC[:, :pulse_width], axis=1) / pulse_max, 2, ), axis=0, ) ) rprint("[green]--> Computed Peak RMS!!![/green]")
# ===========================================================================# # ************************ GENERAL FUNCTIONS ********************************# # ===========================================================================#
[docs]def insert_variable(my_runs, var, key, debug=False): """Insert values for each type of signal. :param my_runs: dictionary containing the data. :type my_runs: dict :param var: array of values to be inserted. :type var: nparray :param key: key to be inserted. :type key: str :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional """ for run, ch in product( np.array(my_runs["NRun"]).astype(str), np.array(my_runs["NChannel"]).astype(str) ): i = np.where(np.array(my_runs["NRun"]).astype(str) == run)[0][0] j = np.where(np.array(my_runs["NChannel"]).astype(str) == ch)[0][0] try: my_runs[run][ch][key] = var[j] except KeyError: if debug: rprint("[magenta]Inserting value...[/magenta]")
[docs]def get_ADC_key(my_runs, key, debug=False): """This function returns the ADC key for a given run. :param my_runs: dictionary containing the data. :type my_runs: dict :param key: key to be inserted. :type key: str :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return: key, label -- key and label for the ADC. :rtype: str, str """ found_duplicate = 0 if key == "": for this_key in my_runs[my_runs["NRun"][0]][my_runs["NChannel"][0]].keys(): if "ADC" in this_key: key = this_key label = this_key.split("ADC")[0] found_duplicate += 1 if debug: rprint("[magenta]Found ADC branch: %s[/magenta]" % key) if found_duplicate > 1: rprint( "[red]ERROR: Found more than one ADC key! Please check load preset.[/red]" ) exit() if found_duplicate == 0: rprint("[yellow]WARNING: No ADC branch found![/yellow]") label = "" if debug: rprint( "[green]--> Returning key: '%s' and label: '%s'!!![/green]" % (key, label) ) else: label = key.split("ADC")[0] if debug: rprint("[magenta]Returning label from provided key[/magenta]") return key, label
[docs]def get_wvf_label(my_runs, key, label, debug=False): """This function returns the label for a given run. This depends on the found ADC key or the one provided by the user. :param my_runs: dictionary containing the data. :type my_runs: dict :param key: key to be inserted. :type key: str :param label: label to be inserted. :type label: str :param debug: boolean to print debug messages, defaults to False :type debug: bool, optional :return: out_key, out_label -- key and label for the ADC. :rtype: str, str """ if key == "" and label == "": found_key, found_label = get_ADC_key(my_runs, key, debug=debug) out_key = found_key out_label = found_label elif key == "" and label != "": found_key, found_label = get_ADC_key(my_runs, key, debug=debug) out_key = found_key if found_label != label: if debug: rprint( "[yellow]WARNING: Provided label does not match found label![/yellow]" ) user_confirmation = input( "Do you want to continue with coustom selection? [y/n]: " ) if user_confirmation.lower() in ["y", "yes"]: out_label = found_label if debug: rprint("[magenta]Selected label %s[/magenta]" % label) else: out_label = label if debug: rprint("[magenta]Found label %s[/magenta]" % found_label) else: out_label = found_label if debug: rprint("[magenta]Found label %s[/magenta]" % label) elif key != "" and label == "": if debug: rprint( "[yellow]WARNING: Selected input ADC but no label provided![/yellow]" ) found_key, found_label = get_ADC_key(my_runs, key, debug=debug) out_key = found_key out_label = found_label if found_label != key.split("ADC")[0]: rprint( "[red]ERROR: Found label does not match label from provided key![/red]" ) exit() else: if debug: rprint( "[yellow]WARNING: Using full coustom mode for label and ADC key selection![/yellow]" ) out_key = key out_label = label return out_key, out_label