Source code for lib.dec_functions

# ================================================================================================================================================#
# This library contains functions to perform deconvolution to our waveforms.                                                                     #
# ================================================================================================================================================#

import numpy as np
import matplotlib.pyplot as plt
import scipy.interpolate
from scipy.optimize import curve_fit
from itertools import product
from curve import Curve
from rich import print as rprint
from rich.progress import track

# Imports from other libraries
from .io_functions import check_key, save_figure
from .fit_functions import dec_gauss, fit_dec_gauss
from .ana_functions import compute_power_spec, find_amp_decrease, find_baseline_cuts, smooth


[docs]def generate_SER(my_runs, light_runs, SPE_runs, scaling_type="Amplitude", debug=False): """This function rescales AveWvfs from light runs to SPE level to be used for wvf deconvolution. :param my_runs: dictionary containing the wvf to be deconvolved :type my_runs: dict :param light_runs: dictionary containing the wvfs that work as detector response (light runs) :type light_runs: dict :param SPE_runs: dictionary containing the SPE wvf that serve as reference to rescale dec_runs :type SPE_runs: dict :param scaling_type: type of scaling to be used, defaults to "Amplitude" :type scaling_type: str, optional :param debug: debug flag, defaults to False :type debug: bool, optional """ for ii in range(len(my_runs["NRun"])): for jj in range(len(my_runs["NChannel"])): try: det_response = light_runs[light_runs["NRun"][0]][ my_runs["NChannel"][jj] ]["AnaAveWvf"][0] except KeyError: exit( f"Key error in light run: run {my_runs['NRun'][ii]} channel {my_runs['NChannel'][jj]} key AnaAveWvf" ) try: single_response = SPE_runs[SPE_runs["NRun"][0]][ my_runs["NChannel"][jj] ]["AnaAveWvfSPE"][0] except KeyError: exit( f"Key error in calib run: run {my_runs['NRun'][ii]} channel {my_runs['NChannel'][jj]} key AnaAveWvfSPE" ) if scaling_type == "Amplitude": SER = np.max(single_response) * det_response / np.max(det_response) if scaling_type == "Area": SER = np.sum(single_response) * det_response / np.sum(det_response) i_idx, f_idx = find_amp_decrease(SER, 1e-4) SER = np.roll(SER, -i_idx) my_runs[my_runs["NRun"][ii]][my_runs["NChannel"][jj]]["AnaAveWvfSER"] = [ SER ] if debug: rprint("[green]\n---SER generated![/green]")
[docs]def deconvolve(my_runs, info, keys=[], noise_run=[], peak_buffer=20, OPT={}, debug=False): """This function deconvolves any given number of arrays according to a provided SPE template. :param my_runs: dictionary containing the wvf to be deconvolved :type my_runs: dict :param info: dictionary containing the output path :type info: dict :param keys: list containing the keys of [wvf, template, outputkey] :type keys: list :param noise_run: dictionary containing the noise run :type noise_run: dict :param peak_buffer: left distance from peak to calculate baseline :type peak_buffer: int :param OPT: dictionary with settings and vis options ("SHOW", "LOGY", "NORM", "FILTER": Gauss/Wiener, etc.). :type OPT: dict :param debug: debug flag, defaults to False :type debug: bool, optional """ for run, ch in product(my_runs["NRun"], my_runs["NChannel"]): aux = [] trimm = 0 template = my_runs[run][ch][keys[1]][0] # Use a track for loop to print a progress bar for i in track( range(len(my_runs[run][ch][keys[0]])), description="Processing..." ): # Select required runs and parameters signal = my_runs[run][ch][keys[0]][i] if "Raw" in keys[0]: label = "Raw" signal = my_runs[run][ch]["PChannel"] * ( signal - my_runs[run][ch][label + "PedMean"][i] ) # Arrays must be even for fft signal = check_array_even(signal) template = check_array_even(template) # Arrays must have the same length template, signal = check_array_len(template, signal) # Roll signal to align wvfs # template = np.roll(template, np.argmax(signal)-np.argmax(template)) # Can be used for test to trimm array if check_key(OPT, "TRIMM") == True: trimm = OPT["TRIMM"] if check_key(OPT, "AUTO_TRIMM") == True and OPT["AUTO_TRIMM"] == True: j = 0 while 2**j < len(signal): j = j + 1 trimm = len(signal) - 2 ** (j - 1) # rprint(template) if check_key(OPT, "SMOOTH") == True: if OPT["SMOOTH"] > 0: signal = smooth(signal, OPT["SMOOTH"]) else: rprint("[red]Invalid value encountered in smooth[/red]") try: timebin = my_runs[run][ch]["Sampling"] except: rprint("[red]\n---Sampling key not found![/red]") if check_key(OPT, "TIMEBIN") == True: timebin = OPT["TIMEBIN"] X = timebin * np.arange(len(signal)) # Define noise (should be imported in future iterations) noise_amp = 0.5 if check_key(OPT, "NOISE_AMP") == True: noise_amp = OPT["NOISE_AMP"] if noise_run == []: noise = np.random.normal( 0, noise_amp * np.max(template), size=len(signal) ) fft_noise = np.fft.rfft(noise) else: fft_noise, fft_noise_X = compute_power_spec(noise_run, timebin) # Calculate fft arrays fft_signal = np.fft.rfft(signal) fft_signal_X = np.fft.rfftfreq(len(signal), my_runs[run][ch]["Sampling"]) i_template, f_template = find_baseline_cuts(template) # conv_template = np.convolve(template, ) fft_template = np.fft.rfft(template) wiener = abs(fft_template) ** 2 / ( abs(fft_template) ** 2 + abs(fft_noise) ** 2 ) if ( check_key(OPT, "NORM_DET_RESPONSE") == True and OPT["NORM_DET_RESPONSE"] == True ): fft_template = fft_template / np.max(fft_template) fft_template_X = np.fft.rfftfreq(len(template), timebin) if check_key(OPT, "FILTER") == True and OPT["FILTER"] == "WIENER": fft_filter_signal = fft_signal * wiener filter_signal = np.fft.irfft(fft_filter_signal) label = "Wiener" # Generate deconvoluted function fft_dec = fft_filter_signal / np.array(fft_template) else: # Interpolate wiener envelop for fit of gaussian filter wiener_buffer = 800 if check_key(OPT, "WIENER_BUFFER") == True: wiener_buffer = OPT["WIENER_BUFFER"] wiener_curve = Curve( fft_template_X[:-wiener_buffer], (-1 * wiener[:-wiener_buffer]) + 2 ) env = wiener_curve.envelope2(tc=1e6) env_wiener = scipy.interpolate.interp1d(env.x, env.y) env_wiener_Y = env_wiener(fft_signal_X[:-wiener_buffer]) env_wiener_min = np.argmin(-1 * (env_wiener_Y - 2)) # Select fit parameters and perform fit to determin cut-off if check_key(OPT, "FIXED_CUTOFF") == True and OPT["FIXED_CUTOFF"]: params = [my_runs[run][ch]["GaussCutOff"], 2] else: try: if ( check_key(OPT, "FREE_EXP") == False or OPT["FREE_EXP"] == False ): p0 = 200 lim = [10, 2000] popt, cov = curve_fit( lambda f, fc: fit_dec_gauss(f, fc, 2), np.arange(len(fft_signal_X))[:env_wiener_min], np.log10(-1 * (env_wiener_Y[:env_wiener_min] - 2)), p0=p0, bounds=lim, ) perr = np.sqrt(np.diag(cov)) params = [popt, 2] rprint( "\n-------------- GAUSS FILTER FIT VALUES --------------" ) rprint( "%s:\t%.2E\t%.2E" % ("CUT-OFF FREQUENCY", popt[0], perr[0]) ) rprint( "-----------------------------------------------------\n" ) my_runs[run][ch]["GaussCutOff"] = popt[0] elif ( check_key(OPT, "FREE_EXP") == True and OPT["FREE_EXP"] == True ): p0 = [50, 2] lim = [[10, 1], [500, 8]] params, cov = curve_fit( fit_dec_gauss, np.arange(len(fft_signal_X))[:env_wiener_min], np.log10(-1 * (env_wiener_Y[:env_wiener_min] - 2)), p0=p0, bounds=lim, ) except: params = [50, 2] rprint("[red]FIT COULD NOT BE PERFORMED![/red]") rprint("Filter strengh %f and exp %f" % (params[0], params[1])) # Generate gauss filter and filtered signal fft_gauss = dec_gauss(np.arange(len(fft_signal)), *params) fft_gauss[0] = 0 # if check_key(OPT, "PRO_RODRIGO") == True and OPT["PRO_RODRIGO"] == True: fft_filter_signal = fft_signal * fft_gauss filter_signal = np.fft.irfft(fft_filter_signal) label = "Gauss" # Generate deconvoluted function fft_dec = fft_filter_signal / np.array(fft_template) dec = np.fft.irfft(fft_dec) # dec = np.roll(dec, np.argmax(template)) # Roll the function to match original position dec_std = np.mean(dec[: np.argmax(dec) - peak_buffer]) dec = dec - dec_std i_signal, f_signal = find_baseline_cuts(signal) i_resp, f_resp = find_baseline_cuts(template) i_dec, f_dec = find_baseline_cuts(dec) dec_baseline_charge = np.trapz( dec[i_signal:f_signal], my_runs[run][ch]["Sampling"] * np.arange(len(dec[i_signal:f_signal])), ) dec_charge = np.trapz( dec, my_runs[run][ch]["Sampling"] * np.arange(len(dec)) ) if check_key(OPT, "CONVERT_ADC") == True and OPT["CONVERT_ADC"] == True: # dec = dec*np.sum(abs(fft_signal))/np.sum(abs(fft_dec)) dec = dec * np.max(abs(fft_signal)) / np.max(abs(fft_dec)) dec_baseline_charge = np.trapz( dec[i_signal:f_signal], my_runs[run][ch]["Sampling"] * np.arange(len(dec[i_signal:f_signal])), ) dec_charge = np.trapz( dec, my_runs[run][ch]["Sampling"] * np.arange(len(dec)) ) if ( check_key(OPT, "TERMINAL_OUTPUT") == True and OPT["TERMINAL_OUTPUT"] == True ): rprint( "\nDECONVOLUTION: baseline int = \t %.2E" % dec_baseline_charge ) if ( check_key(OPT, "TERMINAL_OUTPUT") == True and OPT["TERMINAL_OUTPUT"] == True ): rprint("DECONVOLUTION: total int = \t %.2E\n" % dec_charge) # rprint("Converting dec wvf amp to ADC") aux.append(dec) # ------------------------------------------------------------------------------------------------------------------- # Plot results: left shows process in time space; right in frequency space. # ------------------------------------------------------------------------------------------------------------------- if check_key(OPT, "SHOW") == True and OPT["SHOW"] == True: plt.ion() next_plot = False fig = plt.figure() plt.rcParams["figure.figsize"] = [16, 8] plt.subplot(1, 2, 1) plt.title("DECONVOLUTION RUN %s CH %s" % (run, ch)) if check_key(OPT, "NORM") == True and OPT["NORM"] == True: plt.plot( X, signal / np.max(signal), label="SIGNAL: int = %.4E" % (np.trapz(signal[i_signal:f_signal], X[i_signal:f_signal])), c="tab:blue", ds="steps", ) if check_key(OPT, "SHOW_GAUSS_SIGNAL") != False: plt.plot( X, filter_signal / np.max(filter_signal), label="GAUSS_SIGNAL: int = %.4E" % ( np.trapz( filter_signal[i_signal:f_signal], X[i_signal:f_signal], ) ), c="blue", ) plt.plot( X, template / np.max(template), label="DET_RESPONSE: int = %.4E" % (np.trapz(template[i_resp:f_resp], X[i_resp:f_resp])), c="tab:orange", ds="steps", ) if ( check_key(OPT, "CONVERT_ADC") == True and OPT["CONVERT_ADC"] == True ): plt.plot( X, dec, label="DECONVOLUTION: int = %.2E" % dec_baseline_charge, c="tab:red", ds="steps", lw=2.0, ) else: plt.plot( X, dec, label="DECONVOLUTION: int = %.2f PE" % (np.sum(dec[i_dec:f_dec])), c="tab:red", ds="steps", lw=2.0, ) else: plt.plot( X, signal, label="SIGNAL: int = %.4E" % (np.trapz(signal[i_signal:f_signal], X[i_signal:f_signal])), c="tab:blue", ds="steps", ) if check_key(OPT, "SHOW_GAUSS_SIGNAL") != False: plt.plot( X, filter_signal, label="GAUSS_SIGNAL: int = %.4E" % ( np.trapz( filter_signal[i_signal:f_signal], X[i_signal:f_signal], ) ), c="blue", ) plt.plot( X, template, label="DET_RESPONSE: int = %.4E" % (np.trapz(template[i_resp:f_resp], X[i_resp:f_resp])), c="tab:orange", ds="steps", ) if ( check_key(OPT, "CONVERT_ADC") == True and OPT["CONVERT_ADC"] == True ): plt.plot( X, dec, label="DECONVOLUTION: int = %.2E" % dec_baseline_charge, c="tab:red", ds="steps", lw=2.0, ) else: plt.plot( X, dec, label="DECONVOLUTION: int = %.2f PE" % (np.sum(dec[i_dec:f_dec])), c="tab:red", ds="steps", lw=2.0, ) # plt.axhline(0, label = "Total # PE in deconvolved signal %f"%np.sum(dec), c = "black", alpha = 0.5, ls = "--") if ( check_key(OPT, "CONVERT_ADC") == False or OPT["CONVERT_ADC"] == False ): if ( check_key(OPT, "TERMINAL_OUTPUT") == True and OPT["TERMINAL_OUTPUT"] == True ): rprint( "\nDECONVOLUTION: baseline int = \t %.2f PE" % (np.sum(dec[i_dec:f_dec])) ) if ( check_key(OPT, "TERMINAL_OUTPUT") == True and OPT["TERMINAL_OUTPUT"] == True ): rprint("DECONVOLUTION: total int = \t %.2f PE\n" % np.sum(dec)) plt.axhline(0, c="black", alpha=0.5, ls="--") # rprint("# PE in deconvolved signal %f"%np.sum(dec[i_dec:f_dec])) plt.ylabel("ADC Counts") plt.xlabel("Time in [s]") if check_key(OPT, "LOGY") == True and OPT["LOGY"] == True: if check_key(OPT, "THRLD") == True: thrld = OPT["THRLD"] else: thrld = 1e-6 plt.semilogy() plt.ylim(thrld) if check_key(OPT, "FOCUS") == True and OPT["FOCUS"] == True: plt.xlim( 4e-9 * np.array([np.argmax(signal) - 100, np.argmax(signal) + 1000]) ) plt.ylim([np.min(signal) * 1.1, np.max(dec) * 1.1]) plt.legend() plt.subplot(1, 2, 2) if check_key(OPT, "SHOW_F_SIGNAL") != False: plt.plot( fft_signal_X, np.abs(fft_signal), label="SIGNAL", c="tab:blue" ) if check_key(OPT, "SHOW_F_GSIGNAL") != False: plt.plot( fft_signal_X, np.abs(fft_filter_signal), label="GAUSS_SIGNAL", c="blue", ) if check_key(OPT, "SHOW_F_DET_RESPONSE") != False: plt.plot( fft_template_X, np.abs(fft_template), label="DET_RESPONSE", c="tab:orange", ) plt.axhline(1, ls="--", c="grey") if check_key(OPT, "SHOW_F_DEC") != False: plt.plot( fft_signal_X, np.abs(fft_dec), label="DECONVOLUTION", c="tab:red", ) if check_key(OPT, "SHOW_F_WIENER") != False: plt.plot( fft_signal_X, wiener, label="WIENER", c="tab:orange", ls="--" ) # plt.plot(env_wiener.x[:env_wiener_min], -1*(env_wiener.y[:env_wiener_min]-2), label = "ENV_WIENER", c = "tab:pink", ls = "--") if check_key(OPT, "SHOW_F_GAUSS") != False: if OPT["SHOW_F_GAUSS"] == True: plt.plot(fft_signal_X, fft_gauss, label="GAUSS", c="tab:green") plt.ylabel("a.u.") plt.xlabel("Frequency in [Hz]") plt.ylim(1e-8, np.max(fft_signal) * 100) plt.semilogy() plt.semilogx() plt.legend() while not plt.waitforbuttonpress(-1): pass if check_key(OPT, "SAVE") == True and OPT["SAVE"] == True: save_figure(fig, f"{info['OUT_PATH'][0]}/images/", run, ch, "Deconvolution", debug=debug) # plt.savefig( # f"{info['OUT_PATH'][0]}/images/run{run}/ch{ch}/run{run}_ch{ch}_Deconvolution.png", # dpi=500, # ) plt.clf() plt.ioff() my_runs[run][ch][label + keys[2]] = np.asarray(aux) if check_key(OPT, "CONVERT_ADC") == True and OPT["CONVERT_ADC"] == True: my_runs[run][ch][label + keys[2] + "ADC"] = np.asarray(aux) rprint("Generated wvfs with key %s" % (label + keys[2])) plt.close()
# def convolve(my_runs, keys = [], OPT = {}): # rprint("\n### WELCOME TO THE CONVOLUTION STUDIES ###\n", "blue", bold=True) # for run, ch in product(my_runs["NRun"], my_runs["NChannel"]): # aux = dict() # for i in range(len(my_runs[run][ch][keys[0]])): # # Select required runs and parameters # signal = my_runs[run][ch][keys[0]][i] # timebin = my_runs[run][ch]["Sampling"] # if check_key(OPT, "TIMEBIN") == True: timebin = OPT["TIMEBIN"] # X = timebin*np.arange(len(signal)) # ######################################################################## # #_____________________CONVOLUTION_AND_FIT_PARAMETERS___________________# # ######################################################################## # # MUON SC CONFIG # t_fast = 2e-8; t_fast_low = 1e-8; t_fast_high = 4e-8 # t_slow = 1e-6; t_slow_low = 6e-7; t_slow_high = 5e-6 # amp_fast = 2e-8; amp_fast_low = 1e-8; amp_fast_high = 3e-8 # amp_slow = 5e-8; amp_slow_low = 1e-8; amp_slow_high = 9e-8 # sigma = 2e-8; sigma_low = 9e-9; sigma_high = 3e-8 # # MUON SiPM CONFIG # t_fast = 2e-8; t_fast_low = 1e-8; t_fast_high = 4e-8 # t_slow = 1.2e-6; t_slow_low = 1e-6; t_slow_high = 5e-6 # amp_fast = 2e-8; amp_fast_low = 1e-8; amp_fast_high = 3e-8 # amp_slow = 2e-8; amp_slow_low = 8e-9; amp_slow_high = 9e-8 # sigma = 2e-8; sigma_low = 9e-9; sigma_high = 3e-8 # fit_initials = (t_fast, t_slow, amp_fast, amp_slow, sigma) # fit_finals = [t_fast, t_slow, amp_fast, amp_slow, sigma] # limits_low = [t_fast_low, t_slow_low, amp_fast_low, amp_slow_low, sigma_low] # limits_high = [t_fast_high, t_slow_high, amp_fast_high, amp_slow_high, sigma_high] # fit_limits = (limits_low, limits_high) # popt, pcov = curve_fit(conv_func2, [laser.wvf_x[:-limit], laser.wvf[:-limit]], alpha.wvf[:-limit], p0 = fit_initials, bounds = fit_limits, method = "trf") # perr = np.sqrt(np.diag(pcov)) # conv = conv_func2([laser.wvf_x[:-limit], laser.wvf[:-limit]], *popt) # func = func2(alpha.wvf_x, *popt) # conv_int, f_conv, i_conv = signal_int("CONV FUNC", func2(np.arange(0, alpha.wvf_x[-1], 5e-10), *popt), timebin, "SiPM", "ALL", th = thrld, out = True) # labels = ["TFAST", "TSLOW", "AFAST", "ASLOW", "SIGMA"] # rprint("\n--- FIT VALUES ---") # for i in range(len(fit_initials)): # fit_finals[i] = popt[i] # rprint("%s: %.2E \u00B1 %.2E"%(labels[i], popt[i], perr[i])) # rprint("------------------\n") # rprint("SLOW = %.2f%%"%(100*popt[3]/(popt[3]+popt[2]))) # ######################################################################## # #________________________PLOT_FIRST_RESULT_____________________________# # ######################################################################## # # fig1, axs = plt.subplots(2, 1, sharex = True) # fig1, axs = plt.subplots(2, 1) # plt.title(decon_runs) # fig1.subplots_adjust(hspace = 0.25) # fig1.set_figheight(6) # fig1.set_figwidth(6) # # axs[0] = plt.subplot2grid(shape = (1, 1), loc = (0, 0), colspan = 3) # # axs[1] = plt.subplot2grid(shape = (1, 1), loc = (3, 0), colspan = 3) # axs[0].plot(laser.wvf_x, laser.wvf, label = label_luz) # axs[0].plot(alpha.wvf_x, alpha.wvf, label = label_alp) # axs[0].plot(laser.wvf_x[:-limit], conv, label = "Fitted Convolution") # axs[0].axvline(laser.wvf_x[-limit], color = "grey", ls = ":") # axs[0].set_ylabel("Normalized Amplitude") # axs[0].axhline(0, color = "grey", ls = ":") # axs[0].set_ylim(1e-4, np.max(alpha.wvf)*1.5) # axs[0].legend() # if logy == True: # axs[0].semilogy() # axs[1].semilogy() # axs[1].plot(alpha.wvf_x[np.argmax(alpha.wvf)-np.argmax(func):], func[:np.argmax(func)-np.argmax(alpha.wvf)], label = "Convolution Func.") # axs[1].axhline(0, color = "grey", ls = ":") # axs[1].set_ylim(1e-6, 10) # axs[1].set_xlabel("Time in [s]"); axs[1].set_ylabel("Convolution signal") # plt.show() # # output_file.write("%.2E \t\u00B1\t %.2E\n"%(p, perr2[0])) # # output_file.write("%.2E \t\u00B1\t %.2E\n"%(t0, perr1[4])) # output_file.write("%.2E \t\u00B1\t %.2E\n"%(fit_finals[4], perr[4])) # output_file.write("%.2E \t\u00B1\t %.2E\n"%(fit_finals[2], perr[2])) # output_file.write("%.2E \t\u00B1\t %.2E\n"%(fit_finals[0], perr[0]))
[docs]def check_array_len(wvf1, wvf2): if len(wvf1) < len(wvf2): rprint("[yellow]RAW WVF IS LONGER THAN WVF TEMPLATE[/yellow]") wvf2 = wvf2[: -(len(wvf2) - len(wvf1))] if len(wvf1) > len(wvf2): rprint("[yellow]RAW WVF IS SHORTER THAN WVF TEMPLATE[/yellow]") wvf1 = wvf1[: -(len(wvf1) - len(wvf2))] return wvf1, wvf2
[docs]def check_array_even(wvf): if len(wvf) % 2 > 0: return wvf[:-1] else: return wvf
[docs]def conv_func2(wvf, t0, sigma, tau1, a1, tau2, a2): resp = func2(wvf[0], 0, t0, sigma, a1, tau1, a2, tau2) conv = convolve(wvf[1], resp) conv = conv / np.max(conv) wvf_max = np.argmax(wvf[1]) conv_max = np.argmax(conv) return conv[conv_max - wvf_max : conv_max + len(wvf[1]) - wvf_max]
[docs]def logconv_func2(wvf, t0, sigma, tau1, a1, tau2, a2): resp = logfunc2(wvf[0], 0, t0, sigma, a1, tau1, a2, tau2) conv = convolve(wvf[1], resp) conv = conv / np.max(conv) wvf_max = np.argmax(wvf[1]) conv_max = np.argmax(conv) return conv[conv_max - wvf_max : conv_max + len(wvf[1]) - wvf_max]