383 lines
14 KiB
Executable File
383 lines
14 KiB
Executable File
import warnings
from math import pi
import numpy as np
import tensorflow as tf
import zfit
from zfit import z
import yaml
import sympy
from b2kstll.models.Kpi import KpiPWavePDF, KpiSWavePDF, ReKpiPandSWavePDF, ImKpiPandSWavePDF
from b2kstll.models.acceptance import AccPDF
from b2kstll.utils import get_coeffs
import analysis.efficiency
from analysis.efficiency import load_efficiency_model
from particle import Particle
from b2kstll.models.angular_sympy import angular_func_sympy
class AngularPDF(zfit.pdf.BasePDF):
""" Generic angular PDF built from input function and corresponding parameters
ArbitraryFNC (function: Any arbitrary angular function
params (params): parameters of interest in the given function
obs (`zfit.Space`):
name (str):
dtype (tf.DType):
def __init__(self, obs, params, func, name):
super().__init__(obs=obs, params=params, name=name+"PDF")
self._user_func = func
_N_OBS = 3
VALID_LIMITS = {'costheta_l': (-1, 1),
'costheta_k': (-1, 1),
'phi': (-pi, pi)}
def _pdf(self, x, norm_range):
costheta_l, costheta_k, phi = z.unstack_x(x)
pdf = self._user_func(**self.params, costheta_l=costheta_l, costheta_k=costheta_k, phi=phi)
return pdf
# Calculate Normalisation
def get_normalisation(FL, upper_ctk):
kwargs = dict(
FL = sympy.Symbol('FL'),
AFB = sympy.Symbol('AFB'),
S3 = sympy.Symbol('S3'),
S4 = sympy.Symbol('S4'),
S5 = sympy.Symbol('S5'),
S7 = sympy.Symbol('S7'),
S8 = sympy.Symbol('S8'),
S9 = sympy.Symbol('S9'),
costheta_l = sympy.Symbol('costheta_l'),
costheta_k = sympy.Symbol('costheta_k'),
phi = sympy.Symbol('phi')
sympy_val = angular_func_sympy(**kwargs)
integral = sympy.integrate(sympy_val,
(kwargs['costheta_l'], -1, 1),
(kwargs['costheta_k'], -1, upper_ctk),
(kwargs['phi'], -sympy.pi, sympy.pi),)
fun = sympy.lambdify((kwargs["FL"]),integral, 'tensorflow')
return fun(FL)
UPPER_CTK = 0.85
def PWave(FL, S3, S4, S5, AFB, S7, S8, S9, costheta_l, costheta_k, phi, backend=tf):
"""Full d4Gamma/dq2dOmega for Bd -> Kst ll (l=e,mu)
Angular distribution obtained in the total PDF (using LHCb convention JHEP 02 (2016) 104)
i.e. the valid of the angles is given for
- phi: [-pi, pi]
- theta_K: [-pi, pi]
- theta_l: [-pi, pi]
The function is normalized over a finite range and therefore a PDF.
FL (`zfit.Parameter`): Fraction of longitudinal polarisation of the Kst
S3 (`zfit.Parameter`): A_perp^2 - A_para^2 / A_zero^2 + A_para^2 + A_perp^2 (L, R)
S4 (`zfit.Parameter`): RE(A_zero*^2 * A_para^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R)
S5 (`zfit.Parameter`): RE(A_zero*^2 * A_perp^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R)
AFB (`zfit.Parameter`): Forward-backward asymmetry of the di-lepton system (also i.e. 3/4 * S6s)
S7 (`zfit.Parameter`): IM(A_zero*^2 * A_para^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R)
S8 (`zfit.Parameter`): IM(A_zero*^2 * A_perp^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R)
S9 (`zfit.Parameter`): IM(A_perp*^2 * A_para^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R)
obs (`zfit.Space`):
name (str):
dtype (tf.DType):
sintheta_k = backend.sqrt(1.0 - costheta_k * costheta_k)
sintheta_l = backend.sqrt(1.0 - costheta_l * costheta_l)
sintheta_2k = (1.0 - costheta_k * costheta_k)
sintheta_2l = (1.0 - costheta_l * costheta_l)
sin2theta_k = (2.0 * sintheta_k * costheta_k)
cos2theta_l = (2.0 * costheta_l * costheta_l - 1.0)
sin2theta_l = (2.0 * sintheta_l * costheta_l)
pdf = ((3.0 / 4.0) * (1.0 - FL) * sintheta_2k +
FL * costheta_k * costheta_k +
(1.0 / 4.0) * (1.0 - FL) * sintheta_2k * cos2theta_l +
-1.0 * FL * costheta_k * costheta_k * cos2theta_l +
S3 * sintheta_2k * sintheta_2l * backend.cos(2.0 * phi) +
S4 * sin2theta_k * sin2theta_l * backend.cos(phi) +
S5 * sin2theta_k * sintheta_l * backend.cos(phi) +
(4.0 / 3.0) * AFB * sintheta_2k * costheta_l +
S7 * sin2theta_k * sintheta_l * backend.sin(phi) +
S8 * sin2theta_k * sin2theta_l * backend.sin(phi) +
S9 * sintheta_2k * sintheta_2l * backend.sin(2.0 * phi))
#return (9.0 / (32.0 * pi)) * pdf
return pdf / get_normalisation(FL = FL, upper_ctk = UPPER_CTK)
def SWave(costheta_l, costheta_k, phi, backend=tf):
"""Full d4Gamma/dq2dOmega for Swave part only
Angular distribution obtained in the total PDF (using LHCb convention JHEP 02 (2016) 104)
i.e. the valid of the angles is given for
- phi: [-pi, pi]
- theta_K: [-pi, pi]
- theta_l: [-pi, pi]
The function is normalized over a finite range and therefore a PDF.
obs (`zfit.Space`):
name (str):
dtype (tf.DType):
sintheta_2l = (1.0 - costheta_l * costheta_l)
pdf = (3.0 / (16.0 * pi)) * sintheta_2l
return pdf
def PandSWaveInterference(Ss1, Ss2, Ss3, Ss4, Ss5, costheta_l, costheta_k, phi, backend=tf):
"""Full d4Gamma/dq2dOmega for the Real interference term between P and S waves
Angular distribution obtained in the total PDF (using LHCb convention JHEP 02 (2016) 104)
i.e. the valid of the angles is given for
- phi: [-pi, pi]
- theta_K: [-pi, pi]
- theta_l: [-pi, pi]
The function is normalized over a finite range and therefore a PDF.
Ss1, Ss2, Ss3, Ss4, Ss5 (`zfit.Parameter`)
obs (`zfit.Space`):
name (str):
dtype (tf.DType):
sintheta_l = backend.sqrt(1.0 - costheta_l * costheta_l)
sintheta_k = backend.sqrt(1.0 - costheta_k * costheta_k)
sintheta_2l = (1.0 - costheta_l * costheta_l)
sin2theta_l = (2.0 * sintheta_l * costheta_l)
pdf = (Ss1 * sintheta_2l * costheta_k +
Ss2 * sin2theta_l * sintheta_k * backend.cos(phi) +
Ss3 * sintheta_l * sintheta_k * backend.cos(phi) +
Ss4 * sintheta_l * sintheta_k * backend.sin(phi) +
Ss5 * sin2theta_l * sintheta_k * backend.sin(phi))
return (3.0 / (16.0 * pi)) * pdf
class AngularPandSWavePDF(zfit.pdf.BaseFunctor):
"""Full d4Gamma/dq2dOmega for both spin 0 and 1 for Bd -> Kst ll (l=e,mu) decays
Angular distribution obtained in the total PDF (using LHCb convention JHEP 02 (2016) 104)
i.e. the valid of the angles is given for
- phi: [-pi, pi]
- theta_K: [0, pi]
- theta_l: [0, pi]
The function is normalized over a finite range and therefore a PDF.
AngularPWavePDF (`zfit.pdf.ZPDF`: P-wave normalised PDF
AngularSWavePDF (`zfit.pdf.ZPDF`: S-wave interference part normalised PDF
FS (`zfit.Parameter`): S-wave fraction, i.e. |A^L_S|^2 + |A^R_S|^2/ Sum(|A^{S,R}_{S,zero,para,perp}|^2)
obs (`zfit.Space`):
name (str):
dtype (tf.DType):
def __init__(self, PWave, SWave, PandSWaveInterference, FS, obs):
super().__init__(pdfs=[PWave, SWave, PandSWaveInterference], name="AngularPandSWavePDF", params={'FS': FS}, obs=obs)
_N_OBS = 3
VALID_LIMITS = {'costheta_l': (-1, 1),
'costheta_k': (-1, 1),
'phi': (-pi, pi)}
def _pdf(self, x, norm_range):
FS = self.params['FS']
PWave, SWave, PandSWaveInterference = self.pdfs
pdf = (1 - FS) * PWave.pdf(x) + FS * SWave.pdf(x) + PandSWaveInterference.pdf(x)
return pdf
# A bit of handling
class B2Kstll:
"""Factory class for B->K*ll angular fit"""
PDFS = {'PWave': (PWave, ['FL', 'S3', 'S4', 'S5', 'AFB', 'S7', 'S8', 'S9']),
'SWave': (SWave, []),
'PandSWaveInterference' : (PandSWaveInterference, ['Ss1', 'Ss2', 'Ss3', 'Ss4', 'Ss5']),
#'PandSWaveInterference5D': (PandSWaveInterference5D, ['Ss1re', 'Ss2re', 'Ss3re', 'Ss4re', 'Ss5re',
# 'Ss1im', 'Ss2im', 'Ss3im', 'Ss4im', 'Ss5im']),
'PandSWave' : (AngularPandSWavePDF, ['FS'])}
#'PandSWave5D': (AngularPandSWavePDF5D, ['FS'])}
KPI = {'PWave': (KpiPWavePDF, ['Kst_892'], [313]),
'SWave': (KpiSWavePDF, ['Kst_1430'], [10311]),
'RePandSWave': (ReKpiPandSWavePDF, ['Kst_892', 'Kst_1430'], [313, 10311]),
'ImPandSWave': (ImKpiPandSWavePDF, ['Kst_892', 'Kst_1430'], [313, 10311])}
def __init__(self, cosThetaL, cosThetaK, Phi, q2_min=None, q2_max=None):
"""Initialize observable names."""
self._obs_names = {'costheta_l': cosThetaL,
'costheta_k': cosThetaK,
'phi': Phi}
costheta_l = zfit.Space(cosThetaL, limits=(-1.0, 1.0))
costheta_k = zfit.Space(cosThetaK, limits=(-1.0, UPPER_CTK))
phi = zfit.Space(Phi, limits=(-pi, pi))
self._spaces = costheta_l * costheta_k * phi
self.params = {}
self.acc = None
self.acc_integral = None
self.Kpi = None
self.q2_bin_acc = None
self.q2_min = q2_min
self.q2_max = q2_max
def set_q2_bin_acceptance(self, q2_bin_acc):
"""Set the q2 value used to slice the acceptance function for parametrization in 4D"""
self.q2_bin_acc = q2_bin_acc
def set_acc(self, file_name, q2_bin_acc=None):
"""Set an acceptance function to the signal PDF."""
# Load coefficients from MoM
#coeffs_yaml = load_efficiency_model(file_name)
#coeffs = coeffs_yaml.get_coefficients()
self.acc,self.accmodel = get_coeffs(file_name)
def set_acc_integral(self, file_name, str_order):
"""Set the integral of the PDF x acceptance."""
# Load integral from a yaml file
with open(file_name, 'r') as f:
dict_int = yaml.load(f, Loader=yaml.FullLoader)
integral = dict_int[0][str_order]
self.acc_integral = sympy.sympify(integral)
def set_Kpi(self, name):
""" Set the Kpi invariant mass system."""
#self._has_Kpi = True
self.Kpi = name
def get_pdf(self, name):
"""Get corresponding PDF and paramaters."""
fnc_class, param_names = self.PDFS[name]
def get_params(param_list):
out = {}
for param in param_list:
if param not in self.params:
if param == 'FL':
config = [0.6, 0, 1]
elif param == 'FS':
config = [0.1, 0, 1]
config = [0.0, -2, 2]
self.params.update({param: zfit.Parameter(param, *config)})
out[param] = self.params[param]
return out
obs = self._spaces
def get_Kpi_pdf(name, model):
"""Set the corresponding PDF for the Kpi system and parameters."""
pdf_class, param_names, param_id = self.KPI[model]
def get_res(param_list, param_id):
out = {}
for param, idparam in zip(param_names, param_id):
if not any([i for i in self.params if param in i.lower()]):
m = Particle.from_pdgid(idparam).mass * 1e-03
w = Particle.from_pdgid(idparam).width * 1e-03
self.params.update({param+"_M": zfit.Parameter(param+"_M", m, floating=False)})
self.params.update({param+"_W": zfit.Parameter(param+"_W", w, floating=False)})
out[param+"_M"] = self.params[param+"_M"]
out[param+"_W"] = self.params[param+"_W"]
return out
# Add Kpi dimension
mKpi = zfit.Space(name, limits=(0.79555, 0.99555))
self._spaces *= mKpi
params = get_res(param_names, param_id)
pdf = pdf_class(obs=mKpi, params=params, name=model, q2_bin=(self.q2_min+self.q2_max)/2.)
return pdf
if name == 'PandSWave':
pdf_tot, params_tot = self.PDFS[name]
listPDF = dict.fromkeys(['PWave', 'SWave', 'PandSWaveInterference'])
for iPDF in listPDF:
ifncWave, iparamWave = self.PDFS[iPDF]
initParams = get_params(iparamWave)
listPDF[iPDF] = AngularPDF(obs=obs, params=initParams, func=ifncWave, name=iPDF)
if self.Kpi:
pdf_Kpi = get_Kpi_pdf(self.Kpi, iPDF)
listPDF[iPDF] *= pdf_Kpi
params = self.params
pdf = pdf_tot(obs=obs, PWave=listPDF["PWave"], SWave=listPDF["SWave"], PandSWaveInterference=listPDF["PandSWaveInterference"], FS=self.params['FS'])
if self.acc is not None:
pdf = AccPDF(pdf, self.acc, self.accmodel, obs=obs, dict_names=self._obs_names, q2_slice=self.q2_bin_acc, name=name)
params = get_params(param_names)
pdf = AngularPDF(obs=obs, params=params, func=fnc_class, name=name)
if self.acc is not None:
pdf = AccPDF(pdf, self.acc, self.accmodel, obs=obs, dict_names=self._obs_names, q2_slice=self.q2_bin_acc, name=name)
if self.acc_integral:
tf_expr = sympy.lambdify(self.acc_integral.free_symbols, self.acc_integral, 'tensorflow')
integral = integral_acceptance(params, tf_expr)
pdf.register_analytic_integral(func=integral, limits=zfit.Space(axes=(0,1,2), limits=obs.limits))
if self.Kpi:
pdf_Kpi = get_Kpi_pdf(self.Kpi, name)
pdf = pdf * pdf_Kpi
params = self.params
obs = self._spaces
return obs, pdf, params