# ================================================================================================================================================#
# This library contains functions to perform fits to the data. #
# ================================================================================================================================================#
from srcs.utils import get_project_root
import os, stat, math, scipy
from lmfit import models
from typing import Optional
from itertools import product
from scipy.signal import find_peaks
from scipy.optimize import curve_fit
import numpy as np
from scipy.special import erf
import matplotlib.pyplot as plt
from math import factorial as fact
from rich import print as rprint
from scipy.ndimage import gaussian_filter1d
# Imports from other libraries
from .io_functions import check_key, read_yaml_file, save_figure
from .ana_functions import get_run_units, find_amp_decrease
np.seterr(divide="ignore")
root = get_project_root()
# ===========================================================================#
# ************************** THEORETICAL FUNCTIONS **************************#
# ===========================================================================#
[docs]def chi_squared(x, y, popt):
fit_y = np.sum(
[gaussian(x, popt[j], popt[j + 1], popt[j + 2]) for j in range(0, len(popt), 3)]
)
return np.sum((y - fit_y) ** 2 / fit_y) / (y.size - len(popt))
[docs]def pure_scint(time, t0, a1, a2, tau1, tau2):
y = a1 / tau1 * np.exp(-(time - t0) / tau1) + a2 / tau2 * np.exp(
-(time - t0) / tau2
)
[docs]def gauss(x, a, x0, sigma):
return (
a
/ (sigma * math.sqrt(2 * math.pi))
* np.exp(-0.5 * np.power((x - x0) / sigma, 2))
)
[docs]def gaussian_train(x, *params):
y = np.zeros_like(x)
for i in range(0, len(params), 3):
center = params[i]
height = params[i + 1]
width = params[i + 2]
y = y + gaussian(x, center, height, width)
return y
[docs]def loggaussian_train(x, *params):
y = gaussian_train(x, *params)
y[y <= 0] = 1e-1
return np.log(y)
[docs]def gaussian(x, center, height, width):
return height * np.exp(-0.5 * ((x - center) / width) ** 2)
[docs]def test_gaussian(x, center, width):
return np.exp(-0.5 * ((x - center) / width) ** 2)
# def pmt_spe(x, height, center, width):
# return height * np.exp(-0.5*((x - center)/width)**2)
[docs]def loggaussian(x, center, height, width):
return np.log10(gaussian(x, center, height, width))
[docs]def func(t, t0, sigma, a, tau):
return (
(2 * a / tau)
* np.exp((sigma / (np.sqrt(2) * tau)) ** 2 - (np.array(t) - t0) / tau)
* (1 - erf((sigma**2 - tau * (np.array(t) - t0)) / (np.sqrt(2) * sigma * tau)))
)
[docs]def func2(t, p, t0, sigma, a1, tau1, sigma2, a2, tau2):
return p + func(t, t0, sigma, a1, tau1) + func(t, t0, sigma2, a2, tau2)
[docs]def logfunc2(t, p, t0, sigma1, a1, tau1, sigma2, a2, tau2):
return np.log(p + func(t, t0, sigma1, a1, tau1) + func(t, t0, sigma2, a2, tau2))
[docs]def logfunc3(t, p, t0, sigma, a1, tau1, a2, tau2, a3, tau3):
return np.log(
p
+ func(t, t0, sigma, a1, tau1)
+ func(t, t0, sigma, a2, tau2)
+ func(t, t0, sigma, a3, tau3)
)
[docs]def func3(t, p, t0, sigma, a1, tau1, a2, tau2, a3, tau3):
return (
p
+ func(t, t0, sigma, a1, tau1)
+ func(t, t0, sigma, a2, tau2)
+ func(t, t0, sigma, a3, tau3)
)
[docs]def scfunc(t, a, b, c, d, e, f):
return (
a * np.exp(-(t - c) / b) / np.power(2 * np.pi, 0.5) * np.exp(-(d**2) / (b**2))
) * (1 - erf(((c - t) / d + d / b) / np.power(2, 0.5))) + (
e * np.exp(-(t - c) / f) / np.power(2 * np.pi, 0.5) * np.exp(-(d**2) / (f**2))
) * (
1 - erf(((c - t) / d + d / f) / np.power(2, 0.5))
)
[docs]def dec_gauss(f, fc, n):
y = np.exp(-0.5 * (f / fc) ** n)
return y
[docs]def fit_dec_gauss(f, fc, n):
y = np.log10(dec_gauss(f, fc, n))
y[0] = 0
return y
[docs]def purity(t, p, t0, a1, a3, sigma, quenching):
tau1 = (1 / 7.1e-9 + quenching) ** -1
tau3 = (1 / 1.66e-6 + quenching) ** -1
a1_prime = a1 / (1 + tau1 * quenching)
a3_prime = a3 / (1 + tau3 * quenching)
return p + func(t, t0, sigma, a1_prime, tau1) + func(t, t0, sigma, a3_prime, tau3)
[docs]def simple_purity(t, p, t0, a0, a1, sigma, quenching):
tau1 = (1 / 7.1e-9 + quenching) ** -1
tau3 = (1 / 1.66e-6 + quenching) ** -1
a1_prime = a0 / (1 + tau1 * quenching)
a3_prime = (a0 * (1 - a1) / a1) / (1 + tau3 * quenching)
return p + func(t, t0, sigma, a1_prime, tau1) + func(t, t0, sigma, a3_prime, tau3)
[docs]def logpurity(t, p, t0, a1, a3, sigma, quenching):
return np.log(purity(t, p, t0, a1, a3, sigma, quenching))
[docs]def logsimple_purity(t, p, t0, a0, a1, sigma, quenching):
return np.log(simple_purity(t, p, t0, a0, a1, sigma, quenching))
[docs]def lmfit_models(function):
# if function == "gaussian": return models.GaussianModel()
if function == "linear":
return models.LinearModel()
if function == "lorentzian":
return models.LorentzianModel()
if function == "exponential":
return models.ExponentialModel()
if function == "powerlaw":
return models.PowerLawModel()
# Lmfit for initial parameters
# model = lmfit_models(function)
# params = model.guess(ydata, x=xdata)
# result = model.fit (ydata, params, x=xdata)
# rprint(f'Chi-square = {result.chisqr:.4f}, Reduced Chi-square = {result.redchi:.4f}')
# --------------------------------------------------------------------------- #
# THIS LIBRARY NEED MIMO PORQUE HAY COSAS REDUNDANTES QUE SE PUEDEN UNIFICAR
[docs]def fit_gaussians(x, y, *p0):
assert x.shape == y.shape, "Input arrays must have the same shape."
popt, pcov = curve_fit(gaussian_train, x, y, p0=p0[0])
fit_y = gaussian_train(x, *popt)
chi_squared = np.sum(
(y[abs(fit_y) > 0.1] - fit_y[abs(fit_y) > 0.1]) ** 2 / fit_y[abs(fit_y) > 0.1]
) / (y.size - len(popt))
return popt, fit_y, chi_squared
##Binomial+Poisson distribution
[docs]def B(i, k, debug=False):
"""
Factorial factor of F
"""
if (i == 0) & (k == 0):
return 1
if (i == 0) & (k > 0):
return 0
else:
return fact(k - 1) * fact(k) / (fact(i - 1) * fact(i) * fact(k - i))
[docs]def F(K, p, L, debug=False):
"""
Computes prob of the kth point in a convoluted poisson+binomial distribution.
L is the mean value of the poisson, p is the binomial coef, i.e. the crosstalk we want to compute
"""
aux_sum = 0
if debug:
rprint(K)
for i in range(K + 1):
aux_sum += B(i, K) * ((L * (1 - p)) ** i) * (p ** (K - i))
return np.exp(-L) * aux_sum / fact(K)
[docs]def PoissonPlusBinomial(x, N, p, L, debug=False):
# N = len(x)
N = int(N)
aux = np.zeros(shape=N)
for i in range(N):
if debug:
rprint(x, i, x[i])
aux[i] = F(int(x[i]), p, L)
return aux / sum(aux)
# ===========================================================================#
# *********************** FITTING FUNCTIONS *********************************#
# ===========================================================================#
[docs]def gaussian_fit(counts, bins, bars, thresh, fit_function="gaussian", custom_fit=[0]):
"""This function fits the histogram, to a gaussians.
:params counts: counts of the histogram
:type counts: np.array
:params bins: bins of the histogram
:type bins: np.array
:params bars: bars of the histogram
:type bars: np.array
:params thresh: threshold value (for height of peaks and valleys)
:type thresh: int
:params fit_function: function to fit to, defaults to "gaussian"
:type fit_function: str, optional
:params custom_fit: custom fit, defaults to [0]
:type custom_fit: list, optional
:return: x, popt, pcov, perr -- x values, fit parameters, covariance matrix and errors
:rtype: tuple
"""
#### PEAK FINDER PARAMETERS #### thresh = int(len(my_runs[run][ch][key])/1000), wdth = 10 and prom = 0.5 work well
wdth = 10
prom = 0.5
acc = 1000
## Create linear interpolation between bins to search peaks in these variables ##
if len(custom_fit) == 2:
mean = custom_fit[0]
sigma = custom_fit[1]
x = np.linspace(mean - sigma, mean + sigma, acc)
y_intrp = scipy.interpolate.interp1d(bins[:-1], counts)
y = y_intrp(x)
else:
x = np.linspace(bins[1], bins[-2], acc)
y_intrp = scipy.interpolate.interp1d(bins[:-1], counts)
y = y_intrp(x)
rprint("\n...Fitting to a gaussian...")
## Find indices of peaks ##
if fit_function == "gaussian":
peak_idx, _ = find_peaks(y, height=thresh, width=wdth, prominence=prom)
if fit_function == "loggaussian":
peak_idx, _ = find_peaks(
np.log10(y), height=np.log10(thresh), width=wdth, prominence=prom
)
if len(custom_fit) == 2:
rprint("\n--- Customized fit ---")
mean = float(custom_fit[0])
sigma = float(custom_fit[1])
best_peak_idx = peak_idx[np.abs(x[peak_idx] - mean).argmin()]
best_peak_idx1 = best_peak_idx + 50
x_gauss = x
y_gauss = y
rprint("Taking peak at: ", x[best_peak_idx])
else:
sigma = abs(wdth * (bins[0] - bins[1]))
best_peak_idx = peak_idx[0]
best_peak_idx1 = best_peak_idx + 50
x_space = np.linspace(
x[best_peak_idx] - sigma, x[best_peak_idx1] + sigma, acc
) # Array with values between the x_coord of 2 consecutives peaks
step = x_space[1] - x_space[0]
x_gauss = x_space - int(acc / 2) * step
x_gauss = x_gauss[x_gauss >= bins[0]]
y_gauss = y_intrp(x_gauss)
rprint("Taking peak at: ", x[best_peak_idx])
# try:
popt, pcov = curve_fit(
gaussian,
x_gauss,
y_gauss,
p0=[x[best_peak_idx1], y[best_peak_idx], sigma],
maxfev=5000,
)
# popt, pcov = curve_fit(gaussian,x_gauss,y_gauss,p0=[y[best_peak_idx],x[best_peak_idx1]],sigma = sigma, absolute_sigma=True, maxfev=5000)
perr = np.sqrt(np.diag(pcov))
chi2 = chi_squared(x_gauss, y_gauss, popt)
# except:
# rprint("WARNING: Peak could not be fitted")
return x, popt, pcov, perr
# return x, popt, pcov, perr, chi2 # UPLOAD WHEN POSSIBLE MERGING IN MAIN BRANCH; upgrade all the fit functions
[docs]def peak_valley_finder(x, y, params):
"""This function finds the peaks and valleys of the histogram.
"""
dist = params["PEAK_DISTANCE"]
thresh = params["THRESHOLD"]
wdth = params["WIDTH"]
prom = params["PROMINENCE"]
acc = params["ACCURACY"]
max_y = np.max(y)
y = y / max_y
peak_idx, _ = find_peaks(
y, height=thresh, width=wdth, prominence=prom, distance=dist
)
valley_idx, _ = find_peaks(
1 - y, height=0, width=wdth, prominence=prom, distance=dist
)
rprint("Peaks found at: ", peak_idx)
rprint("Valleys found at: ", valley_idx)
rprint(
"[cyan]PeakFinder using parameters: dist = %i, thresh = %.2f, wdth = %i, prom = %.2f, acc = %i[/cyan]"
% (dist, thresh, wdth, prom, acc)
)
return peak_idx, valley_idx[: len(peak_idx)]
[docs]def gaussian_train_fit(fig, x, y, y_intrp, peak_idx, valley_idx, params, debug=False):
"""This function fits the histogram, to a train of gaussians.
"""
initial = [] # Saving for input to the TRAIN FIT
if len(peak_idx) < len(valley_idx):
n_peaks = len(peak_idx) # Number of peaks found by find_peak
else:
n_peaks = len(valley_idx)
labels = [""] * (n_peaks - 1) + ["Initial fit"]
for i in range(n_peaks):
if i == 0:
x_gauss = np.linspace(
x[peak_idx[i]] - (x[valley_idx[i]] - x[peak_idx[i]]),
x[valley_idx[i]],
params["ACCURACY"],
) # Array with values between the x_coord of 2 consecutives peaks
else:
x_gauss = np.linspace(
x[valley_idx[i - 1]], x[valley_idx[i]], params["ACCURACY"]
)
x_gauss = x_gauss[x_gauss >= x[0]]
y_gauss = y_intrp(x_gauss)
# plt.plot(x_gauss,y_gauss,ls="--",alpha=0.9)
try:
# (y[peak_idx[i]]),x[peak_idx[i]],abs(wdth*(bins[0]-bins[1]))])
popt, pcov = curve_fit(
gaussian,
x_gauss,
y_gauss,
p0=[
x[peak_idx[i]],
y[peak_idx[i]],
abs(params["WIDTH"] * (x_gauss[0] - x_gauss[1])),
],
bounds=(
[x[peak_idx[i]] - params["WIDTH"], 0, 0],
[
x[peak_idx[i]] + params["WIDTH"],
np.inf,
10 * abs(params["WIDTH"] * (x_gauss[0] - x_gauss[1])),
],
),
)
perr = np.sqrt(np.diag(pcov))
## FITTED to gaussian(x, height, center, width) ##
initial.append(popt[0]) # CENTER
initial.append(popt[1]) # HEIGHT
initial.append(np.abs(popt[2])) # WIDTH
fig.plot(
x_gauss,
gaussian(x_gauss, *popt),
ls="--",
c="black",
alpha=0.5,
label=labels[i],
)
except:
continue
pcov = np.zeros((len(initial), len(initial)))
perr = np.zeros(len(initial))
popt = initial
## GAUSSIAN TRAIN FIT ## Taking as input parameters the individual gaussian fits with initial
try:
popt, pcov = curve_fit(
gaussian_train, x[: valley_idx[-1]], y[: valley_idx[-1]], p0=initial
)
perr = np.sqrt(np.diag(pcov))
except ValueError:
rprint("[red]Full fit could not be performed[/red]")
except RuntimeError:
rprint("[red]Full fit could not be performed[/red]")
return popt, pcov
[docs]def pmt_spe_fit(counts, bins, bars, thresh):
"""This function fits the histogram, to a train of gaussians
\n[es muy parecida a gaussian_train_fit; hay algunas cosas que las coge en log pero igual se pueden unificar]
\n[se le puede dedicar un poco mas de tiempo para tener un ajuste mas fino pero parece que funciona]
"""
## Threshold value (for height of peaks and valleys) ##
# thresh = int(len(my_runs[run][ch][key])/1000)
wdth = 10
prom = 0.5
acc = 1000
## Create linear interpolation between bins to search peaks in these variables ##
x = np.linspace(bins[1], bins[-2], acc)
y_intrp = scipy.interpolate.interp1d(bins[:-1], counts)
y = y_intrp(x)
## Find indices of peaks ##
peak_idx, _ = find_peaks(
np.log(y), height=np.log(thresh), width=wdth, prominence=prom
)
# peak_idx, _ = find_peaks(y, height = thresh, width = wdth, prominence = prom)
## Find indices of valleys (from inverting the signal) ##
valley_idx, _ = find_peaks(
-np.log(y),
height=[-np.max(np.log(counts)), -np.log(thresh)],
width=wdth,
prominence=prom,
)
# valley_idx, _ = find_peaks(-y, height = [-np.max(counts), -thresh], width = wdth)
n_peaks = 4 # Fit of ped+1pe+2pe
initial = [] # Saving for input to the TRAIN FIT
if len(peak_idx) - 1 < n_peaks:
n_peaks = len(peak_idx) - 1 # Number of peaks found by find_peak
for i in range(n_peaks):
x_space = np.linspace(
x[peak_idx[i]], x[peak_idx[i + 1]], acc
) # Array with values between the x_coord of 2 consecutives peaks
step = x_space[1] - x_space[0]
x_gauss = x_space - int(acc / 2) * step
x_gauss = x_gauss[x_gauss >= bins[0]]
y_gauss = y_intrp(x_gauss)
# plt.plot(x_gauss,y_gauss,ls="--",alpha=0.9)
try:
popt, pcov = curve_fit(
gaussian,
x_gauss,
y_gauss,
p0=[y[peak_idx[i]], x[peak_idx[i]], abs(wdth * (bins[0] - bins[1]))],
)
perr = np.sqrt(np.diag(pcov))
## FITTED to gaussian(x, height, center, width) ##
initial.append(popt[1]) # HEIGHT
initial.append(popt[0]) # CENTER
initial.append(np.abs(popt[2])) # WIDTH
# plt.plot(x_gauss,gaussian(x_gauss, *popt), ls = "--", c = "black", alpha = 0.5)
# plt.ylim((1e-2,1e5))
except:
initial.append(x[peak_idx[i]])
initial.append(y[peak_idx[i]])
initial.append(abs(wdth * (bins[0] - bins[1])))
rprint("[red]Peak %i could not be fitted[/red]" % i)
try:
# GAUSSIAN TRAIN FIT ## Taking as input parameters the individual gaussian fits with initial
# popt, pcov = curve_fit(loggaussian_train,x[:peak_idx[-1]],np.log10(y[:peak_idx[-1]]),p0=initial)
popt, pcov = curve_fit(
gaussian_train, x[: peak_idx[-1]], y[: peak_idx[-1]], p0=initial
)
perr = np.sqrt(np.diag(pcov))
except:
popt = initial
rprint("[red]Full fit could not be performed[/red]")
return x, y, peak_idx, valley_idx, popt, pcov, perr
[docs]def cut_threshold(raw, thrld):
for i in range(len(raw)):
if raw[i] <= thrld:
raw[i] = thrld
if np.isnan(raw[i]):
raw[i] = thrld
if np.isinf(raw[i]):
raw[i] = thrld
return raw
[docs]def peak_fit(
fit_raw, raw_x, buffer, thrld, sigma_fast=1e-9, a_fast=1, tau_fast=1e-8, OPT={}
):
"""This function fits the peak to a gaussian function, and returns the parameters
"""
raw_max = np.argmax(fit_raw)
if check_key(OPT, "CUT_THRESHOLD") == True and OPT["CUT_THRESHOLD"] == True:
fit_raw = cut_threshold(fit_raw, thrld)
guess_t0 = raw_x[raw_max]
p = np.mean(fit_raw[: raw_max - buffer])
t0 = guess_t0
t0_low = guess_t0 * 0.02
t0_high = guess_t0 * 50
sigma = sigma_fast
sigma_low = sigma * 1e-2
sigma_high = sigma * 1e2
a1 = a_fast
a1_low = a_fast * 1e-2
a1_high = a_fast * 1e2
tau1 = tau_fast
tau1_low = 6e-9
tau1_high = tau1 * 1e2
bounds = (
[t0_low, sigma_low, a1_low, tau1_low],
[t0_high, sigma_high, a1_high, tau1_high],
)
initial = [t0, sigma, a1, tau1]
labels = ["TIME", "SIGM", "AMP1", "TAU1"]
# FIT PEAK
# try:
popt, pcov = curve_fit(
func,
raw_x[raw_max - buffer : raw_max + int(buffer / 2)],
fit_raw[raw_max - buffer : raw_max + int(buffer / 2)],
p0=initial,
)
perr = np.sqrt(np.diag(pcov))
# except:
# rprint("Peak fit could not be performed")
# popt = initial
# perr = np.zeros(len(initial))
# PRINT FIRST FIT VALUE
if check_key(OPT, "TERMINAL_OUTPUT") == True and OPT["TERMINAL_OUTPUT"] == True:
rprint("\n--- FISRT FIT VALUES (FAST) ---")
for i in range(len(initial)):
rprint("%s:\t%.2E\t%.2E" % (labels[i], popt[i], perr[i]))
rprint("-------------------------------")
# EXPORT FIT PARAMETERS
# a1 = popt[2];sigma = popt[1];tau1 = popt[3];t0 = popt[0]
return popt, perr
[docs]def sipm_fit(info, labels, raw, raw_x, fit_range, thrld=1e-6, OPT:Optional[dict]=None, save:bool=False, debug:bool=False):
"""
\nDOC
"""
run, ch, key = labels
max = np.argmax(raw)
# thrld = 1e-4
buffer1 = fit_range[0]
buffer2 = fit_range[1]
OPT["CUT_THRESHOLD"] = True
popt1, perr1 = peak_fit(raw, raw_x, buffer1, thrld=thrld, OPT=OPT)
p = np.mean(raw[: max - buffer1])
a1 = 2e-5
a1_low = 1e-8
a1_high = 9e-2
a2 = 2e-5
a2_low = 1e-8
a2_high = 9e-2
a3 = 2e-5
a3_low = 1e-8
a3_high = 9e-2
tau1 = 9e-8
tau1_low = 6e-9
tau1_high = 1e-7
tau2 = 9e-7
tau2_low = tau1_high
tau2_high = 1e-6
tau3 = 9e-6
tau3_low = tau2_high
tau3_high = 1e-5
sigma2 = popt1[1] * 10
sigma2_low = popt1[1]
sigma2_high = popt1[1] * 100
# USING VALUES FROM FIRST FIT PERFORM SECONG FIT FOR THE SLOW COMPONENT
bounds2 = (
[sigma2_low, a2_low, tau2_low, a3_low, tau3_low],
[sigma2_high, a2_high, tau2_high, a3_high, tau3_high],
)
initial2 = (sigma2, a2, tau2, a3, tau3)
labels2 = ["SIGM", "AMP2", "TAU2", "AMP3", "TAU3"]
popt, pcov = curve_fit(
lambda t, sigma2, a2, tau2, a3, tau3: logfunc3(
t, p, popt1[0], sigma2, popt1[2], popt1[3], a2, tau2, a3, tau3
),
raw_x[max - buffer1 : max + buffer2],
np.log(raw[max - buffer1 : max + buffer2]),
p0=initial2,
bounds=bounds2,
method="trf",
)
perr2 = np.sqrt(np.diag(pcov))
sigma2 = popt[0]
a2 = popt[1]
tau2 = popt[2]
a3 = popt[3]
tau3 = popt[4]
param = [p, a1, sigma2, tau1, popt1[3], a2, tau2, a3, tau3]
# if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(OPT, "SHOW") == False:
# # CHECK FIRST FIT
# plt.rcParams['figure.figsize'] = [16, 8]
# plt.subplot(1, 2, 1)
# plt.title("First fit to determine peak")
# plt.plot(raw_x, raw, label="raw")
# plt.plot(raw_x[max-buffer1:max+int(buffer1/2)], func(raw_x[max-buffer1:max+int(buffer1/2)], *popt1), label="FIT")
# # plt.axvline(raw_x[-buffer2], ls = "--", c = "k")
# plt.xlabel("Time in [s]"); plt.ylabel("ADC Counts")
# if check_key(OPT, "LOGY") != False: plt.semilogy();plt.ylim(thrld, raw[max]*1.1)
# plt.legend()
# plt.subplot(1, 2, 2)
# plt.title("Second fit with full wvf")
# plt.plot(raw_x, raw, zorder=0, c="tab:blue", label="raw")
# plt.plot(raw_x[max-buffer1:max+buffer2], func3(raw_x[max-buffer1:max+buffer2], *param), c="tab:orange", label="FIT")
# plt.plot(raw_x, func3(raw_x, *param), c="tab:green", label="FIT_FULL_LENGHT")
# plt.xlabel("Time in [s]"); plt.ylabel("ADC Counts")
# # plt.axvline(raw_x[-buffer2], ls = "--", c = "k")
# if check_key(OPT, "LOGY") != False: plt.semilogy();plt.ylim(thrld, raw[max]*1.1)
# plt.legend()
# if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(OPT, "SHOW") == False:
# while not plt.waitforbuttonpress(-1): pass
# plt.clf()
aux = func3(raw_x, *param)
return aux, raw, param, perr2, labels2
[docs]def scint_fit(info, labels, raw, raw_x, fit_range, thrld=1e-6, i_param={}, OPT:Optional[dict]=None, save:bool=False, debug:bool=False):
"""
\nDOC
"""
run, ch, key = labels
next_plot = False
OPT["CUT_THRESHOLD"] = True
# Define input parameters from dictionary
sigma = i_param["sigma"]
a_fast = i_param["a_fast"]
tau_fast = i_param["tau_fast"]
a_slow = i_param["a_slow"]
tau_slow = i_param["tau_slow"]
# Find peak and perform fit
raw_max = np.argmax(raw)
buffer1 = fit_range[0]
buffer2 = fit_range[1]
popt1, perr1 = peak_fit(raw, raw_x, buffer1, thrld, sigma, a_fast, tau_fast, OPT)
# USING VALUES FROM FIRST FIT PERFORM SECONG FIT FOR THE SLOW COMPONENT
p = np.mean(raw[: raw_max - buffer1])
p_std = np.std(raw[: raw_max - buffer1])
sigma1 = popt1[1]
sigma1_low = sigma1 * 0.9
sigma1_high = sigma1 * 1.1
a1 = popt1[2]
a1_low = a1 * 0.9
a1_high = a1 * 1.1
tau1 = popt1[3]
tau1_low = tau1 * 0.9
tau1_high = tau1 * 1.1
sigma2 = sigma
sigma2_low = sigma * 0.9
sigma2_high = sigma * 1.1
a2 = a_slow
a2_low = a_slow * 1e-2
a2_high = a_slow * 1e2
tau2 = tau_slow
tau2_low = tau_slow * 1e-2
tau2_high = tau_slow * 1e2
initial2 = (a1, sigma2, a2, tau2)
# bounds2 = ([a1_low, sigma2_low, a2_low, tau2_low], [a1_high, sigma2_high, a2_high, tau2_high])
# labels2 = ["AMP1", "SIG2", "AMP2", "TAU2"]
try:
popt2, pcov2 = curve_fit(
lambda t, a1, sigma2, a2, tau2: logfunc2(
t, p, popt1[0], popt1[1], a1, popt1[3], sigma2, a2, tau2
),
raw_x[raw_max - buffer1 : raw_max + buffer2],
np.log(raw[raw_max - buffer1 : raw_max + buffer2]),
p0=initial2,
)
perr2 = np.sqrt(np.diag(pcov2))
except:
rprint("[red]Fit could not be performed[/red]")
popt2 = initial2
perr2 = np.zeros(len(popt2))
t0 = popt1[0]
a1 = popt2[0]
sigma2 = popt2[1]
a2 = popt2[2]
tau2 = popt2[3]
labels = ["PED", "T0", "SIG1", "AMP1", "TAU1", "SIG2", "AMP2", "TAU2"]
param = [p, popt1[0], popt1[1], popt2[0], popt1[2], popt2[1], popt2[2], popt2[3]]
perr = [p_std, perr1[0], perr1[1], perr2[0], perr1[2], perr2[1], perr2[2], perr2[3]]
# if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(OPT, "SHOW") == False:
# # rprint("SHOW key not included in OPT")
# # CHECK FIRST FIT
# plt.rcParams['figure.figsize'] = [16, 8]
# plt.subplot(1, 2, 1)
# plt.title("First fit to determine peak")
# plt.plot(raw_x, raw, label="raw", c=color)
# plt.plot(raw_x[max-buffer1:max+int(buffer1/2)], func(raw_x[max-buffer1:max+int(buffer1/2)], *popt1), label="FIT")
# # plt.axvline(raw_x[-buffer2], ls = "--", c = "k")
# plt.xlabel("Time in [s]"); plt.ylabel("ADC Counts")
# if check_key(OPT, "LOGY") == True and OPT["LOGY"] == True: plt.semilogy();plt.ylim(thrld, raw[max]*1.1)
# plt.legend()
# plt.subplot(1, 2, 2)
# plt.title("Second fit with full wvf")
# plt.plot(raw_x, raw, zorder=0, label="raw", c=color)
# plt.plot(raw_x[max-buffer1:max+buffer2], func2(raw_x[max-buffer1:max+buffer2], *param), label="FIT")
# plt.xlabel("Time in [s]"); plt.ylabel("ADC Counts")
# plt.axvline(raw_x[max+buffer2], ls = "--", c = "k")
# if check_key(OPT, "LOGY") == True and OPT["LOGY"] == True: plt.semilogy();plt.ylim(thrld, raw[max]*1.1)
# plt.legend()
# while not plt.waitforbuttonpress(-1): pass
# plt.clf()
aux = func2(raw_x, *param)
return aux, raw, param, perr, labels
[docs]def purity_fit(info, labels, raw, raw_x, fit_range, thrld=1e-6, i_param={}, OPT:Optional[dict]=None, save:bool=False, debug:bool=False):
"""
\nDOC
"""
run, ch, key = labels
raw_max = np.argmax(raw)
buffer1 = fit_range[0]
buffer2 = fit_range[1]
initial = []
labels = ["PED", "T0", "A1", "A3", "SIGMA", "QUENCH"]
for init in labels:
if init not in i_param:
if init == "PED":
initial.append(np.mean(raw[: raw_max - buffer1]))
if init == "T0":
initial.append(raw_x[raw_max])
else:
initial.append(i_param[init])
# if np.any(np.isnan(raw)) or np.any(np.isinf(raw)) or np.any(raw <= 0):
# rprint("[red]Negative/Infinite/Zero/Nan values found in raw[/red]")
if check_key(OPT, "FILTER") and isinstance(OPT["FILTER"], int):
rprint(
"Filtering the signal with a gaussian filter of sigma = %i" % OPT["FILTER"]
)
raw = gaussian_filter1d(raw, sigma=OPT["FILTER"])
if check_key(OPT, "LOG") and OPT["LOG"]:
if check_key(OPT, "CUT_THRESHOLD") and OPT["CUT_THRESHOLD"]:
raw = cut_threshold(raw, thrld)
popt, pcov = curve_fit(
logpurity,
raw_x[raw_max - buffer1 : raw_max + buffer2],
np.log(raw[raw_max - buffer1 : raw_max + buffer2]),
p0=initial,
)
else:
popt, pcov = curve_fit(
purity,
raw_x[raw_max - buffer1 : raw_max + buffer2],
raw[raw_max - buffer1 : raw_max + buffer2],
p0=initial,
)
perr = np.sqrt(np.diag(pcov))
return purity(raw_x, *popt), raw, popt, perr, labels
[docs]def simple_purity_fit(info, labels, raw, raw_x, fit_range, thrld=1e-6, i_param={}, OPT:Optional[dict]=None, save:bool=False, debug:bool=False):
"""
\nDOC
"""
run, ch, key = labels
raw_max = np.argmax(raw)
buffer1 = fit_range[0]
buffer2 = fit_range[1]
initial = []
labels = ["PED", "T0", "A0", "A1", "SIGMA", "QUENCH"]
for init in labels:
if init not in i_param:
if init == "PED":
initial.append(np.mean(raw[: raw_max - buffer1]))
if init == "T0":
initial.append(raw_x[raw_max])
else:
initial.append(i_param[init])
# if np.any(np.isnan(raw)) or np.any(np.isinf(raw)) or np.any(raw <= 0):
# rprint("[red]Negative/Infinite/Zero/Nan values found in raw[/red]", "ERROR")
if check_key(OPT, "FILTER") and isinstance(OPT["FILTER"], int):
rprint(
"Filtering the signal with a gaussian filter of sigma = %i" % OPT["FILTER"]
)
raw = gaussian_filter1d(raw, sigma=OPT["FILTER"])
if check_key(OPT, "LOG") and OPT["LOG"]:
if check_key(OPT, "CUT_THRESHOLD") and OPT["CUT_THRESHOLD"]:
raw = cut_threshold(raw, thrld)
popt, pcov = curve_fit(
logsimple_purity,
raw_x[raw_max - buffer1 : raw_max + buffer2],
np.log(raw[raw_max - buffer1 : raw_max + buffer2]),
p0=initial,
)
else:
popt, pcov = curve_fit(
simple_purity,
raw_x[raw_max - buffer1 : raw_max + buffer2],
raw[raw_max - buffer1 : raw_max + buffer2],
p0=initial,
)
perr = np.sqrt(np.diag(pcov))
return simple_purity(raw_x, *popt), raw, popt, perr, labels
[docs]def sc_fit(info, labels, raw, raw_x, fit_range, thrld=1e-6, OPT:Optional[dict]=None, save:bool=False, debug:bool=False):
run, ch, key = labels
# Prepare plot vis
next_plot = False
plt.rcParams["figure.figsize"] = [8, 8]
t0 = np.argmax(raw)
raw_x = np.arange(len(raw))
initial = (1500, 150, t0, 8, -700, 300)
# bounds = ([-200, 10, t0*0.1, 1, -1500, 10], [10000, 3000, t0*10, 20, 1500, 1000])
labels = ["AMP", "tau1", "T0", "sigma", "AMP2", "tau2"]
try:
popt, pcov = curve_fit(
scfunc,
raw_x[fit_range[0] : fit_range[1]],
raw[fit_range[0] : fit_range[1]],
p0=initial,
method="trf",
)
perr = np.sqrt(np.diag(pcov))
except:
rprint("[red]Fit did not succeed[/red]")
popt = initial
perr = np.zeros(len(initial))
if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(
OPT, "SHOW"
) == False:
plt.title("Fit with full wvf")
plt.plot(raw_x, raw, zorder=0, c="tab:blue", label="raw")
plt.plot(raw_x, scfunc(raw_x, *popt), "tab:orange", label="FIT")
plt.xlabel("Time in [s]")
plt.ylabel("ADC Counts")
if check_key(OPT, "LOGY") == True and OPT["LOGY"] == True:
plt.semilogy()
plt.ylim(thrld, raw[t0] * 1.1)
plt.legend()
if save:
save_figure(fig, f"{info['OUT_PATH'][0]}/images", run, ch, "TauFit", debug=debug)
while not plt.waitforbuttonpress(-1):
pass
plt.clf()
aux = scfunc(raw_x, *popt)
# rprint("\n")
return aux, raw, popt, perr, labels
[docs]def fit_wvfs(
my_runs,
info,
signal_type,
thrld,
fit_range=[0, 200],
i_param={},
in_key=["ADC"],
out_key="",
OPT:Optional[dict]=None,
save:bool=False,
debug:bool=False,
):
"""
\nDOC
"""
if OPT is None:
OPT = {}
fit_dict, ref_dict, popt_dict, perr_dict, label_dict = {}, {}, {}, {}, {}
i_param = get_initial_parameters(i_param)
if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(
OPT, "SHOW"
) == False:
plt.ion()
for run, ch, key in product(my_runs["NRun"], my_runs["NChannel"], in_key):
aux = dict()
ref = dict()
raw = my_runs[run][ch][key]
raw_x = my_runs[run][ch]["Sampling"] * np.arange(len(raw[0]))
for i in range(len(raw)):
raw_max = np.max(raw[i])
raw[i] = raw[i] / raw_max
if signal_type == "SiPM":
fit, new_raw, popt, perr, labels = sipm_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, thrld, OPT, save, debug
)
elif signal_type == "SC":
fit, new_raw, popt, perr, labels = sc_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, thrld, OPT, save, debug
)
elif signal_type == "Scint":
fit, new_raw, popt, perr, labels = scint_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, thrld, i_param, OPT, save, debug
)
elif signal_type == "Purity":
fit, new_raw, popt, perr, labels = purity_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, thrld, i_param, OPT, save, debug
)
elif signal_type == "SimplePurity":
fit, new_raw, popt, perr, labels = simple_purity_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, thrld, i_param, OPT, save, debug
)
elif signal_type == "SimpleScint":
fit, new_raw, popt, perr, labels = simple_scint_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, i_param, OPT, save, debug
)
elif signal_type == "TauSlow":
fit, new_raw, popt, perr, labels = tau_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, i_param, OPT, save, debug
)
else:
rprint("[red]Fit type not recognized[/red]")
return
aux[i] = fit * raw_max
ref[i] = new_raw * raw_max
raw[i] = raw[i] * raw_max
i_idx, f_idx = find_amp_decrease(aux[i], 1e-3)
PE = np.sum(raw[i])
PE_std = np.std(raw[i][:i_idx])
folder_path = (
f'{root}/{info["OUT_PATH"][0]}/analysis/fits/run{run}/ch{ch}/'
)
if not os.path.exists(folder_path):
os.makedirs(name=folder_path, mode=0o777, exist_ok=True)
os.chmod(folder_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
term_output = ""
term_output = term_output + "Fitting wvf %s for run %s, ch %s\n" % (
i,
run,
ch,
)
term_output = term_output + "--------------------------------\n"
for i in range(len(labels)):
term_output = term_output + "%s:\t%.2E\t%.2E\n" % (
labels[i],
popt[i],
perr[i],
)
term_output = term_output + "--------------------------------\n"
if (
check_key(OPT, "TERMINAL_OUTPUT") == True
and OPT["TERMINAL_OUTPUT"] == True
):
rprint(term_output)
if save:
with open(f"{folder_path}/{signal_type}Fit_{run}_{ch}.txt", "w+") as f:
if signal_type == "Scint" or signal_type == "SimpleScint":
f.write("%s:\t%.2f\t%.2f\n" % ("PE", PE, PE_std))
for i in range(len(labels)):
f.write("%s:\t%.4E\t%.4E\n" % (labels[i], popt[i], perr[i]))
if debug:
rprint("File saved in: %s" % folder_path)
fit_dict[(run, ch, key)] = fit
ref_dict[(run, ch, key)] = ref
popt_dict[(run, ch, key)] = popt
perr_dict[(run, ch, key)] = perr
label_dict[(run, ch, key)] = labels
my_runs[run][ch]["Fit" + signal_type + out_key] = aux
my_runs[run][ch]["Ref" + signal_type + out_key] = ref
if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(
OPT, "SHOW"
) == False:
plt.ioff()
return fit_dict, ref_dict, popt_dict, perr_dict, label_dict
[docs]def get_initial_parameters(i_param):
"""
\nDOC
"""
# Define input parameters from dictionary
if check_key(i_param, "ped") == False:
i_param["ped"] = 1e-6
if check_key(i_param, "t0") == False:
i_param["t0"] = 1e-6
if check_key(i_param, "sigma") == False:
i_param["sigma"] = 1e-8
if check_key(i_param, "const") == False:
i_param["const"] = 1e-8
if check_key(i_param, "a_fast") == False:
i_param["a_fast"] = 1e-2
if check_key(i_param, "tau_fast") == False:
i_param["tau_fast"] = 1e-8
if check_key(i_param, "a_slow") == False:
i_param["a_slow"] = 1e-2
if check_key(i_param, "tau_slow") == False:
i_param["tau_slow"] = 1e-6
return i_param
[docs]def scint_profile(x, const, a_f, tau_f, tau_s):
return const * (
2 * a_f / tau_f * np.exp(-(x) / tau_f)
+ 2 * (1 - a_f) / tau_s * np.exp(-(x) / tau_s)
)
[docs]def log_scint_profile(x, const, a_f, tau_f, tau_s):
return np.log(scint_profile(x, const, a_f, tau_f, tau_s))
[docs]def tau_slow_profile(x, a_s, tau_s):
return 2 * a_s / tau_s * np.exp(-(x) / tau_s)
[docs]def log_tau_slow_profile(x, a_s, tau_s):
y = np.log(tau_slow_profile(x, a_s, tau_s))
# Replace infs and nans from the array by 0
y[np.isinf(y)] = 0
y[np.isnan(y)] = 0
return y
[docs]def simple_scint_fit(info, labels, raw, raw_x, fit_range, i_param={}, OPT:Optional[dict]=None, save:bool=False, debug:bool=False):
"""
\nDOC
"""
run, ch, key = labels
next_plot = False
thrld = 1e-10
for i in range(len(raw)):
if raw[i] <= thrld:
raw[i] = thrld
if np.isnan(raw[i]):
raw[i] = thrld
# Define input parameters from dictionary
const = i_param["const"]
a_fast = i_param["a_fast"]
tau_fast = i_param["tau_fast"]
a_slow = i_param["a_slow"]
tau_slow = i_param["tau_slow"]
# Find peak and perform fit
raw_max = np.argmax(raw)
buffer1 = fit_range[0]
buffer2 = fit_range[1]
# USING VALUES FROM FIRST FIT PERFORM SECONG FIT FOR THE SLOW COMPONENT
a1 = a_fast
a1_low = a1 * 0.9
a1_high = a1 * 1.1
tau1 = tau_fast
tau1_low = tau1 * 0.9
tau1_high = tau1 * 1.1
a2 = a_slow
a2_low = a_slow * 1e-2
a2_high = a_slow * 1e2
tau2 = tau_slow
tau2_low = tau_slow * 1e-2
tau2_high = tau_slow * 1e2
const_high = const * 100
const_low = const * 0.01
bounds2 = (
[const_low, a1_low, tau1_low, tau2_low],
[const_high, a1_high, tau1_high, tau2_high],
)
initial2 = (const, a1, tau1, tau2)
labels2 = ["CONST", "AMP1", "TAU1", "TAU2"]
# try:
# popt, pcov = curve_fit(scint_profile, raw_x[:buffer2] ,raw[raw_max:raw_max+buffer2],p0=initial2, bounds=bounds2)
popt, pcov = curve_fit(
log_scint_profile,
raw_x[:buffer2],
np.log(raw[raw_max : raw_max + buffer2]),
p0=initial2,
bounds=bounds2,
)
perr = np.sqrt(np.diag(pcov))
# except:
# rprint("Fit could not be performed")
# popt2 = initial2
# perr2 = np.zeros(len(popt2))
zeros_aux = np.zeros(raw_max)
zeros_aux2 = np.zeros(len(raw) - raw_max - buffer2)
if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(
OPT, "SHOW"
) == False:
# rprint("SHOW key not included in OPT")
# CHECK FIRST FIT
fig = plt.figure()
plt.rcParams["figure.figsize"] = [16, 8]
plt.subplot(1, 1, 1)
plt.title("First fit to determine peak")
plt.plot(raw_x, raw, label="raw", c="black")
plt.plot(
raw_x,
np.concatenate([zeros_aux, scint_profile(raw_x[:-raw_max], *popt)]),
label="FIT",
)
# plt.axvline(raw_x[-buffer2], ls = "--", c = "k")
plt.xlabel("Time in [s]")
plt.ylabel("ADC Counts")
if check_key(OPT, "LOGY") == True and OPT["LOGY"] == True:
plt.semilogy()
plt.legend()
if save:
save_figure(fig, f"{root}/{info['OUT_PATH'][0]}/images", run, ch, "TauFit", debug=debug)
while not plt.waitforbuttonpress(-1):
pass
plt.clf()
aux = np.concatenate([zeros_aux, scint_profile(raw_x[:buffer2], *popt), zeros_aux2])
return aux, raw, popt, perr, labels2
[docs]def tau_fit(info, labels, raw, raw_x, fit_range, i_param={}, OPT:Optional[dict]=None, save:bool=False, debug:bool=False):
"""
\nDOC
"""
run, ch, key = labels
if OPT == None:
OPT = {}
next_plot = False
# Define input parameters from dictionary
a_slow = i_param["a_slow"]
tau_slow = i_param["tau_slow"]
# Find peak and perform fit
raw_max = np.argmax(raw)
buffer1 = fit_range[0]
buffer2 = fit_range[1]
# USING VALUES FROM FIRST FIT PERFORM SECONG FIT FOR THE SLOW COMPONENT
a2 = a_slow
a2_low = a_slow * 1e-2
a2_high = a_slow * 1e2
tau2 = tau_slow
tau2_low = tau_slow * 1e-2
tau2_high = tau_slow * 1e2
bounds2 = ([a2_low, tau2_low], [a2_high, tau2_high])
labels2 = ["AMP2", "TAU2"]
initial2 = (a2, tau2)
# try:
# popt, pcov = curve_fit(scint_profile, raw_x[:buffer2] ,raw[raw_max:raw_max+buffer2],p0=initial2, bounds=bounds2)
y = np.log(raw[buffer1 : buffer2])
y[np.isinf(y)] = 0
y[np.isnan(y)] = 0
popt, pcov = curve_fit(
log_tau_slow_profile, raw_x[: buffer2 - buffer1], y, p0=initial2, bounds=bounds2
)
perr = np.sqrt(np.diag(pcov))
# except:
# rprint("Fit could not be performed")
# popt2 = initial2
# perr2 = np.zeros(len(popt2))
zeros_aux = np.zeros(buffer1)
zeros_aux2 = np.zeros(len(raw) - buffer2)
if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(
OPT, "SHOW"
) == False:
# rprint("SHOW key not included in OPT")
# CHECK FIRST FIT
fig = plt.figure()
plt.subplot(1, 1, 1)
plt.title("First fit to determine peak")
plt.plot(raw_x, raw, label="raw", c="black")
plt.plot(
raw_x,
np.concatenate(
[
zeros_aux,
tau_slow_profile(raw_x[: buffer2 - buffer1], *popt),
zeros_aux2,
]
),
label="TauFit",
)
# plt.axvline(raw_x[-buffer2], ls = "--", c = "k")
plt.xlabel("Time in [s]")
plt.ylabel("ADC Counts")
if check_key(OPT, "LOGY") == True and OPT["LOGY"] == True:
plt.semilogy()
plt.legend()
if save:
save_figure(fig, f"{root}/{info['OUT_PATH'][0]}/images", run, ch, f"{key}_TauFit", debug=debug)
while not plt.waitforbuttonpress(-1):
pass
plt.clf()
plt.close()
aux = np.concatenate(
[zeros_aux, tau_slow_profile(raw_x[: buffer2 - buffer1], *popt), zeros_aux2]
)
return aux, raw, popt, perr, labels2