Browse Source

Added b2kstll custom fixes

master
rskuza 2 years ago
parent
commit
13f509fa1d
  1. 382
      Code/Scripts/Python Scripts/b2kstll/models/angular.py
  2. 63
      Code/Scripts/Python Scripts/b2kstll/models/angular_sympy.py

382
Code/Scripts/Python Scripts/b2kstll/models/angular.py

@ -0,0 +1,382 @@
import warnings
from math import pi
import numpy as np
import tensorflow as tf
import zfit
from zfit import z
import yaml
import sympy
from b2kstll.models.Kpi import KpiPWavePDF, KpiSWavePDF, ReKpiPandSWavePDF, ImKpiPandSWavePDF
from b2kstll.models.acceptance import AccPDF
from b2kstll.utils import get_coeffs
import analysis.efficiency
from analysis.efficiency import load_efficiency_model
from particle import Particle
from b2kstll.models.angular_sympy import angular_func_sympy
class AngularPDF(zfit.pdf.BasePDF):
""" Generic angular PDF built from input function and corresponding parameters
Args:
ArbitraryFNC (function: Any arbitrary angular function
params (params): parameters of interest in the given function
obs (`zfit.Space`):
name (str):
dtype (tf.DType):
"""
def __init__(self, obs, params, func, name):
super().__init__(obs=obs, params=params, name=name+"PDF")
self._user_func = func
_N_OBS = 3
VALID_LIMITS = {'costheta_l': (-1, 1),
'costheta_k': (-1, 1),
'phi': (-pi, pi)}
def _pdf(self, x, norm_range):
costheta_l, costheta_k, phi = z.unstack_x(x)
pdf = self._user_func(**self.params, costheta_l=costheta_l, costheta_k=costheta_k, phi=phi)
return pdf
# Calculate Normalisation
def get_normalisation(FL, upper_ctk):
kwargs = dict(
FL = sympy.Symbol('FL'),
AFB = sympy.Symbol('AFB'),
S3 = sympy.Symbol('S3'),
S4 = sympy.Symbol('S4'),
S5 = sympy.Symbol('S5'),
S7 = sympy.Symbol('S7'),
S8 = sympy.Symbol('S8'),
S9 = sympy.Symbol('S9'),
costheta_l = sympy.Symbol('costheta_l'),
costheta_k = sympy.Symbol('costheta_k'),
phi = sympy.Symbol('phi')
)
sympy_val = angular_func_sympy(**kwargs)
integral = sympy.integrate(sympy_val,
(kwargs['costheta_l'], -1, 1),
(kwargs['costheta_k'], -1, upper_ctk),
(kwargs['phi'], -sympy.pi, sympy.pi),)
fun = sympy.lambdify((kwargs["FL"]),integral, 'tensorflow')
return fun(FL)
UPPER_CTK = 0.85
def PWave(FL, S3, S4, S5, AFB, S7, S8, S9, costheta_l, costheta_k, phi, backend=tf):
"""Full d4Gamma/dq2dOmega for Bd -> Kst ll (l=e,mu)
Angular distribution obtained in the total PDF (using LHCb convention JHEP 02 (2016) 104)
i.e. the valid of the angles is given for
- phi: [-pi, pi]
- theta_K: [-pi, pi]
- theta_l: [-pi, pi]
The function is normalized over a finite range and therefore a PDF.
Args:
FL (`zfit.Parameter`): Fraction of longitudinal polarisation of the Kst
S3 (`zfit.Parameter`): A_perp^2 - A_para^2 / A_zero^2 + A_para^2 + A_perp^2 (L, R)
S4 (`zfit.Parameter`): RE(A_zero*^2 * A_para^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R)
S5 (`zfit.Parameter`): RE(A_zero*^2 * A_perp^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R)
AFB (`zfit.Parameter`): Forward-backward asymmetry of the di-lepton system (also i.e. 3/4 * S6s)
S7 (`zfit.Parameter`): IM(A_zero*^2 * A_para^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R)
S8 (`zfit.Parameter`): IM(A_zero*^2 * A_perp^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R)
S9 (`zfit.Parameter`): IM(A_perp*^2 * A_para^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R)
obs (`zfit.Space`):
name (str):
dtype (tf.DType):
"""
sintheta_k = backend.sqrt(1.0 - costheta_k * costheta_k)
sintheta_l = backend.sqrt(1.0 - costheta_l * costheta_l)
sintheta_2k = (1.0 - costheta_k * costheta_k)
sintheta_2l = (1.0 - costheta_l * costheta_l)
sin2theta_k = (2.0 * sintheta_k * costheta_k)
cos2theta_l = (2.0 * costheta_l * costheta_l - 1.0)
sin2theta_l = (2.0 * sintheta_l * costheta_l)
pdf = ((3.0 / 4.0) * (1.0 - FL) * sintheta_2k +
FL * costheta_k * costheta_k +
(1.0 / 4.0) * (1.0 - FL) * sintheta_2k * cos2theta_l +
-1.0 * FL * costheta_k * costheta_k * cos2theta_l +
S3 * sintheta_2k * sintheta_2l * backend.cos(2.0 * phi) +
S4 * sin2theta_k * sin2theta_l * backend.cos(phi) +
S5 * sin2theta_k * sintheta_l * backend.cos(phi) +
(4.0 / 3.0) * AFB * sintheta_2k * costheta_l +
S7 * sin2theta_k * sintheta_l * backend.sin(phi) +
S8 * sin2theta_k * sin2theta_l * backend.sin(phi) +
S9 * sintheta_2k * sintheta_2l * backend.sin(2.0 * phi))
#return (9.0 / (32.0 * pi)) * pdf
return pdf / get_normalisation(FL = FL, upper_ctk = UPPER_CTK)
def SWave(costheta_l, costheta_k, phi, backend=tf):
"""Full d4Gamma/dq2dOmega for Swave part only
Angular distribution obtained in the total PDF (using LHCb convention JHEP 02 (2016) 104)
i.e. the valid of the angles is given for
- phi: [-pi, pi]
- theta_K: [-pi, pi]
- theta_l: [-pi, pi]
The function is normalized over a finite range and therefore a PDF.
Args:
obs (`zfit.Space`):
name (str):
dtype (tf.DType):
"""
sintheta_2l = (1.0 - costheta_l * costheta_l)
pdf = (3.0 / (16.0 * pi)) * sintheta_2l
return pdf
def PandSWaveInterference(Ss1, Ss2, Ss3, Ss4, Ss5, costheta_l, costheta_k, phi, backend=tf):
"""Full d4Gamma/dq2dOmega for the Real interference term between P and S waves
Angular distribution obtained in the total PDF (using LHCb convention JHEP 02 (2016) 104)
i.e. the valid of the angles is given for
- phi: [-pi, pi]
- theta_K: [-pi, pi]
- theta_l: [-pi, pi]
The function is normalized over a finite range and therefore a PDF.
Args:
Ss1, Ss2, Ss3, Ss4, Ss5 (`zfit.Parameter`)
obs (`zfit.Space`):
name (str):
dtype (tf.DType):
"""
sintheta_l = backend.sqrt(1.0 - costheta_l * costheta_l)
sintheta_k = backend.sqrt(1.0 - costheta_k * costheta_k)
sintheta_2l = (1.0 - costheta_l * costheta_l)
sin2theta_l = (2.0 * sintheta_l * costheta_l)
pdf = (Ss1 * sintheta_2l * costheta_k +
Ss2 * sin2theta_l * sintheta_k * backend.cos(phi) +
Ss3 * sintheta_l * sintheta_k * backend.cos(phi) +
Ss4 * sintheta_l * sintheta_k * backend.sin(phi) +
Ss5 * sin2theta_l * sintheta_k * backend.sin(phi))
return (3.0 / (16.0 * pi)) * pdf
class AngularPandSWavePDF(zfit.pdf.BaseFunctor):
"""Full d4Gamma/dq2dOmega for both spin 0 and 1 for Bd -> Kst ll (l=e,mu) decays
Angular distribution obtained in the total PDF (using LHCb convention JHEP 02 (2016) 104)
i.e. the valid of the angles is given for
- phi: [-pi, pi]
- theta_K: [0, pi]
- theta_l: [0, pi]
The function is normalized over a finite range and therefore a PDF.
Args:
AngularPWavePDF (`zfit.pdf.ZPDF`: P-wave normalised PDF
AngularSWavePDF (`zfit.pdf.ZPDF`: S-wave interference part normalised PDF
FS (`zfit.Parameter`): S-wave fraction, i.e. |A^L_S|^2 + |A^R_S|^2/ Sum(|A^{S,R}_{S,zero,para,perp}|^2)
obs (`zfit.Space`):
name (str):
dtype (tf.DType):
"""
def __init__(self, PWave, SWave, PandSWaveInterference, FS, obs):
super().__init__(pdfs=[PWave, SWave, PandSWaveInterference], name="AngularPandSWavePDF", params={'FS': FS}, obs=obs)
_N_OBS = 3
VALID_LIMITS = {'costheta_l': (-1, 1),
'costheta_k': (-1, 1),
'phi': (-pi, pi)}
def _pdf(self, x, norm_range):
FS = self.params['FS']
PWave, SWave, PandSWaveInterference = self.pdfs
pdf = (1 - FS) * PWave.pdf(x) + FS * SWave.pdf(x) + PandSWaveInterference.pdf(x)
return pdf
# A bit of handling
class B2Kstll:
"""Factory class for B->K*ll angular fit"""
PDFS = {'PWave': (PWave, ['FL', 'S3', 'S4', 'S5', 'AFB', 'S7', 'S8', 'S9']),
'SWave': (SWave, []),
'PandSWaveInterference' : (PandSWaveInterference, ['Ss1', 'Ss2', 'Ss3', 'Ss4', 'Ss5']),
#'PandSWaveInterference5D': (PandSWaveInterference5D, ['Ss1re', 'Ss2re', 'Ss3re', 'Ss4re', 'Ss5re',
# 'Ss1im', 'Ss2im', 'Ss3im', 'Ss4im', 'Ss5im']),
'PandSWave' : (AngularPandSWavePDF, ['FS'])}
#'PandSWave5D': (AngularPandSWavePDF5D, ['FS'])}
KPI = {'PWave': (KpiPWavePDF, ['Kst_892'], [313]),
'SWave': (KpiSWavePDF, ['Kst_1430'], [10311]),
'RePandSWave': (ReKpiPandSWavePDF, ['Kst_892', 'Kst_1430'], [313, 10311]),
'ImPandSWave': (ImKpiPandSWavePDF, ['Kst_892', 'Kst_1430'], [313, 10311])}
def __init__(self, cosThetaL, cosThetaK, Phi, q2_min=None, q2_max=None):
"""Initialize observable names."""
self._obs_names = {'costheta_l': cosThetaL,
'costheta_k': cosThetaK,
'phi': Phi}
costheta_l = zfit.Space(cosThetaL, limits=(-1.0, 1.0))
costheta_k = zfit.Space(cosThetaK, limits=(-1.0, UPPER_CTK))
phi = zfit.Space(Phi, limits=(-pi, pi))
self._spaces = costheta_l * costheta_k * phi
self.params = {}
self.acc = None
self.acc_integral = None
self.Kpi = None
self.q2_bin_acc = None
self.q2_min = q2_min
self.q2_max = q2_max
def set_q2_bin_acceptance(self, q2_bin_acc):
"""Set the q2 value used to slice the acceptance function for parametrization in 4D"""
self.q2_bin_acc = q2_bin_acc
def set_acc(self, file_name, q2_bin_acc=None):
"""Set an acceptance function to the signal PDF."""
# Load coefficients from MoM
#coeffs_yaml = load_efficiency_model(file_name)
#coeffs = coeffs_yaml.get_coefficients()
self.acc,self.accmodel = get_coeffs(file_name)
self.set_q2_bin_acceptance(q2_bin_acc)
def set_acc_integral(self, file_name, str_order):
"""Set the integral of the PDF x acceptance."""
# Load integral from a yaml file
with open(file_name, 'r') as f:
dict_int = yaml.load(f, Loader=yaml.FullLoader)
integral = dict_int[0][str_order]
self.acc_integral = sympy.sympify(integral)
def set_Kpi(self, name):
""" Set the Kpi invariant mass system."""
#self._has_Kpi = True
self.Kpi = name
def get_pdf(self, name):
"""Get corresponding PDF and paramaters."""
fnc_class, param_names = self.PDFS[name]
def get_params(param_list):
out = {}
for param in param_list:
if param not in self.params:
if param == 'FL':
config = [0.6, 0, 1]
elif param == 'FS':
config = [0.1, 0, 1]
else:
config = [0.0, -2, 2]
self.params.update({param: zfit.Parameter(param, *config)})
out[param] = self.params[param]
return out
obs = self._spaces
def get_Kpi_pdf(name, model):
"""Set the corresponding PDF for the Kpi system and parameters."""
pdf_class, param_names, param_id = self.KPI[model]
def get_res(param_list, param_id):
out = {}
for param, idparam in zip(param_names, param_id):
if not any([i for i in self.params if param in i.lower()]):
m = Particle.from_pdgid(idparam).mass * 1e-03
w = Particle.from_pdgid(idparam).width * 1e-03
self.params.update({param+"_M": zfit.Parameter(param+"_M", m, floating=False)})
self.params.update({param+"_W": zfit.Parameter(param+"_W", w, floating=False)})
out[param+"_M"] = self.params[param+"_M"]
out[param+"_W"] = self.params[param+"_W"]
return out
# Add Kpi dimension
mKpi = zfit.Space(name, limits=(0.79555, 0.99555))
self._spaces *= mKpi
params = get_res(param_names, param_id)
pdf = pdf_class(obs=mKpi, params=params, name=model, q2_bin=(self.q2_min+self.q2_max)/2.)
return pdf
if name == 'PandSWave':
pdf_tot, params_tot = self.PDFS[name]
get_params(params_tot)
listPDF = dict.fromkeys(['PWave', 'SWave', 'PandSWaveInterference'])
for iPDF in listPDF:
ifncWave, iparamWave = self.PDFS[iPDF]
initParams = get_params(iparamWave)
listPDF[iPDF] = AngularPDF(obs=obs, params=initParams, func=ifncWave, name=iPDF)
if self.Kpi:
pdf_Kpi = get_Kpi_pdf(self.Kpi, iPDF)
listPDF[iPDF] *= pdf_Kpi
params = self.params
pdf = pdf_tot(obs=obs, PWave=listPDF["PWave"], SWave=listPDF["SWave"], PandSWaveInterference=listPDF["PandSWaveInterference"], FS=self.params['FS'])
if self.acc is not None:
pdf = AccPDF(pdf, self.acc, self.accmodel, obs=obs, dict_names=self._obs_names, q2_slice=self.q2_bin_acc, name=name)
else:
params = get_params(param_names)
pdf = AngularPDF(obs=obs, params=params, func=fnc_class, name=name)
if self.acc is not None:
pdf = AccPDF(pdf, self.acc, self.accmodel, obs=obs, dict_names=self._obs_names, q2_slice=self.q2_bin_acc, name=name)
if self.acc_integral:
tf_expr = sympy.lambdify(self.acc_integral.free_symbols, self.acc_integral, 'tensorflow')
integral = integral_acceptance(params, tf_expr)
pdf.register_analytic_integral(func=integral, limits=zfit.Space(axes=(0,1,2), limits=obs.limits))
if self.Kpi:
pdf_Kpi = get_Kpi_pdf(self.Kpi, name)
pdf = pdf * pdf_Kpi
params = self.params
obs = self._spaces
return obs, pdf, params

