import warnings from math import pi import numpy as np import tensorflow as tf import zfit from zfit import z import yaml import sympy from b2kstll.models.Kpi import KpiPWavePDF, KpiSWavePDF, ReKpiPandSWavePDF, ImKpiPandSWavePDF from b2kstll.models.acceptance import AccPDF from b2kstll.utils import get_coeffs import analysis.efficiency from analysis.efficiency import load_efficiency_model from particle import Particle from b2kstll.models.angular_sympy import angular_func_sympy class AngularPDF(zfit.pdf.BasePDF): """ Generic angular PDF built from input function and corresponding parameters Args: ArbitraryFNC (function: Any arbitrary angular function params (params): parameters of interest in the given function obs (`zfit.Space`): name (str): dtype (tf.DType): """ def __init__(self, obs, params, func, name): super().__init__(obs=obs, params=params, name=name+"PDF") self._user_func = func _N_OBS = 3 VALID_LIMITS = {'costheta_l': (-1, 1), 'costheta_k': (-1, 1), 'phi': (-pi, pi)} def _pdf(self, x, norm_range): costheta_l, costheta_k, phi = z.unstack_x(x) pdf = self._user_func(**self.params, costheta_l=costheta_l, costheta_k=costheta_k, phi=phi) return pdf # Calculate Normalisation def get_normalisation(FL, upper_ctk): kwargs = dict( FL = sympy.Symbol('FL'), AFB = sympy.Symbol('AFB'), S3 = sympy.Symbol('S3'), S4 = sympy.Symbol('S4'), S5 = sympy.Symbol('S5'), S7 = sympy.Symbol('S7'), S8 = sympy.Symbol('S8'), S9 = sympy.Symbol('S9'), costheta_l = sympy.Symbol('costheta_l'), costheta_k = sympy.Symbol('costheta_k'), phi = sympy.Symbol('phi') ) sympy_val = angular_func_sympy(**kwargs) integral = sympy.integrate(sympy_val, (kwargs['costheta_l'], -1, 1), (kwargs['costheta_k'], -1, upper_ctk), (kwargs['phi'], -sympy.pi, sympy.pi),) fun = sympy.lambdify((kwargs["FL"]),integral, 'tensorflow') return fun(FL) UPPER_CTK = 0.85 def PWave(FL, S3, S4, S5, AFB, S7, S8, S9, costheta_l, costheta_k, phi, backend=tf): """Full d4Gamma/dq2dOmega for Bd -> Kst ll (l=e,mu) Angular distribution obtained in the total PDF (using LHCb convention JHEP 02 (2016) 104) i.e. the valid of the angles is given for - phi: [-pi, pi] - theta_K: [-pi, pi] - theta_l: [-pi, pi] The function is normalized over a finite range and therefore a PDF. Args: FL (`zfit.Parameter`): Fraction of longitudinal polarisation of the Kst S3 (`zfit.Parameter`): A_perp^2 - A_para^2 / A_zero^2 + A_para^2 + A_perp^2 (L, R) S4 (`zfit.Parameter`): RE(A_zero*^2 * A_para^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R) S5 (`zfit.Parameter`): RE(A_zero*^2 * A_perp^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R) AFB (`zfit.Parameter`): Forward-backward asymmetry of the di-lepton system (also i.e. 3/4 * S6s) S7 (`zfit.Parameter`): IM(A_zero*^2 * A_para^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R) S8 (`zfit.Parameter`): IM(A_zero*^2 * A_perp^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R) S9 (`zfit.Parameter`): IM(A_perp*^2 * A_para^2) / A_zero^2 + A_para^2 + A_perp^2 (L, R) obs (`zfit.Space`): name (str): dtype (tf.DType): """ sintheta_k = backend.sqrt(1.0 - costheta_k * costheta_k) sintheta_l = backend.sqrt(1.0 - costheta_l * costheta_l) sintheta_2k = (1.0 - costheta_k * costheta_k) sintheta_2l = (1.0 - costheta_l * costheta_l) sin2theta_k = (2.0 * sintheta_k * costheta_k) cos2theta_l = (2.0 * costheta_l * costheta_l - 1.0) sin2theta_l = (2.0 * sintheta_l * costheta_l) pdf = ((3.0 / 4.0) * (1.0 - FL) * sintheta_2k + FL * costheta_k * costheta_k + (1.0 / 4.0) * (1.0 - FL) * sintheta_2k * cos2theta_l + -1.0 * FL * costheta_k * costheta_k * cos2theta_l + S3 * sintheta_2k * sintheta_2l * backend.cos(2.0 * phi) + S4 * sin2theta_k * sin2theta_l * backend.cos(phi) + S5 * sin2theta_k * sintheta_l * backend.cos(phi) + (4.0 / 3.0) * AFB * sintheta_2k * costheta_l + S7 * sin2theta_k * sintheta_l * backend.sin(phi) + S8 * sin2theta_k * sin2theta_l * backend.sin(phi) + S9 * sintheta_2k * sintheta_2l * backend.sin(2.0 * phi)) #return (9.0 / (32.0 * pi)) * pdf return pdf / get_normalisation(FL = FL, upper_ctk = UPPER_CTK) def SWave(costheta_l, costheta_k, phi, backend=tf): """Full d4Gamma/dq2dOmega for Swave part only Angular distribution obtained in the total PDF (using LHCb convention JHEP 02 (2016) 104) i.e. the valid of the angles is given for - phi: [-pi, pi] - theta_K: [-pi, pi] - theta_l: [-pi, pi] The function is normalized over a finite range and therefore a PDF. Args: obs (`zfit.Space`): name (str): dtype (tf.DType): """ sintheta_2l = (1.0 - costheta_l * costheta_l) pdf = (3.0 / (16.0 * pi)) * sintheta_2l return pdf def PandSWaveInterference(Ss1, Ss2, Ss3, Ss4, Ss5, costheta_l, costheta_k, phi, backend=tf): """Full d4Gamma/dq2dOmega for the Real interference term between P and S waves Angular distribution obtained in the total PDF (using LHCb convention JHEP 02 (2016) 104) i.e. the valid of the angles is given for - phi: [-pi, pi] - theta_K: [-pi, pi] - theta_l: [-pi, pi] The function is normalized over a finite range and therefore a PDF. Args: Ss1, Ss2, Ss3, Ss4, Ss5 (`zfit.Parameter`) obs (`zfit.Space`): name (str): dtype (tf.DType): """ sintheta_l = backend.sqrt(1.0 - costheta_l * costheta_l) sintheta_k = backend.sqrt(1.0 - costheta_k * costheta_k) sintheta_2l = (1.0 - costheta_l * costheta_l) sin2theta_l = (2.0 * sintheta_l * costheta_l) pdf = (Ss1 * sintheta_2l * costheta_k + Ss2 * sin2theta_l * sintheta_k * backend.cos(phi) + Ss3 * sintheta_l * sintheta_k * backend.cos(phi) + Ss4 * sintheta_l * sintheta_k * backend.sin(phi) + Ss5 * sin2theta_l * sintheta_k * backend.sin(phi)) return (3.0 / (16.0 * pi)) * pdf class AngularPandSWavePDF(zfit.pdf.BaseFunctor): """Full d4Gamma/dq2dOmega for both spin 0 and 1 for Bd -> Kst ll (l=e,mu) decays Angular distribution obtained in the total PDF (using LHCb convention JHEP 02 (2016) 104) i.e. the valid of the angles is given for - phi: [-pi, pi] - theta_K: [0, pi] - theta_l: [0, pi] The function is normalized over a finite range and therefore a PDF. Args: AngularPWavePDF (`zfit.pdf.ZPDF`: P-wave normalised PDF AngularSWavePDF (`zfit.pdf.ZPDF`: S-wave interference part normalised PDF FS (`zfit.Parameter`): S-wave fraction, i.e. |A^L_S|^2 + |A^R_S|^2/ Sum(|A^{S,R}_{S,zero,para,perp}|^2) obs (`zfit.Space`): name (str): dtype (tf.DType): """ def __init__(self, PWave, SWave, PandSWaveInterference, FS, obs): super().__init__(pdfs=[PWave, SWave, PandSWaveInterference], name="AngularPandSWavePDF", params={'FS': FS}, obs=obs) _N_OBS = 3 VALID_LIMITS = {'costheta_l': (-1, 1), 'costheta_k': (-1, 1), 'phi': (-pi, pi)} def _pdf(self, x, norm_range): FS = self.params['FS'] PWave, SWave, PandSWaveInterference = self.pdfs pdf = (1 - FS) * PWave.pdf(x) + FS * SWave.pdf(x) + PandSWaveInterference.pdf(x) return pdf # A bit of handling class B2Kstll: """Factory class for B->K*ll angular fit""" PDFS = {'PWave': (PWave, ['FL', 'S3', 'S4', 'S5', 'AFB', 'S7', 'S8', 'S9']), 'SWave': (SWave, []), 'PandSWaveInterference' : (PandSWaveInterference, ['Ss1', 'Ss2', 'Ss3', 'Ss4', 'Ss5']), #'PandSWaveInterference5D': (PandSWaveInterference5D, ['Ss1re', 'Ss2re', 'Ss3re', 'Ss4re', 'Ss5re', # 'Ss1im', 'Ss2im', 'Ss3im', 'Ss4im', 'Ss5im']), 'PandSWave' : (AngularPandSWavePDF, ['FS'])} #'PandSWave5D': (AngularPandSWavePDF5D, ['FS'])} KPI = {'PWave': (KpiPWavePDF, ['Kst_892'], [313]), 'SWave': (KpiSWavePDF, ['Kst_1430'], [10311]), 'RePandSWave': (ReKpiPandSWavePDF, ['Kst_892', 'Kst_1430'], [313, 10311]), 'ImPandSWave': (ImKpiPandSWavePDF, ['Kst_892', 'Kst_1430'], [313, 10311])} def __init__(self, cosThetaL, cosThetaK, Phi, q2_min=None, q2_max=None): """Initialize observable names.""" self._obs_names = {'costheta_l': cosThetaL, 'costheta_k': cosThetaK, 'phi': Phi} costheta_l = zfit.Space(cosThetaL, limits=(-1.0, 1.0)) costheta_k = zfit.Space(cosThetaK, limits=(-1.0, UPPER_CTK)) phi = zfit.Space(Phi, limits=(-pi, pi)) self._spaces = costheta_l * costheta_k * phi self.params = {} self.acc = None self.acc_integral = None self.Kpi = None self.q2_bin_acc = None self.q2_min = q2_min self.q2_max = q2_max def set_q2_bin_acceptance(self, q2_bin_acc): """Set the q2 value used to slice the acceptance function for parametrization in 4D""" self.q2_bin_acc = q2_bin_acc def set_acc(self, file_name, q2_bin_acc=None): """Set an acceptance function to the signal PDF.""" # Load coefficients from MoM #coeffs_yaml = load_efficiency_model(file_name) #coeffs = coeffs_yaml.get_coefficients() self.acc,self.accmodel = get_coeffs(file_name) self.set_q2_bin_acceptance(q2_bin_acc) def set_acc_integral(self, file_name, str_order): """Set the integral of the PDF x acceptance.""" # Load integral from a yaml file with open(file_name, 'r') as f: dict_int = yaml.load(f, Loader=yaml.FullLoader) integral = dict_int[0][str_order] self.acc_integral = sympy.sympify(integral) def set_Kpi(self, name): """ Set the Kpi invariant mass system.""" #self._has_Kpi = True self.Kpi = name def get_pdf(self, name): """Get corresponding PDF and paramaters.""" fnc_class, param_names = self.PDFS[name] def get_params(param_list): out = {} for param in param_list: if param not in self.params: if param == 'FL': config = [0.6, 0, 1] elif param == 'FS': config = [0.1, 0, 1] else: config = [0.0, -2, 2] self.params.update({param: zfit.Parameter(param, *config)}) out[param] = self.params[param] return out obs = self._spaces def get_Kpi_pdf(name, model): """Set the corresponding PDF for the Kpi system and parameters.""" pdf_class, param_names, param_id = self.KPI[model] def get_res(param_list, param_id): out = {} for param, idparam in zip(param_names, param_id): if not any([i for i in self.params if param in i.lower()]): m = Particle.from_pdgid(idparam).mass * 1e-03 w = Particle.from_pdgid(idparam).width * 1e-03 self.params.update({param+"_M": zfit.Parameter(param+"_M", m, floating=False)}) self.params.update({param+"_W": zfit.Parameter(param+"_W", w, floating=False)}) out[param+"_M"] = self.params[param+"_M"] out[param+"_W"] = self.params[param+"_W"] return out # Add Kpi dimension mKpi = zfit.Space(name, limits=(0.79555, 0.99555)) self._spaces *= mKpi params = get_res(param_names, param_id) pdf = pdf_class(obs=mKpi, params=params, name=model, q2_bin=(self.q2_min+self.q2_max)/2.) return pdf if name == 'PandSWave': pdf_tot, params_tot = self.PDFS[name] get_params(params_tot) listPDF = dict.fromkeys(['PWave', 'SWave', 'PandSWaveInterference']) for iPDF in listPDF: ifncWave, iparamWave = self.PDFS[iPDF] initParams = get_params(iparamWave) listPDF[iPDF] = AngularPDF(obs=obs, params=initParams, func=ifncWave, name=iPDF) if self.Kpi: pdf_Kpi = get_Kpi_pdf(self.Kpi, iPDF) listPDF[iPDF] *= pdf_Kpi params = self.params pdf = pdf_tot(obs=obs, PWave=listPDF["PWave"], SWave=listPDF["SWave"], PandSWaveInterference=listPDF["PandSWaveInterference"], FS=self.params['FS']) if self.acc is not None: pdf = AccPDF(pdf, self.acc, self.accmodel, obs=obs, dict_names=self._obs_names, q2_slice=self.q2_bin_acc, name=name) else: params = get_params(param_names) pdf = AngularPDF(obs=obs, params=params, func=fnc_class, name=name) if self.acc is not None: pdf = AccPDF(pdf, self.acc, self.accmodel, obs=obs, dict_names=self._obs_names, q2_slice=self.q2_bin_acc, name=name) if self.acc_integral: tf_expr = sympy.lambdify(self.acc_integral.free_symbols, self.acc_integral, 'tensorflow') integral = integral_acceptance(params, tf_expr) pdf.register_analytic_integral(func=integral, limits=zfit.Space(axes=(0,1,2), limits=obs.limits)) if self.Kpi: pdf_Kpi = get_Kpi_pdf(self.Kpi, name) pdf = pdf * pdf_Kpi params = self.params obs = self._spaces return obs, pdf, params