# ================================================================================================================================================#
# This library contains functions to perform fits to the data. #
# ================================================================================================================================================#
from srcs.utils import get_project_root
import os, stat, math, scipy
from lmfit import models
from typing import Optional
from itertools import product
from scipy.signal import find_peaks
from scipy.optimize import curve_fit
import numpy as np
from scipy.special import erf
import matplotlib
matplotlib.use('Qt5Agg')
from matplotlib import pyplot as plt
from math import factorial as fact
from rich import print as rprint
from scipy.ndimage import gaussian_filter1d
# Imports from other libraries
from .io_functions import check_key, read_yaml_file, save_figure
from .ana_functions import find_amp_decrease
from .unit_functions import get_run_units
from .sty_functions import get_prism_colors
np.seterr(divide="ignore")
root = get_project_root()
colors = get_prism_colors()
# ===========================================================================#
# ************************** THEORETICAL FUNCTIONS **************************#
# ===========================================================================#
[docs]def chi_squared(x, y, popt):
fit_y = np.sum(
[gaussian(x, popt[j], popt[j + 1], popt[j + 2]) for j in range(0, len(popt), 3)]
)
return np.sum((y - fit_y) ** 2 / fit_y) / (y.size - len(popt))
[docs]def pure_scint(time, t0, a1, a2, tau1, tau2):
y = a1 / tau1 * np.exp(-(time - t0) / tau1) + a2 / tau2 * np.exp(
-(time - t0) / tau2
)
[docs]def gauss(x, a, x0, sigma):
return (
a
/ (sigma * math.sqrt(2 * math.pi))
* np.exp(-0.5 * np.power((x - x0) / sigma, 2))
)
[docs]def gaussian_train(x, *params):
y = np.zeros_like(x)
for i in range(0, len(params), 3):
center = params[i]
height = params[i + 1]
width = params[i + 2]
y = y + gaussian(x, center, height, width)
return y
[docs]def loggaussian_train(x, *params):
y = gaussian_train(x, *params)
y[y <= 0] = 1e-1
return np.log(y)
[docs]def gaussian(x, center, height, width):
return height * np.exp(-0.5 * ((x - center) / width) ** 2)
[docs]def test_gaussian(x, center, width):
return np.exp(-0.5 * ((x - center) / width) ** 2)
# def pmt_spe(x, height, center, width):
# return height * np.exp(-0.5*((x - center)/width)**2)
[docs]def loggaussian(x, center, height, width):
return np.log10(gaussian(x, center, height, width))
[docs]def func(t, t0, sigma, a, tau):
return (
(2 * a / tau)
* np.exp((sigma / (np.sqrt(2) * tau)) ** 2 - (np.array(t) - t0) / tau)
* (1 - erf((sigma**2 - tau * (np.array(t) - t0)) / (np.sqrt(2) * sigma * tau)))
)
[docs]def func2(t, p, t0, sigma, a1, tau1, sigma2, a2, tau2):
return p + func(t, t0, sigma, a1, tau1) + func(t, t0, sigma2, a2, tau2)
[docs]def logfunc2(t, p, t0, sigma1, a1, tau1, sigma2, a2, tau2):
return np.log(p + func(t, t0, sigma1, a1, tau1) + func(t, t0, sigma2, a2, tau2))
[docs]def logfunc3(t, p, t0, sigma, a1, tau1, a2, tau2, a3, tau3):
return np.log(
p
+ func(t, t0, sigma, a1, tau1)
+ func(t, t0, sigma, a2, tau2)
+ func(t, t0, sigma, a3, tau3)
)
[docs]def func3(t, p, t0, sigma, a1, tau1, a2, tau2, a3, tau3):
return (
p
+ func(t, t0, sigma, a1, tau1)
+ func(t, t0, sigma, a2, tau2)
+ func(t, t0, sigma, a3, tau3)
)
[docs]def scfunc(t, a, b, c, d, e, f):
return (
a * np.exp(-(t - c) / b) / np.power(2 * np.pi, 0.5) * np.exp(-(d**2) / (b**2))
) * (1 - erf(((c - t) / d + d / b) / np.power(2, 0.5))) + (
e * np.exp(-(t - c) / f) / np.power(2 * np.pi, 0.5) * np.exp(-(d**2) / (f**2))
) * (
1 - erf(((c - t) / d + d / f) / np.power(2, 0.5))
)
[docs]def dec_gauss(f, fc, n):
y = np.exp(-0.5 * (f / fc) ** n)
return y
[docs]def fit_dec_gauss(f, fc, n):
y = np.log10(dec_gauss(f, fc, n))
y[0] = 0
return y
[docs]def purity(t, p, t0, a1, a3, sigma, quenching):
tau1 = (1 / 7.1e-9 + quenching) ** -1
tau3 = (1 / 1.66e-6 + quenching) ** -1
a1_prime = a1 / (1 + tau1 * quenching)
a3_prime = a3 / (1 + tau3 * quenching)
return p + func(t, t0, sigma, a1_prime, tau1) + func(t, t0, sigma, a3_prime, tau3)
[docs]def simple_purity(t, p, t0, a0, a1, sigma, quenching):
tau1 = (1 / 7.1e-9 + quenching) ** -1
tau3 = (1 / 1.66e-6 + quenching) ** -1
a1_prime = a0 / (1 + tau1 * quenching)
a3_prime = (a0 * (1 - a1) / a1) / (1 + tau3 * quenching)
return p + func(t, t0, sigma, a1_prime, tau1) + func(t, t0, sigma, a3_prime, tau3)
[docs]def logpurity(t, p, t0, a1, a3, sigma, quenching):
return np.log(purity(t, p, t0, a1, a3, sigma, quenching))
[docs]def logsimple_purity(t, p, t0, a0, a1, sigma, quenching):
return np.log(simple_purity(t, p, t0, a0, a1, sigma, quenching))
[docs]def lmfit_models(function):
# if function == "gaussian": return models.GaussianModel()
if function == "linear":
return models.LinearModel()
if function == "lorentzian":
return models.LorentzianModel()
if function == "exponential":
return models.ExponentialModel()
if function == "powerlaw":
return models.PowerLawModel()
# Lmfit for initial parameters
# model = lmfit_models(function)
# params = model.guess(ydata, x=xdata)
# result = model.fit (ydata, params, x=xdata)
# rprint(f'Chi-square = {result.chisqr:.4f}, Reduced Chi-square = {result.redchi:.4f}')
# --------------------------------------------------------------------------- #
# THIS LIBRARY NEED MIMO PORQUE HAY COSAS REDUNDANTES QUE SE PUEDEN UNIFICAR
[docs]def fit_gaussians(x, y, *p0):
assert x.shape == y.shape, "Input arrays must have the same shape."
popt, pcov = curve_fit(gaussian_train, x, y, p0=p0[0])
fit_y = gaussian_train(x, *popt)
chi_squared = np.sum(
(y[abs(fit_y) > 0.1] - fit_y[abs(fit_y) > 0.1]) ** 2 / fit_y[abs(fit_y) > 0.1]
) / (y.size - len(popt))
return popt, fit_y, chi_squared
##Binomial+Poisson distribution
[docs]def B(i, k, debug=False):
"""
Factorial factor of F
"""
if (i == 0) & (k == 0):
return 1
if (i == 0) & (k > 0):
return 0
else:
return fact(k - 1) * fact(k) / (fact(i - 1) * fact(i) * fact(k - i))
[docs]def F(K, p, L, debug=False):
"""
Computes prob of the kth point in a convoluted poisson+binomial distribution.
L is the mean value of the poisson, p is the binomial coef, i.e. the crosstalk we want to compute
"""
aux_sum = 0
if debug:
rprint(K)
for i in range(K + 1):
aux_sum += B(i, K) * ((L * (1 - p)) ** i) * (p ** (K - i))
return np.exp(-L) * aux_sum / fact(K)
[docs]def PoissonPlusBinomial(x, N, p, L, debug=False):
# N = len(x)
N = int(N)
aux = np.zeros(shape=N)
for i in range(N):
if debug:
rprint(x, i, x[i])
aux[i] = F(int(x[i]), p, L)
return aux / sum(aux)
# ===========================================================================#
# *********************** FITTING FUNCTIONS *********************************#
# ===========================================================================#
[docs]def gaussian_fit(counts, bins, bars, thresh, fit_function="gaussian", custom_fit=[0]):
"""This function fits the histogram, to a gaussians.
:params counts: counts of the histogram
:type counts: np.array
:params bins: bins of the histogram
:type bins: np.array
:params bars: bars of the histogram
:type bars: np.array
:params thresh: threshold value (for height of peaks and valleys)
:type thresh: int
:params fit_function: function to fit to, defaults to "gaussian"
:type fit_function: str, optional
:params custom_fit: custom fit, defaults to [0]
:type custom_fit: list, optional
:return: x, popt, pcov, perr -- x values, fit parameters, covariance matrix and errors
:rtype: tuple
"""
#### PEAK FINDER PARAMETERS #### thresh = int(len(my_runs[run][ch][key])/1000), wdth = 10 and prom = 0.5 work well
wdth = 10
prom = 0.5
acc = 1000
## Create linear interpolation between bins to search peaks in these variables ##
if len(custom_fit) == 2:
mean = custom_fit[0]
sigma = custom_fit[1]
x = np.linspace(mean - sigma, mean + sigma, acc)
y_intrp = scipy.interpolate.interp1d(bins[:-1], counts)
y = y_intrp(x)
else:
x = np.linspace(bins[1], bins[-2], acc)
y_intrp = scipy.interpolate.interp1d(bins[:-1], counts)
y = y_intrp(x)
rprint("\n...Fitting to a gaussian...")
## Find indices of peaks ##
if fit_function == "gaussian":
peak_idx, _ = find_peaks(y, height=thresh, width=wdth, prominence=prom)
if fit_function == "loggaussian":
peak_idx, _ = find_peaks(
np.log10(y), height=np.log10(thresh), width=wdth, prominence=prom
)
if len(custom_fit) == 2:
rprint("\n--- Customized fit ---")
mean = float(custom_fit[0])
sigma = float(custom_fit[1])
best_peak_idx = peak_idx[np.abs(x[peak_idx] - mean).argmin()]
best_peak_idx1 = best_peak_idx + 50
x_gauss = x
y_gauss = y
rprint("Taking peak at: ", x[best_peak_idx])
else:
sigma = abs(wdth * (bins[0] - bins[1]))
best_peak_idx = peak_idx[0]
best_peak_idx1 = best_peak_idx + 50
x_space = np.linspace(
x[best_peak_idx] - sigma, x[best_peak_idx1] + sigma, acc
) # Array with values between the x_coord of 2 consecutives peaks
step = x_space[1] - x_space[0]
x_gauss = x_space - int(acc / 2) * step
x_gauss = x_gauss[x_gauss >= bins[0]]
y_gauss = y_intrp(x_gauss)
rprint("Taking peak at: ", x[best_peak_idx])
# try:
popt, pcov = curve_fit(
gaussian,
x_gauss,
y_gauss,
p0=[x[best_peak_idx1], y[best_peak_idx], sigma],
maxfev=5000,
)
# popt, pcov = curve_fit(gaussian,x_gauss,y_gauss,p0=[y[best_peak_idx],x[best_peak_idx1]],sigma = sigma, absolute_sigma=True, maxfev=5000)
perr = np.sqrt(np.diag(pcov))
chi2 = chi_squared(x_gauss, y_gauss, popt)
# except:
# rprint("WARNING: Peak could not be fitted")
return x, popt, pcov, perr
# return x, popt, pcov, perr, chi2 # UPLOAD WHEN POSSIBLE MERGING IN MAIN BRANCH; upgrade all the fit functions
[docs]def peak_valley_finder(x, y, params):
"""This function finds the alternating peaks and valleys of the histogram.
"""
dist = params["PEAK_DISTANCE"]
thresh = params["THRESHOLD"]
wdth = params["WIDTH"]
prom = params["PROMINENCE"]
acc = params["ACCURACY"]
max_y = np.max(y)
y_norm = y / max_y
# Initial peak and valley detection
peak_idx, _ = find_peaks(y_norm, height=thresh, width=wdth, prominence=prom, distance=dist)
valley_idx, _ = find_peaks(1 - y_norm, height=0, width=wdth, prominence=prom, distance=dist)
# Combine and sort
points = [(p, 'peak') for p in peak_idx] + [(v, 'valley') for v in valley_idx]
points.sort()
filtered_points = []
i = 0
while i < len(points):
group = [points[i]]
j = i + 1
# Group consecutive points of same type (peak or valley)
while j < len(points) and points[j][1] == points[i][1]:
group.append(points[j])
j += 1
if len(group) == 1:
filtered_points.append(group[0])
else:
# Looking for previous and next points of opposite types
prev_boundary = points[i - 1][0] if i > 0 else group[0][0] - dist
next_boundary = points[j][0] if j < len(points) else group[-1][0] + dist
# Keep the most centered point between the grouped points
center = (prev_boundary + next_boundary) / 2
best_point = min(group, key=lambda pt: abs(pt[0] - center))
filtered_points.append(best_point)
i = j
# Separate indices by type
final_peaks = np.array([i for i, t in filtered_points if t == 'peak'])
final_valleys = np.array([i for i, t in filtered_points if t == 'valley'])
rprint("Peaks found at: ", final_peaks)
rprint("Valleys found at: ", final_valleys)
rprint(
"[cyan]PeakFinder using parameters: dist = %i, thresh = %.2f, wdth = %i, prom = %.2f, acc = %i[/cyan]"
% (dist, thresh, wdth, prom, acc)
)
return final_peaks, final_valleys
[docs]def gaussian_train_fit(fig, x, y, y_intrp, peak_idx, valley_idx, params, debug=False):
"""This function fits the histogram, to a train of gaussians.
"""
initial = [] # Saving for input to the TRAIN FIT
if len(peak_idx) < len(valley_idx):
n_peaks = len(peak_idx) # Number of peaks found by find_peak
else:
n_peaks = len(valley_idx)
labels = [""] * (n_peaks - 1) + ["Initial fit"]
for i in range(n_peaks):
if i == 0:
x_gauss = np.linspace(
x[peak_idx[i]] - (x[valley_idx[i]] - x[peak_idx[i]]),
x[valley_idx[i]],
params["ACCURACY"],
) # Array with values between the x_coord of 2 consecutives peaks
else:
x_gauss = np.linspace(
x[valley_idx[i - 1]], x[valley_idx[i]], params["ACCURACY"]
)
x_gauss = x_gauss[x_gauss >= x[0]]
y_gauss = y_intrp(x_gauss)
# plt.plot(x_gauss,y_gauss,ls="--",alpha=0.9)
try:
# (y[peak_idx[i]]),x[peak_idx[i]],abs(wdth*(bins[0]-bins[1]))])
popt, pcov = curve_fit(
gaussian,
x_gauss,
y_gauss,
p0=[
x[peak_idx[i]],
y[peak_idx[i]],
abs(params["WIDTH"] * (x_gauss[0] - x_gauss[1])),
],
bounds=(
[x[peak_idx[i]] - params["WIDTH"], 0, 0],
[
x[peak_idx[i]] + params["WIDTH"],
np.inf,
10 * abs(params["WIDTH"] * (x_gauss[0] - x_gauss[1])),
],
),
)
perr = np.sqrt(np.diag(pcov))
## FITTED to gaussian(x, height, center, width) ##
initial.append(popt[0]) # CENTER
initial.append(popt[1]) # HEIGHT
initial.append(np.abs(popt[2])) # WIDTH
# fig.plot(
# x_gauss,
# gaussian(x_gauss, *popt),
# ls="--",
# c="black",
# alpha=0.5,
# label=labels[i],
# )
except:
continue
pcov = np.zeros((len(initial), len(initial)))
perr = np.zeros(len(initial))
popt = initial
## GAUSSIAN TRAIN FIT ## Taking as input parameters the individual gaussian fits with initial
try:
popt, pcov = curve_fit(
gaussian_train, x[: valley_idx[-1]], y[: valley_idx[-1]], p0=initial
)
perr = np.sqrt(np.diag(pcov))
except ValueError:
rprint("[red]Full fit could not be performed[/red]")
except RuntimeError:
rprint("[red]Full fit could not be performed[/red]")
return popt, pcov
[docs]def pmt_spe_fit(counts, bins, bars, thresh):
"""This function fits the histogram, to a train of gaussians
\n[es muy parecida a gaussian_train_fit; hay algunas cosas que las coge en log pero igual se pueden unificar]
\n[se le puede dedicar un poco mas de tiempo para tener un ajuste mas fino pero parece que funciona]
"""
## Threshold value (for height of peaks and valleys) ##
# thresh = int(len(my_runs[run][ch][key])/1000)
wdth = 10
prom = 0.5
acc = 1000
## Create linear interpolation between bins to search peaks in these variables ##
x = np.linspace(bins[1], bins[-2], acc)
y_intrp = scipy.interpolate.interp1d(bins[:-1], counts)
y = y_intrp(x)
## Find indices of peaks ##
peak_idx, _ = find_peaks(
np.log(y), height=np.log(thresh), width=wdth, prominence=prom
)
# peak_idx, _ = find_peaks(y, height = thresh, width = wdth, prominence = prom)
## Find indices of valleys (from inverting the signal) ##
valley_idx, _ = find_peaks(
-np.log(y),
height=[-np.max(np.log(counts)), -np.log(thresh)],
width=wdth,
prominence=prom,
)
# valley_idx, _ = find_peaks(-y, height = [-np.max(counts), -thresh], width = wdth)
n_peaks = 4 # Fit of ped+1pe+2pe
initial = [] # Saving for input to the TRAIN FIT
if len(peak_idx) - 1 < n_peaks:
n_peaks = len(peak_idx) - 1 # Number of peaks found by find_peak
for i in range(n_peaks):
x_space = np.linspace(
x[peak_idx[i]], x[peak_idx[i + 1]], acc
) # Array with values between the x_coord of 2 consecutives peaks
step = x_space[1] - x_space[0]
x_gauss = x_space - int(acc / 2) * step
x_gauss = x_gauss[x_gauss >= bins[0]]
y_gauss = y_intrp(x_gauss)
# plt.plot(x_gauss,y_gauss,ls="--",alpha=0.9)
try:
popt, pcov = curve_fit(
gaussian,
x_gauss,
y_gauss,
p0=[y[peak_idx[i]], x[peak_idx[i]], abs(wdth * (bins[0] - bins[1]))],
)
perr = np.sqrt(np.diag(pcov))
## FITTED to gaussian(x, height, center, width) ##
initial.append(popt[1]) # HEIGHT
initial.append(popt[0]) # CENTER
initial.append(np.abs(popt[2])) # WIDTH
# plt.plot(x_gauss,gaussian(x_gauss, *popt), ls = "--", c = "black", alpha = 0.5)
# plt.ylim((1e-2,1e5))
except:
initial.append(x[peak_idx[i]])
initial.append(y[peak_idx[i]])
initial.append(abs(wdth * (bins[0] - bins[1])))
rprint("[red]Peak %i could not be fitted[/red]" % i)
try:
# GAUSSIAN TRAIN FIT ## Taking as input parameters the individual gaussian fits with initial
# popt, pcov = curve_fit(loggaussian_train,x[:peak_idx[-1]],np.log10(y[:peak_idx[-1]]),p0=initial)
popt, pcov = curve_fit(
gaussian_train, x[: peak_idx[-1]], y[: peak_idx[-1]], p0=initial
)
perr = np.sqrt(np.diag(pcov))
except:
popt = initial
rprint("[red]Full fit could not be performed[/red]")
return x, y, peak_idx, valley_idx, popt, pcov, perr
[docs]def cut_threshold(raw, thld):
for i in range(len(raw)):
if raw[i] <= thld:
raw[i] = thld
if np.isnan(raw[i]):
raw[i] = thld
if np.isinf(raw[i]):
raw[i] = thld
return raw
[docs]def peak_fit(
fit_raw, raw_x, buffer, thld, sigma_fast=1e-9, a_fast=1, tau_fast=1e-8, OPT={}
):
"""This function fits the peak to a gaussian function, and returns the parameters
"""
raw_max = np.argmax(fit_raw)
if check_key(OPT, "CUT_THRESHOLD") == True and OPT["CUT_THRESHOLD"] == True:
fit_raw = cut_threshold(fit_raw, thld)
guess_t0 = raw_x[raw_max]
p = np.mean(fit_raw[: raw_max - buffer])
t0 = guess_t0
t0_low = guess_t0 * 0.02
t0_high = guess_t0 * 50
sigma = sigma_fast
sigma_low = sigma * 1e-2
sigma_high = sigma * 1e2
a1 = a_fast
a1_low = a_fast * 1e-2
a1_high = a_fast * 1e2
tau1 = tau_fast
tau1_low = 6e-9
tau1_high = tau1 * 1e2
bounds = (
[t0_low, sigma_low, a1_low, tau1_low],
[t0_high, sigma_high, a1_high, tau1_high],
)
initial = [t0, sigma, a1, tau1]
labels = ["TIME", "SIGM", "AMP1", "TAU1"]
popt, pcov = curve_fit(
func,
raw_x[raw_max - buffer : raw_max + int(buffer / 2)],
fit_raw[raw_max - buffer : raw_max + int(buffer / 2)],
p0=initial,
)
perr = np.sqrt(np.diag(pcov))
# PRINT FIRST FIT VALUE
if check_key(OPT, "TERMINAL_OUTPUT") == True and OPT["TERMINAL_OUTPUT"] == True:
rprint("\n--- FISRT FIT VALUES (FAST) ---")
for i in range(len(initial)):
rprint("%s:\t%.2E\t%.2E" % (labels[i], popt[i], perr[i]))
rprint("-------------------------------")
return popt, perr
[docs]def sipm_fit(info, labels, raw, raw_x, fit_range, thld=1e-6, OPT:Optional[dict]=None, save:bool=True, debug:bool=False):
"""
\nDOC
"""
run, ch, key = labels
max_x = np.argmax(raw)
buffer1 = fit_range[0]
buffer2 = fit_range[1]
OPT["CUT_THRESHOLD"] = True
popt1, perr1 = peak_fit(raw, raw_x, buffer1, thld=thld, OPT=OPT)
p = np.mean(raw[: max_x - buffer1])
a1 = 2e-5
a1_low = 1e-8
a1_high = 9e-2
a2 = 2e-5
a2_low = 1e-8
a2_high = 9e-2
a3 = 2e-5
a3_low = 1e-8
a3_high = 9e-2
tau1 = 9e-8
tau1_low = 6e-9
tau1_high = 1e-7
tau2 = 9e-7
tau2_low = tau1_high
tau2_high = 1e-6
tau3 = 9e-6
tau3_low = tau2_high
tau3_high = 1e-5
sigma2 = popt1[1] * 10
sigma2_low = popt1[1]
sigma2_high = popt1[1] * 100
# USING VALUES FROM FIRST FIT PERFORM SECONG FIT FOR THE SLOW COMPONENT
bounds2 = (
[sigma2_low, a2_low, tau2_low, a3_low, tau3_low],
[sigma2_high, a2_high, tau2_high, a3_high, tau3_high],
)
initial2 = (sigma2, a2, tau2, a3, tau3)
labels2 = ["SIGM", "AMP2", "TAU2", "AMP3", "TAU3"]
popt, pcov = curve_fit(
lambda t, sigma2, a2, tau2, a3, tau3: logfunc3(
t, p, popt1[0], sigma2, popt1[2], popt1[3], a2, tau2, a3, tau3
),
raw_x[max_x - buffer1 : max_x + buffer2],
np.log(raw[max_x - buffer1 : max_x + buffer2]),
p0=initial2,
bounds=bounds2,
method="trf",
)
perr2 = np.sqrt(np.diag(pcov))
sigma2 = popt[0]
a2 = popt[1]
tau2 = popt[2]
a3 = popt[3]
tau3 = popt[4]
param = [p, a1, sigma2, tau1, popt1[3], a2, tau2, a3, tau3]
if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(OPT, "SHOW") == False:
show_fit(raw, raw_x, func3, raw_max=max_x, buffer1=buffer1, buffer2=buffer2, param=param, OPT=OPT, save=save, debug=debug)
aux = func3(raw_x, *param)
return aux, raw, param, perr2, labels2
[docs]def scint_fit(info, labels, raw, raw_x, fit_range, thld=1e-6, i_param={}, OPT:Optional[dict]=None, save:bool=True, debug:bool=False):
"""
\nDOC
"""
run, ch, key = labels
next_plot = False
OPT["CUT_THRESHOLD"] = True
# Define input parameters from dictionary
sigma = i_param["sigma"]
a_fast = i_param["a_fast"]
tau_fast = i_param["tau_fast"]
a_slow = i_param["a_slow"]
tau_slow = i_param["tau_slow"]
# Find peak and perform fit
raw_max = np.argmax(raw)
buffer1 = fit_range[0]
buffer2 = fit_range[1]
popt1, perr1 = peak_fit(raw, raw_x, buffer1, thld, sigma, a_fast, tau_fast, OPT)
# USING VALUES FROM FIRST FIT PERFORM SECONG FIT FOR THE SLOW COMPONENT
p = np.mean(raw[: raw_max - buffer1])
p_std = np.std(raw[: raw_max - buffer1])
sigma1 = popt1[1]
sigma1_low = sigma1 * 0.9
sigma1_high = sigma1 * 1.1
a1 = popt1[2]
a1_low = a1 * 0.9
a1_high = a1 * 1.1
tau1 = popt1[3]
tau1_low = tau1 * 0.9
tau1_high = tau1 * 1.1
sigma2 = sigma
sigma2_low = sigma * 0.9
sigma2_high = sigma * 1.1
a2 = a_slow
a2_low = a_slow * 1e-2
a2_high = a_slow * 1e2
tau2 = tau_slow
tau2_low = tau_slow * 1e-2
tau2_high = tau_slow * 1e2
initial2 = (a1, sigma2, a2, tau2)
try:
popt2, pcov2 = curve_fit(
lambda t, a1, sigma2, a2, tau2: logfunc2(
t, p, popt1[0], popt1[1], a1, popt1[3], sigma2, a2, tau2
),
raw_x[raw_max - buffer1 : raw_max + buffer2],
np.log(raw[raw_max - buffer1 : raw_max + buffer2]),
p0=initial2,
)
perr2 = np.sqrt(np.diag(pcov2))
except:
rprint("[red]Fit could not be performed[/red]")
popt2 = initial2
perr2 = np.zeros(len(popt2))
t0 = popt1[0]
a1 = popt2[0]
sigma2 = popt2[1]
a2 = popt2[2]
tau2 = popt2[3]
labels = ["PED", "T0", "SIG1", "AMP1", "TAU1", "SIG2", "AMP2", "TAU2"]
param = [p, popt1[0], popt1[1], popt2[0], popt1[2], popt2[1], popt2[2], popt2[3]]
perr = [p_std, perr1[0], perr1[1], perr2[0], perr1[2], perr2[1], perr2[2], perr2[3]]
if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(OPT, "SHOW") == False:
show_fit(info, raw, raw_x, func2, (run, ch, key), raw_max=raw_max, buffer1=buffer1, buffer2=buffer2, param=param, thld=thld, OPT=OPT, save=save, debug=debug)
aux = func2(raw_x, *param)
return aux, raw, param, perr, labels
[docs]def purity_fit(info, labels, raw, raw_x, fit_range, thld=1e-6, i_param={}, OPT:Optional[dict]=None, save:bool=True, debug:bool=False):
"""
\nDOC
"""
run, ch, key = labels
raw_max = np.argmax(raw)
buffer1 = fit_range[0]
buffer2 = fit_range[1]
initial = []
labels = ["PED", "T0", "A1", "A3", "SIGMA", "QUENCH"]
for init in labels:
if init not in i_param:
if init == "PED":
initial.append(np.mean(raw[: raw_max - buffer1]))
if init == "T0":
initial.append(raw_x[raw_max])
else:
initial.append(i_param[init])
# if np.any(np.isnan(raw)) or np.any(np.isinf(raw)) or np.any(raw <= 0):
# rprint("[red]Negative/Infinite/Zero/Nan values found in raw[/red]")
if check_key(OPT, "FILTER") and isinstance(OPT["FILTER"], int):
rprint(
"Filtering the signal with a gaussian filter of sigma = %i" % OPT["FILTER"]
)
raw = gaussian_filter1d(raw, sigma=OPT["FILTER"])
if check_key(OPT, "LOG") and OPT["LOG"]:
func = logpurity
if check_key(OPT, "CUT_THRESHOLD") and OPT["CUT_THRESHOLD"]:
raw = cut_threshold(raw, thld)
else:
func = purity
popt, pcov = curve_fit(
func,
raw_x[raw_max - buffer1 : raw_max + buffer2],
raw[raw_max - buffer1 : raw_max + buffer2],
p0=initial,
)
perr = np.sqrt(np.diag(pcov))
if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(OPT, "SHOW") == False:
show_fit(info, raw, raw_x, purity, (run, ch, key), raw_max=raw_max, buffer1=buffer1, buffer2=buffer2, param=popt, OPT=OPT, save=save, debug=debug)
return purity(raw_x, *popt), raw, popt, perr, labels
[docs]def simple_purity_fit(info, labels, raw, raw_x, fit_range, thld=1e-6, i_param={}, OPT:Optional[dict]=None, save:bool=True, debug:bool=True):
"""
\nDOC
"""
run, ch, key = labels
raw_max = np.argmax(raw)
buffer1 = fit_range[0]
buffer2 = fit_range[1]
initial = []
labels = ["PED", "T0", "A0", "A1", "SIGMA", "QUENCH"]
for init in labels:
if init not in i_param:
if init == "PED":
initial.append(np.mean(raw[: raw_max - buffer1]))
if init == "T0":
initial.append(raw_x[raw_max])
rprint("[red]Initial parameter %s not found in i_param, using default value[/red]" % init)
else:
initial.append(i_param[init])
if check_key(OPT, "CUT_THRESHOLD") and OPT["CUT_THRESHOLD"]:
rprint("Cutting threshold at %.2E" % thld)
ana = cut_threshold(raw, thld)
if check_key(OPT, "FILTER") and OPT["FILTER"]:
rprint(
"Filtering the signal with a gaussian filter of sigma = %i" % OPT["FILTER_SIGMA"]
)
ana = gaussian_filter1d(ana, sigma=OPT["FILTER_SIGMA"])
if check_key(OPT, "LOGY") and OPT["LOGY"]:
popt, pcov = curve_fit(
logsimple_purity,
raw_x[raw_max - buffer1 : raw_max + buffer2],
np.log(ana[raw_max - buffer1 : raw_max + buffer2]),
p0=initial,
)
else:
popt, pcov = curve_fit(
simple_purity,
raw_x[raw_max - buffer1 : raw_max + buffer2],
ana[raw_max - buffer1 : raw_max + buffer2],
p0=initial,
)
perr = np.sqrt(np.diag(pcov))
if "SHOW" in OPT and OPT["SHOW"]:
show_fit(info, raw, raw_x, simple_purity, (run, ch, key), raw_max=raw_max, buffer1=buffer1, buffer2=buffer2, thld=thld, param=popt, OPT=OPT, save=save, debug=debug)
return simple_purity(raw_x, *popt), raw, popt, perr, labels
[docs]def sc_fit(info, labels, raw, raw_x, fit_range, thld=1e-6, OPT:Optional[dict]=None, save:bool=True, debug:bool=False):
run, ch, key = labels
# Prepare plot vis
next_plot = False
plt.rcParams["figure.figsize"] = [8, 8]
t0 = np.argmax(raw)
raw_x = np.arange(len(raw))
initial = (1500, 150, t0, 8, -700, 300)
# bounds = ([-200, 10, t0*0.1, 1, -1500, 10], [10000, 3000, t0*10, 20, 1500, 1000])
labels = ["AMP", "tau1", "T0", "sigma", "AMP2", "tau2"]
try:
popt, pcov = curve_fit(
scfunc,
raw_x[fit_range[0] : fit_range[1]],
raw[fit_range[0] : fit_range[1]],
p0=initial,
method="trf",
)
perr = np.sqrt(np.diag(pcov))
except:
rprint("[red]Fit did not succeed[/red]")
popt = initial
perr = np.zeros(len(initial))
if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(
OPT, "SHOW"
) == False:
show_fit(
info,
raw,
raw_x,
scfunc,
(run, ch, key),
raw_max=t0,
buffer1=fit_range[0],
buffer2=fit_range[1],
param=popt,
thld=thld,
OPT=OPT,
save=save,
debug=debug,
)
aux = scfunc(raw_x, *popt)
# rprint("\n")
return aux, raw, popt, perr, labels
[docs]def fit_wvfs(
my_runs,
info,
signal_type,
thld,
fit_range=[0, 200],
i_param={},
in_key=["ADC"],
out_key="",
OPT:Optional[dict]=None,
save:bool=True,
debug:bool=True,
):
"""
\nDOC
"""
if OPT is None:
OPT = {}
fit_dict, ref_dict, popt_dict, perr_dict, label_dict = {}, {}, {}, {}, {}
i_param = get_initial_parameters(i_param)
if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(
OPT, "SHOW"
) == False:
plt.ion()
for run, ch, key in product(my_runs["NRun"], my_runs["NChannel"], in_key):
aux = dict()
ref = dict()
raw = my_runs[run][ch][key]
raw_x = my_runs[run][ch]["Sampling"] * np.arange(len(raw[0]))
for i in range(len(raw)):
raw_max = np.max(raw[i])
raw[i] = raw[i] / raw_max
if signal_type == "SiPM":
fit, new_raw, popt, perr, labels = sipm_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, thld, OPT, save=save, debug=debug
)
elif signal_type == "SC":
fit, new_raw, popt, perr, labels = sc_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, thld, OPT, save=save, debug=debug
)
elif signal_type == "Scint":
fit, new_raw, popt, perr, labels = scint_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, thld, i_param, OPT, save=save, debug=debug
)
elif signal_type == "Purity":
fit, new_raw, popt, perr, labels = purity_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, thld, i_param, OPT, save=save, debug=debug
)
elif signal_type == "Quenching":
fit, new_raw, popt, perr, labels = simple_purity_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, thld, i_param, OPT, save=save, debug=debug
)
elif signal_type == "SimpleScint":
fit, new_raw, popt, perr, labels = simple_scint_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, i_param, OPT, save=save, debug=debug
)
elif signal_type == "TauSlow":
fit, new_raw, popt, perr, labels = tau_fit(
info, (run, ch, key), raw[i], raw_x, fit_range, i_param, OPT, save=save, debug=debug
)
else:
rprint("[red]Fit type not recognized[/red]")
return
aux[i] = fit * raw_max
ref[i] = new_raw * raw_max
raw[i] = raw[i] * raw_max
i_idx, f_idx = find_amp_decrease(aux[i], 1e-3)
PE = np.sum(raw[i])
PE_std = np.std(raw[i][:i_idx])
folder_path = (
f'{root}/{info["OUT_PATH"][0]}/analysis/fits/run{run}/ch{ch}/'
)
if not os.path.exists(folder_path):
os.makedirs(name=folder_path, mode=0o777, exist_ok=True)
os.chmod(folder_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
term_output = ""
term_output = term_output + "Fitting wvf %s for run %s, ch %s\n" % (
i,
run,
ch,
)
term_output = term_output + "--------------------------------\n"
for i in range(len(labels)):
term_output = term_output + "%s:\t%.2E\t%.2E\n" % (
labels[i],
popt[i],
perr[i],
)
term_output = term_output + "--------------------------------\n"
if (
check_key(OPT, "TERMINAL_OUTPUT") == True
and OPT["TERMINAL_OUTPUT"] == True
):
rprint(term_output)
if save:
with open(f"{folder_path}run{run}_ch{ch}_{(signal_type).lower()}_fit.txt", "w+") as f:
if signal_type == "Scint" or signal_type == "SimpleScint":
f.write("%s:\t%.2f\t%.2f\n" % ("PE", PE, PE_std))
for i in range(len(labels)):
f.write("%s:\t%.4E\t%.4E\n" % (labels[i], popt[i], perr[i]))
if debug:
rprint(f"File saved as: {folder_path}run{run}_ch{ch}_{(signal_type).lower()}_fit.txt")
fit_dict[(run, ch, key)] = fit
ref_dict[(run, ch, key)] = ref
popt_dict[(run, ch, key)] = popt
perr_dict[(run, ch, key)] = perr
label_dict[(run, ch, key)] = labels
my_runs[run][ch]["Fit" + signal_type + out_key] = aux
my_runs[run][ch]["Ref" + signal_type + out_key] = ref
if check_key(OPT, "SHOW") and OPT["SHOW"]:
plt.ioff()
return fit_dict, ref_dict, popt_dict, perr_dict, label_dict
[docs]def show_fit(info, raw, raw_x, func, labels, raw_max, buffer1, buffer2, param, thld=1e-6, OPT:Optional[dict]=None, save:bool=True, debug:bool=True):
"""
\nDOC
This function shows the fit of the waveform and saves the figure if required.
\nParameters
----------
info : dict
Dictionary containing information about the run and channel.
labels : tuple
Tuple containing run, channel, and key information.
raw : np.ndarray
The raw waveform data to be fitted.
raw_x : np.ndarray
The x-axis values corresponding to the raw waveform data.
func : function
The fitting function to be used.
raw_max : int
The index of the maximum value in the raw waveform.
buffer1 : int
The number of points before the maximum to include in the fit.
buffer2 : int
The number of points after the maximum to include in the fit.
param : list
The parameters obtained from the fit.
thld : float, optional
Threshold value for the waveform, default is 1e-6.
OPT : dict, optional
Dictionary containing options for the fit, such as filtering and logging.
save : bool, optional
Whether to save the figure, default is False.
debug : bool, optional
Whether to print debug information, default is False.
\nReturns
-------
None
"""
run, ch, key = labels
fig = plt.figure()
plt.title(f"{key} - {OPT['SCINT_FIT'] if 'SCINT_FIT' in OPT else 'default'} - run {run} ch {ch}")
plt.plot(raw_x, raw, zorder=0, label="raw", c=colors[int(ch)-2])
if check_key(OPT, "CUT_THRESHOLD") and OPT["CUT_THRESHOLD"]:
ana = cut_threshold(raw, thld)
plt.axhline(thld, ls=":", c="k")
else:
ana = raw.copy()
if check_key(OPT, "FILTER") and OPT["FILTER"]:
plt.plot(raw_x, gaussian_filter1d(ana, sigma=OPT["FILTER_SIGMA"]), zorder=1, label="ana", c=colors[int(ch)-1])
plt.plot(raw_x[raw_max-buffer1:raw_max+buffer2], func(raw_x[raw_max-buffer1:raw_max+buffer2], *param), ls = "--", c=colors[6], label="fit")
plt.plot(raw_x[:raw_max-buffer1], func(raw_x[:raw_max-buffer1], *param), ls = ":", c=colors[6])
plt.plot(raw_x[raw_max+buffer2:], func(raw_x[raw_max+buffer2:], *param), ls = ":", c=colors[6])
plt.xlabel("Time in [s]"); plt.ylabel("ADC Counts")
plt.axvline(raw_x[raw_max-buffer1], ls = ":", c = "k", label = "limits")
plt.axvline(raw_x[raw_max+buffer2], ls = ":", c = "k")
if check_key(OPT, "LOGY") == True and OPT["LOGY"] == True:
plt.semilogy()
plt.ylim(thld, raw[raw_max]*2)
plt.legend()
# if save:
save_figure(fig, f'{root}/{info["OUT_PATH"][0]}/images/', run, ch, f'{key}_Fit', debug=debug)
# if debug:
# rprint(
# f"Saving plot in {root}/{info['OUT_PATH'][0]}/images/run{run}/ch{ch}/run{run}_ch{ch}_{key}_{OPT['SCINT_FIT']}_Fit.png"
# )
while not plt.waitforbuttonpress(-1):
pass
plt.clf()
plt.close(fig)
[docs]def get_initial_parameters(params):
"""
\nDOC
This function checks the input parameters dictionary and sets default values
for the scintillation fit parameters if they are not provided.
\nParameters
----------
params : dict
A dictionary containing input parameters for the fit.
\nReturns
-------
params : dict
The input parameters dictionary with default values set for missing keys.
"""
# Define input parameters from dictionary
if check_key(params, "ped") == False:
params["ped"] = 1e-6
if check_key(params, "t0") == False:
params["t0"] = 1e-6
if check_key(params, "sigma") == False:
params["sigma"] = 1e-8
if check_key(params, "const") == False:
params["const"] = 1e-8
if check_key(params, "a_fast") == False:
params["a_fast"] = 1e-2
if check_key(params, "tau_fast") == False:
params["tau_fast"] = 1e-8
if check_key(params, "a_slow") == False:
params["a_slow"] = 1e-2
if check_key(params, "tau_slow") == False:
params["tau_slow"] = 1e-6
return params
[docs]def scint_profile(x, const, a_f, tau_f, tau_s):
return const * (
2 * a_f / tau_f * np.exp(-(x) / tau_f)
+ 2 * (1 - a_f) / tau_s * np.exp(-(x) / tau_s)
)
[docs]def log_scint_profile(x, const, a_f, tau_f, tau_s):
return np.log(scint_profile(x, const, a_f, tau_f, tau_s))
[docs]def tau_slow_profile(x, a_s, tau_s):
return 2 * a_s / tau_s * np.exp(-(x) / tau_s)
[docs]def log_tau_slow_profile(x, a_s, tau_s):
y = np.log(tau_slow_profile(x, a_s, tau_s))
# Replace infs and nans from the array by 0
y[np.isinf(y)] = 0
y[np.isnan(y)] = 0
return y
[docs]def simple_scint_fit(info, labels, raw, raw_x, fit_range, i_param={}, OPT:Optional[dict]=None, save:bool=True, debug:bool=False):
"""
\nDOC
"""
run, ch, key = labels
next_plot = False
thld = 1e-10
for i in range(len(raw)):
if raw[i] <= thld:
raw[i] = thld
if np.isnan(raw[i]):
raw[i] = thld
# Define input parameters from dictionary
const = i_param["const"]
a_fast = i_param["a_fast"]
tau_fast = i_param["tau_fast"]
a_slow = i_param["a_slow"]
tau_slow = i_param["tau_slow"]
# Find peak and perform fit
raw_max = np.argmax(raw)
buffer1 = fit_range[0]
buffer2 = fit_range[1]
# USING VALUES FROM FIRST FIT PERFORM SECONG FIT FOR THE SLOW COMPONENT
a1 = a_fast
a1_low = a1 * 0.9
a1_high = a1 * 1.1
tau1 = tau_fast
tau1_low = tau1 * 0.9
tau1_high = tau1 * 1.1
a2 = a_slow
a2_low = a_slow * 1e-2
a2_high = a_slow * 1e2
tau2 = tau_slow
tau2_low = tau_slow * 1e-2
tau2_high = tau_slow * 1e2
const_high = const * 100
const_low = const * 0.01
bounds2 = (
[const_low, a1_low, tau1_low, tau2_low],
[const_high, a1_high, tau1_high, tau2_high],
)
initial2 = (const, a1, tau1, tau2)
labels2 = ["CONST", "AMP1", "TAU1", "TAU2"]
# try:
# popt, pcov = curve_fit(scint_profile, raw_x[:buffer2] ,raw[raw_max:raw_max+buffer2],p0=initial2, bounds=bounds2)
popt, pcov = curve_fit(
log_scint_profile,
raw_x[:buffer2],
np.log(raw[raw_max : raw_max + buffer2]),
p0=initial2,
bounds=bounds2,
)
perr = np.sqrt(np.diag(pcov))
zeros_aux = np.zeros(raw_max)
zeros_aux2 = np.zeros(len(raw) - raw_max - buffer2)
if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(
OPT, "SHOW"
) == False:
show_fit(raw, raw_x, scint_profile, raw_max=raw_max, buffer1=buffer1, buffer2=buffer2, param=popt, OPT=OPT, save=save, debug=debug)
aux = np.concatenate([zeros_aux, scint_profile(raw_x[:buffer2], *popt), zeros_aux2])
return aux, raw, popt, perr, labels2
[docs]def tau_fit(info, labels, raw, raw_x, fit_range, i_param={}, OPT:Optional[dict]=None, save:bool=True, debug:bool=False):
"""
\nDOC
"""
run, ch, key = labels
if OPT == None:
OPT = {}
next_plot = False
# Define input parameters from dictionary
a_slow = i_param["a_slow"]
tau_slow = i_param["tau_slow"]
# Find peak and perform fit
raw_max = np.argmax(raw)
buffer1 = fit_range[0]
buffer2 = fit_range[1]
# USING VALUES FROM FIRST FIT PERFORM SECONG FIT FOR THE SLOW COMPONENT
a2 = a_slow
a2_low = a_slow * 1e-2
a2_high = a_slow * 1e2
tau2 = tau_slow
tau2_low = tau_slow * 1e-2
tau2_high = tau_slow * 1e2
bounds2 = ([a2_low, tau2_low], [a2_high, tau2_high])
labels2 = ["AMP2", "TAU2"]
initial2 = (a2, tau2)
# try:
# popt, pcov = curve_fit(scint_profile, raw_x[:buffer2] ,raw[raw_max:raw_max+buffer2],p0=initial2, bounds=bounds2)
y = np.log(raw[buffer1 : buffer2])
y[np.isinf(y)] = 0
y[np.isnan(y)] = 0
popt, pcov = curve_fit(
log_tau_slow_profile, raw_x[: buffer2 - buffer1], y, p0=initial2, bounds=bounds2
)
perr = np.sqrt(np.diag(pcov))
# except:
# rprint("Fit could not be performed")
# popt2 = initial2
# perr2 = np.zeros(len(popt2))
zeros_aux = np.zeros(buffer1)
zeros_aux2 = np.zeros(len(raw) - buffer2)
if (check_key(OPT, "SHOW") == True and OPT["SHOW"] == True) or check_key(
OPT, "SHOW"
) == False:
# rprint("SHOW key not included in OPT")
# CHECK FIRST FIT
tau_slow_fit = tau_slow_profile(raw_x[: buffer2 - buffer1], *popt)
fig = plt.figure()
plt.subplot(1, 1, 1)
plt.title("%s - Tau fit - run %s" %(key, run))
if check_key(OPT, "MICRO_SEC") == True and OPT["MICRO_SEC"] == True:
plt.xlabel(r"Time [$\mu$s]")
raw_x = raw_x* 1e6
else:
plt.xlabel("Time in [s]")
plt.ylabel("ADC Counts")
plt.plot(raw_x, raw, label="Raw - ch%s" %(ch), c=get_prism_colors()[int(ch)])
plt.plot(
raw_x,
np.concatenate(
[
zeros_aux,
tau_slow_fit,
zeros_aux2,
]
),
label="TauFit",
c="red"
)
# plt.axvline(raw_x[-buffer2], ls = "--", c = "k")
if check_key(OPT, "LOGY") == True and OPT["LOGY"] == True:
plt.semilogy()
plt.legend()
if save:
save_figure(fig, f"{root}/{info['OUT_PATH'][0]}/images", run, ch, f"{key}_TauFit", debug=debug)
while not plt.waitforbuttonpress(-1):
pass
plt.clf()
plt.close()
aux = np.concatenate(
[zeros_aux, tau_slow_profile(raw_x[: buffer2 - buffer1], *popt), zeros_aux2]
)
return aux, raw, popt, perr, labels2