63
Code/Scripts/Python Scripts/b2kstll/models/angular_sympy.py

@ -0,0 +1,63 @@
import inspect
import sympy
def angular_func_sympy(FL, AFB, S3, S4, S5, S7, S8, S9, costheta_l, costheta_k, phi):
sintheta_k = sympy.sqrt(1.0 - costheta_k * costheta_k)
sintheta_l = sympy.sqrt(1.0 - costheta_l * costheta_l)
sintheta_2k = (1.0 - costheta_k * costheta_k)
sintheta_2l = (1.0 - costheta_l * costheta_l)
sin2theta_k = (2.0 * sintheta_k * costheta_k)
cos2theta_l = (2.0 * costheta_l * costheta_l - 1.0)
sin2theta_l = (2.0 * sintheta_l * costheta_l)
pdf = ((3.0 / 4.0) * (1.0 - FL) * sintheta_2k +
FL * costheta_k * costheta_k +
(1.0 / 4.0) * (1.0 - FL) * sintheta_2k * cos2theta_l +
-1.0 * FL * costheta_k * costheta_k * cos2theta_l +
S3 * sintheta_2k * sintheta_2l * sympy.cos(2.0 * phi) +
S4 * sin2theta_k * sin2theta_l * sympy.cos(phi) +
S5 * sin2theta_k * sintheta_l * sympy.cos(phi) +
(4.0 / 3.0) * AFB * sintheta_2k * costheta_l +
S7 * sin2theta_k * sintheta_l * sympy.sin(phi) +
S8 * sin2theta_k * sin2theta_l * sympy.sin(phi) +
S9 * sintheta_2k * sintheta_2l * sympy.sin(2.0 * phi))
return pdf
""" kwargs = dict(
FL = sympy.Symbol('FL'),
AFB = sympy.Symbol('AFB'),
S3 = sympy.Symbol('S3'),
S4 = sympy.Symbol('S4'),
S5 = sympy.Symbol('S5'),
S7 = sympy.Symbol('S7'),
S8 = sympy.Symbol('S8'),
S9 = sympy.Symbol('S9'),
costheta_l = sympy.Symbol('costheta_l'),
costheta_k = sympy.Symbol('costheta_k'),
phi = sympy.Symbol('phi')
)
lower_ctl = sympy.Symbol('lower_ctl')
upper_ctl = sympy.Symbol('upper_ctl')
lower_ctk = sympy.Symbol('lower_ctk')
upper_ctk = sympy.Symbol('upper_ctk')
lower_phi = sympy.Symbol('lower_phi')
upper_phi = sympy.Symbol('upper_phi')
sympy_val = angular_func_sympy(**kwargs)
print('starting integration')
integral = sympy.integrate(sympy_val,
(kwargs['costheta_l'], -1, 1),
(kwargs['costheta_k'], -1, 1),
(kwargs['phi'], -sympy.pi, sympy.pi),)
# integral = sympy.integrate(sympy_val,
# kwargs['costheta_l'],
# kwargs['costheta_k'],
# kwargs['phi'])
print(integral)
fun = sympy.lambdify((lower_ctl, lower_ctk, lower_phi),integral, 'tensorflow')
print(inspect.getsource(fun)) """
Loading…
Cancel
Save