import numpy as np from uncertainties import ufloat import lmfit from lmfit.models import (ConstantModel, ComplexConstantModel, LinearModel, QuadraticModel, PolynomialModel, SineModel, GaussianModel, Gaussian2dModel, LorentzianModel, SplitLorentzianModel, VoigtModel, PseudoVoigtModel, MoffatModel, Pearson7Model, StudentsTModel, BreitWignerModel, LognormalModel, DampedOscillatorModel, ExponentialGaussianModel, SkewedGaussianModel, SkewedVoigtModel, ThermalDistributionModel, DoniachModel, PowerLawModel, ExponentialModel, StepModel, RectangleModel, ExpressionModel, DampedHarmonicOscillatorModel) from lmfit.models import (guess_from_peak, guess_from_peak2d, fwhm_expr, height_expr, update_param_vals) from lmfit.lineshapes import (not_zero, breit_wigner, damped_oscillator, dho, doniach, expgaussian, exponential, gaussian, gaussian2d, linear, lognormal, lorentzian, moffat, parabolic, pearson7, powerlaw, pvoigt, rectangle, sine, skewed_gaussian, skewed_voigt, split_lorentzian, step, students_t, thermal_distribution, tiny, voigt) from lmfit import Model import numpy as np from numpy import (arctan, copysign, cos, exp, isclose, isnan, log, pi, real, sin, sqrt, where) from scipy.special import erf, erfc from scipy.special import gamma as gamfcn from scipy.special import wofz from scipy.optimize import curve_fit from scipy.interpolate import CubicSpline from scipy.ndimage import gaussian_filter import xarray as xr import copy import matplotlib.pyplot as plt log2 = log(2) s2pi = sqrt(2*pi) s2 = sqrt(2.0) def gaussianWithOffset(x, amplitude=1.0, center=0.0, sigma=1.0, offset=0.0): """Return a 1-dimensional Gaussian function with an offset. gaussian(x, amplitude, center, sigma) = (amplitude/(s2pi*sigma)) * exp(-(1.0*x-center)**2 / (2*sigma**2)) """ return ((amplitude/(max(tiny, s2pi*sigma))) * exp(-(1.0*x-center)**2 / max(tiny, (2*sigma**2))) + offset) def lorentzianWithOffset(x, amplitude=1.0, center=0.0, sigma=1.0, offset=0.0): return ((amplitude/(1 + ((1.0*x-center)/max(tiny, sigma))**2)) / max(tiny, (pi*sigma)) + offset) def exponentialWithOffset(x, amplitude=1.0, decay=1.0, offset=0.0): decay = not_zero(decay) return amplitude * exp(-x/decay) + offset def expansion(x, amplitude=1.0, offset=0.0): return np.sqrt(amplitude*x*x + offset) def dampingOscillation(x, center=0, amplitude=1.0, frequency=1.0, decay=1.0, offset=0.0): return amplitude * np.exp(-decay*x)*np.sin(2*np.pi*frequency*(x-center)) + offset def two_gaussian2d(x, y=0.0, A_amplitude=1.0, A_centerx=0.0, A_centery=0.0, A_sigmax=1.0, A_sigmay=1.0, B_amplitude=1.0, B_centerx=0.0, B_centery=0.0, B_sigmax=1.0, B_sigmay=1.0): """Return a 2-dimensional Gaussian function. gaussian2d(x, y, amplitude, centerx, centery, sigmax, sigmay) = amplitude/(2*pi*sigmax*sigmay) * exp(-(x-centerx)**2/(2*sigmax**2) -(y-centery)**2/(2*sigmay**2)) """ z = A_amplitude*(gaussian(x, amplitude=1, center=A_centerx, sigma=A_sigmax) * gaussian(y, amplitude=1, center=A_centery, sigma=A_sigmay)) z += B_amplitude*(gaussian(x, amplitude=1, center=B_centerx, sigma=B_sigmax) * gaussian(y, amplitude=1, center=B_centery, sigma=B_sigmay)) return z def polylog_tab(pow, x, order=100): """Calculationg the polylog sum_i(x^i/i^pow), up to the order-th element of the sum :param pow: power in denominator of sum :type pow: int (can be float) :param x: argument of Polylogarithm :type x: float or 1D numpy array :return: value of polylog(x) :rtype: same as x, float or 1D numpy array """ sum = 0 for k in range(1,order): sum += x ** k /k **pow return sum # Creating array for interpolation x_int = np.linspace(0, 1.00001, 100000) poly_tab = polylog_tab(2,x_int) # Creating function interpolating between the Polylog values calculated before polylog_int = CubicSpline(x_int, poly_tab) def thermal(x, x0, amp, sigma): res = np.exp(-0.5 * (x-x0)**2 / sigma**2) return amp/1.643 * polylog_int(res) def Thomas_Fermi_1d(x, x0, amp, sigma): res = (1- ((x-x0)/sigma)**2) res = np.where(res > 0, res, 0) res = res**(3/2) return amp * res def density_1d(x, x0_bec, x0_th, amp_bec, amp_th, sigma_bec, sigma_th): return thermal(x, x0_th, amp_th, sigma_th) + Thomas_Fermi_1d(x, x0_bec, amp_bec, sigma_bec) def polylog(pow, x): order = 15 sum = 0 for k in range(1,order): sum += x ** k /k **pow return sum def ThomasFermi_2d(x, y=0.0, centerx=0.0, centery=0.0, amplitude=1.0, sigmax=1.0, sigmay=1.0): res = (1- ((x-centerx)/(sigmax))**2 - ((y-centery)/(sigmay))**2) res = np.where(res > 0, res, 0) res = res**(3/2) return amplitude * res def polylog2_2d(x, y=0.0, centerx=0.0, centery=0.0, amplitude=1.0, sigmax=1.0, sigmay=1.0): ## Approximation of the polylog function with 2D gaussian as argument. -> discribes the thermal part of the cloud return amplitude/1.643 * polylog_int(np.exp( -((x-centerx)**2/(2 * sigmax**2))-((y-centery)**2/( 2 * sigmay**2)) )) def density_profile_BEC_2d(x, y=0.0, amp_bec=1.0, amp_th=1.0, x0_bec=0.0, y0_bec=0.0, x0_th=0.0, y0_th=0.0, sigmax_bec=1.0, sigmay_bec=1.0, sigma_th=1.0): return ThomasFermi_2d(x=x, y=y, centerx=x0_bec, centery=y0_bec, amplitude=amp_bec, sigmax=sigmax_bec, sigmay=sigmay_bec ) + polylog2_2d(x=x, y=y, centerx=x0_th, centery=y0_th, amplitude=amp_th, sigmax=sigma_th,sigmay=sigma_th) class GaussianWithOffsetModel(Model): fwhm_factor = 2*np.sqrt(2*np.log(2)) height_factor = 1./np.sqrt(2*np.pi) def __init__(self, independent_vars=['x'], nan_policy='raise', prefix='', name=None, **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(gaussianWithOffset, **kwargs) self._set_paramhints_prefix() def _set_paramhints_prefix(self): self.set_param_hint('sigma', min=0) self.set_param_hint('fwhm', expr=fwhm_expr(self)) self.set_param_hint('height', expr=height_expr(self)) def guess(self, data, x, negative=False, **kwargs): offset = np.min(data) data = data - offset pars = guess_from_peak(self, data, x, negative) pars.add('offset', value=offset) return update_param_vals(pars, self.prefix, **kwargs) class LorentzianWithOffsetModel(Model): fwhm_factor = 2.0 height_factor = 1./np.pi def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(lorentzianWithOffset, **kwargs) self._set_paramhints_prefix() def _set_paramhints_prefix(self): self.set_param_hint('sigma', min=0) self.set_param_hint('fwhm', expr=fwhm_expr(self)) self.set_param_hint('height', expr=height_expr(self)) def guess(self, data, x, negative=False, **kwargs): """Estimate initial model parameter values from data.""" offset = np.min(data) data = data - offset pars = guess_from_peak(self, data, x, negative, ampscale=1.25) pars.add('offset', value=offset) return update_param_vals(pars, self.prefix, **kwargs) class ExponentialWithOffsetModel(Model): def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(exponentialWithOffset, **kwargs) def guess(self, data, x, **kwargs): """Estimate initial model parameter values from data.""" offset = np.min(data) data = data - offset try: sval, oval = np.polyfit(x, np.log(abs(data)+1.e-15), 1) except TypeError: sval, oval = 1., np.log(abs(max(data)+1.e-9)) pars = self.make_params(amplitude=np.exp(oval), decay=-1.0/sval) pars.add('offset', value=offset) return update_param_vals(pars, self.prefix, **kwargs) class ExpansionModel(Model): def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(expansion, **kwargs) def guess(self, data, x, **kwargs): """Estimate initial model parameter values from data.""" popt1, pcov1 = curve_fit(expansion, x, data) pars = self.make_params(amplitude=popt1[0], offset=popt1[1]) return update_param_vals(pars, self.prefix, **kwargs) class DampingOscillationModel(Model): def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(dampingOscillation, **kwargs) def guess(self, data, x, **kwargs): """Estimate initial model parameter values from data.""" try: popt1, pcov1 = curve_fit(dampingOscillation, x, data, np.array(0, 5, 5e2, 1e3, 16)) pars = self.make_params(center=popt1[0], amplitude=popt1[1], frequency=popt1[2], decay=popt1[3], offset=popt1[4]) except: pars = self.make_params(center=0, amplitude=5.0, frequency=5e2, decay=1.0e3, offset=16.0) return update_param_vals(pars, self.prefix, **kwargs) class TwoGaussian2dModel(Model): fwhm_factor = 2*np.sqrt(2*np.log(2)) height_factor = 1./2*np.pi def __init__(self, independent_vars=['x', 'y'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) self.helperModel = Gaussian2dModel() super().__init__(two_gaussian2d, **kwargs) self._set_paramhints_prefix() def _set_paramhints_prefix(self): self.set_param_hint('delta', value=-1, max=0) self.set_param_hint('A_sigmax', expr=f'{self.prefix}delta + {self.prefix}B_sigmax') def guess(self, data, x, y, negative=False, **kwargs): pars_guess = guess_from_peak2d(self.helperModel, data, x, y, negative) pars = self.make_params(A_amplitude=pars_guess['amplitude'].value, A_centerx=pars_guess['centerx'].value, A_centery=pars_guess['centery'].value, A_sigmax=pars_guess['sigmax'].value, A_sigmay=pars_guess['sigmay'].value, B_amplitude=pars_guess['amplitude'].value, B_centerx=pars_guess['centerx'].value, B_centery=pars_guess['centery'].value, B_sigmax=pars_guess['sigmax'].value, B_sigmay=pars_guess['sigmay'].value) pars[f'{self.prefix}A_sigmax'].set(expr=f'delta + {self.prefix}B_sigmax') pars.add(f'{self.prefix}delta', value=-1, max=0, min=-np.inf, vary=True) pars[f'{self.prefix}A_sigmay'].set(min=0.0) pars[f'{self.prefix}B_sigmax'].set(min=0.0) pars[f'{self.prefix}B_sigmay'].set(min=0.0) return pars class Polylog22dModel(Model): fwhm_factor = 2*np.sqrt(2*np.log(2)) height_factor = 1./2*np.pi def __init__(self, independent_vars=['x', 'y'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(polylog2_2d, **kwargs) self._set_paramhints_prefix() def _set_paramhints_prefix(self): self.set_param_hint('Rx', min=0) self.set_param_hint('Ry', min=0) def guess(self, data, x, y, negative=False, **kwargs): """Estimate initial model parameter values from data.""" pars = guess_from_peak2d(self, data, x, y, negative) return update_param_vals(pars, self.prefix, **kwargs) class ThomasFermi2dModel(Model): fwhm_factor = 1 height_factor = 0.5 def __init__(self, independent_vars=['x', 'y'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(ThomasFermi_2d, **kwargs) self._set_paramhints_prefix() def _set_paramhints_prefix(self): self.set_param_hint('Rx', min=0) self.set_param_hint('Ry', min=0) def guess(self, data, x, y, negative=False, **kwargs): """Estimate initial model parameter values from data.""" pars = guess_from_peak2d(self, data, x, y, negative) # amplitude = pars['amplitude'].value # simgax = pars['sigmax'].value # sigmay = pars['sigmay'].value # pars['amplitude'].set(value=amplitude/s2pi/simgax/sigmay) simgax = pars['sigmax'].value sigmay = pars['sigmay'].value pars['simgax'].set(value=simgax / 2.355) pars['sigmay'].set(value=sigmay / 2.355) return update_param_vals(pars, self.prefix, **kwargs) class DensityProfileBEC2dModel(lmfit.Model): """ Fitting class to do 2D bimodal fit on OD of absorption images """ def __init__(self, independent_vars=['x', 'y'], prefix='', nan_policy='raise', atom_n_conv=144, is_debug=False, **kwargs ): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) self.atom_n_conv = atom_n_conv self.is_debug=is_debug super().__init__(density_profile_BEC_2d, **kwargs) self._set_paramhints_prefix() def _set_paramhints_prefix(self): # self.set_param_hint('BEC_sigmax', min=0) self.set_param_hint('amp_bec', min=0) self.set_param_hint('amp_th', min=0) self.set_param_hint('x0_bec', min=0) self.set_param_hint('y0_bec', min=0) self.set_param_hint('x0_th', min=0) self.set_param_hint('y0_th', min=0) self.set_param_hint('sigmax_bec', min=0) self.set_param_hint('sigmay_bec', min=0) self.set_param_hint('sigma_th', min=0) def guess(self, data, x, y, **kwargs): """ Estimate and create initial model parameters for 2d fit :param data: Flattened 2d array, flattened from array with (x,y) --> :param x: :param y: :param kwargs: :return: """ # # global X_guess # global bval_1d x_width = len(np.unique(x)) y_width = len(np.unique(y)) data = np.reshape(data, (y_width, x_width)) data = data.T shape = np.shape(data) cut_width = np.max(shape) thresh = self.calc_thresh(data) center = self.calc_cen(thresh) BEC_width_guess = self.guess_BEC_width(thresh, center) if BEC_width_guess[0] < BEC_width_guess[1]: if self.is_debug: print(f'x smaller y') s_width_ind = 0 X_guess = np.sum(data[:, round(center[1] - BEC_width_guess[1]/2) : round(center[1] + BEC_width_guess[1]/2)], 1) / len(data[0,round(center[1] - BEC_width_guess[1]/2) : round(center[1] + BEC_width_guess[1]/2)]) else: if self.is_debug: print(f'y smaller x') s_width_ind = 1 X_guess = np.sum(data[round(center[0] - BEC_width_guess[0]/2) : round(center[0] + BEC_width_guess[0]/2), :], 0) / len(data[0,round(center[0] - BEC_width_guess[0]/2) : round(center[0] + BEC_width_guess[0]/2)]) if self.is_debug: print(f'center = {center}') print(f'BEC widths: {BEC_width_guess}') plt.plot(X_guess) plt.show() x = np.linspace(0, len(X_guess), len(X_guess)) max_val = np.max(X_guess) fitmodel_1d = lmfit.Model(density_1d, independent_vars=['x']) params_1d = lmfit.Parameters() params_1d.add_many( ('x0_bec', center[s_width_ind], True,0, 200), ('x0_th',center[s_width_ind], True,0, 200), ('amp_bec', 0.7 * max_val, True, 0, 1.3 * max_val), ('amp_th', 0.3 * max_val, True, 0, 1.3 * max_val), ('deltax', 3*BEC_width_guess[s_width_ind], True, 0,cut_width), # ('sigma_bec',BEC_width_guess[i,j,0]/1.22, True, 0, 50) ('sigma_bec',BEC_width_guess[s_width_ind]/1.22, True, 0, 50) ) params_1d.add('sigma_th', 3*BEC_width_guess[0], min=0, expr=f'0.632*sigma_bec + 0.518*deltax') res_1d = fitmodel_1d.fit(X_guess, x=x, params=params_1d) if self.is_debug: params_1d.pretty_print() self.print_bval(res_1d) bval_1d = res_1d.best_values amp_conv_1d_2d = np.max(gaussian_filter(data, sigma=1)) / (bval_1d['amp_bec'] + bval_1d['amp_th']) max_val = np.max(data) params = self.make_params() if bval_1d['amp_th']/bval_1d['amp_bec'] > 3: print(f'Image seems to be purely thermal (guessed from 1d fit amplitude)') params[f'{self.prefix}amp_bec'].set(value=0, vary=False) params[f'{self.prefix}amp_th'].set(value=amp_conv_1d_2d * bval_1d['amp_th'], max=1.3 * max_val, vary=True) params[f'{self.prefix}x0_bec'].set(value=1, vary=False) params[f'{self.prefix}y0_bec'].set(value=1, vary=False) params[f'{self.prefix}x0_th'].set(value=center[0], min=center[0] -10, max=center[0] + 10, vary=True) params[f'{self.prefix}y0_th'].set(value=center[1], min=center[1] -10, max=center[1] + 10, vary=True) params[f'{self.prefix}sigmax_bec'].set(value=1, vary=False) params[f'{self.prefix}sigmay_bec'].set(value=1, vary=False) params[f'{self.prefix}sigma_th'].set(value=bval_1d['sigma_th'], max=cut_width, vary=True) elif bval_1d['amp_bec']/bval_1d['amp_th'] > 10: print('Image seems to be pure BEC (guessed from 1d fit amplitude)') params[f'{self.prefix}amp_bec'].set(value=amp_conv_1d_2d * bval_1d['amp_bec'], max=1.3 * max_val, vary=True) params[f'{self.prefix}amp_th'].set(value=0, vary=False) params[f'{self.prefix}x0_bec'].set(value=center[0], min=center[0] -10, max=center[0] + 10, vary=True) params[f'{self.prefix}y0_bec'].set(value=center[1], min=center[1] -10, max=center[1] + 10, vary=True) params[f'{self.prefix}x0_th'].set(value=1, vary=False) params[f'{self.prefix}y0_th'].set(value=1, vary=False) params[f'{self.prefix}sigma_th'].set(value=1, vary=False) if s_width_ind == 0: params[f'{self.prefix}sigmax_bec'].set(value=bval_1d['sigma_bec'], max= 2*BEC_width_guess[0], vary=True) params[f'{self.prefix}sigmay_bec'].set(value=BEC_width_guess[1]/1.22, max= 2*BEC_width_guess[1], vary=True) elif s_width_ind == 1: params[f'{self.prefix}sigmax_bec'].set(value=BEC_width_guess[0]/1.22, max= 2*BEC_width_guess[0], vary=True) params[f'{self.prefix}sigmay_bec'].set(value=bval_1d['sigma_bec'], max= 2*BEC_width_guess[1], vary=True) else: print('Error in small width BEC recogintion, s_width_ind should be 0 or 1') else: params[f'{self.prefix}amp_bec'].set(value=amp_conv_1d_2d * bval_1d['amp_bec'], max=1.3 * max_val, vary=True) params[f'{self.prefix}amp_th'].set(value=amp_conv_1d_2d * bval_1d['amp_th'], max=1.3 * max_val, vary=True) params[f'{self.prefix}x0_bec'].set(value=center[0], min=center[0] -10, max=center[0] + 10, vary=True) params[f'{self.prefix}y0_bec'].set(value=center[1], min=center[1] -10, max=center[1] + 10, vary=True) params[f'{self.prefix}x0_th'].set(value=center[0], min=center[0] -10, max=center[0] + 10, vary=True) params[f'{self.prefix}y0_th'].set(value=center[1], min=center[1] -10, max=center[1] + 10, vary=True) params[f'{self.prefix}sigma_th'].set(value=bval_1d['sigma_th'], max=cut_width, vary=True) if s_width_ind == 0: params[f'{self.prefix}sigmax_bec'].set(value=bval_1d['sigma_bec'], max= 2*BEC_width_guess[0], vary=True) params[f'{self.prefix}sigmay_bec'].set(value=BEC_width_guess[1]/1.22, max= 2*BEC_width_guess[1], vary=True) elif s_width_ind == 1: params[f'{self.prefix}sigmax_bec'].set(value=BEC_width_guess[0]/1.22, max= 2*BEC_width_guess[0], vary=True) params[f'{self.prefix}sigmay_bec'].set(value=bval_1d['sigma_bec'], max= 2*BEC_width_guess[1], vary=True) else: print('Error in small width BEC recogintion, s_width_ind should be 0 or 1') if self.is_debug: print('') print('Init Params') params.pretty_print() return lmfit.models.update_param_vals(params, self.prefix, **kwargs) def fit(self, data, **kwargs): data_1d = data res = super().fit(data_1d, **kwargs) if self.is_debug: print('bval first fit') self.print_bval(res) if res.params['amp_bec'].vary and res.params['amp_th'].vary: bval = res.best_values sigma_cut = max(bval['sigmay_bec'], bval['sigmax_bec']) tf_fit = ThomasFermi_2d(kwargs['x'],kwargs['y'],centerx=bval['x0_bec'], centery=bval['y0_bec'], amplitude=bval['amp_bec'], sigmax=bval['sigmax_bec'], sigmay=bval['sigmay_bec']) tf_fit_2 = ThomasFermi_2d(kwargs['x'],kwargs['y'],centerx=bval['x0_bec'], centery=bval['y0_bec'], amplitude=bval['amp_bec'], sigmax=1.5 * sigma_cut, sigmay=1.5*sigma_cut) mask = np.where(tf_fit > 0, np.nan, data_1d) #mask[i,j] = gaussian_filter(mask[i,j], sigma = 0.4) mask = np.where(tf_fit_2 > 0, mask, np.nan) N_c = np.nansum(mask) # conversion N_count to Pixels N_a = self.atom_n_conv * N_c if N_a < 6615: print('No thermal part detected, performing fit without thermal function') params = res.params params[f'{self.prefix}amp_th'].set(value=0, vary=False) params[f'{self.prefix}x0_th'].set(value=1, vary=False) params[f'{self.prefix}y0_th'].set(value=1, vary=False) params[f'{self.prefix}sigma_th'].set(value=1, vary=False) res = super().fit(data_1d, x=kwargs['x'], y=kwargs['y'], params=params) return res return res def calc_thresh(self, data, thresh_val=0.3, sigma=0.4): """Returns thresholded binary image after blurring to guess BEC size :param data: 2d image or 1D or 2D array containing 2d images :type data: 2d, 3d or 4d numpy array :param thresh_val: relative threshhold value for binarization with respect to maximum of blurred image :param sigma: sigma of gaussian blur filter (see scipy.ndimage.gaussian_filter :return: binary 2d image or 1D or 2D array containing 2d images :rtype: 2d, 3d or 4d numpy array """ shape = np.shape(data) thresh = np.zeros(shape) blurred = gaussian_filter(data, sigma=sigma) if len(shape) == 4: for i in range(0,shape[0]): for j in range(0, shape[1]): thresh[i,j] = np.where(blurred[i,j] < np.max(blurred[i,j])*thresh_val, 0, 1) elif len(shape) == 3: for i in range(0,shape[0]): thresh[i] = np.where(blurred[i] < np.max(blurred[i])*thresh_val, 0, 1) elif len(shape) == 2: thresh = np.where(blurred < np.max(blurred)*thresh_val, 0, 1) else: print("Shape of data is wrong, output is empty") return thresh def calc_cen(self, thresh1): """ returns array: [X_center,Y_center] """ cen = np.zeros(2) (Y,X) = np.shape(thresh1) thresh1 = thresh1 /np.sum(thresh1) # marginal distributions dx = np.sum(thresh1, 0) dy = np.sum(thresh1, 1) # expected values cen[0] = np.sum(dx * np.arange(X)) cen[1] = np.sum(dy * np.arange(Y)) return cen def guess_BEC_width(self, thresh, center): """ returns width of thresholded area along both axis through the center with shape of thresh and [X_width, Y_width] for each image """ shape = np.shape(thresh) if len(shape) == 2: BEC_width_guess = np.array([np.sum(thresh[round(center[1]), :]), np.sum(thresh[:, round(center[0])]) ]) elif len(shape) == 3: BEC_width_guess = np.zeros((shape[0], 2)) for i in range(0, shape[0]): BEC_width_guess[i, 0] = np.sum(thresh[i, round(center[i,j,1]), :]) BEC_width_guess[i, 1] = np.sum(thresh[i, :, round(center[i,j,0])]) elif len(shape) == 4: BEC_width_guess = np.zeros((shape[0], shape[1], 2)) for i in range(0, shape[0]): for j in range(0, shape[1]): BEC_width_guess[i, j, 0] = np.sum(thresh[i, j, round(center[i,j,1]), :]) BEC_width_guess[i, j, 1] = np.sum(thresh[i, j, :, round(center[i,j,0])]) else: print("Shape of data is wrong, output is empty") return BEC_width_guess def cond_frac(self, results): """Returns condensate fraction""" bval = results.best_values tf_fit = ThomasFermi_2d(X, Y, centerx=bval['x0_bec'], centery=bval['y0_bec'], amplitude=bval['amp_bec'], sigmax=bval['sigmax_bec'], sigmay=bval['sigmay_bec']) N_bec = np.sum(tf_fit) fit = density_profile_BEC_2d(X, Y, **bval) N_ges = np.sum(fit) return N_bec/N_ges def return_atom_number(self, result, X, Y, is_print=True): """Printing fitted atom number in bec + thermal state""" bval = result.best_values tf_fit = ThomasFermi_2d(X,Y,centerx=bval['x0_bec'], centery=bval['y0_bec'], amplitude=bval['amp_bec'], sigmax=bval['sigmax_bec'], sigmay=bval['sigmay_bec']) N_bec = self.atom_n_conv * np.sum(tf_fit) th_fit = polylog2_2d(X, Y, centerx=bval['x0_th'], centery=bval['y0_th'], amplitude=bval['amp_th'], sigmax=bval['sigma_th'], sigmay=bval['sigma_th']) N_th = self.atom_n_conv * np.sum(th_fit) # fit = density_profile_BEC_2d(X,Y, **bval) # N_ges = self.atom_n_conv * np.sum(fit) if is_print: print() print('Atom numbers:') print(f' N_bec: {N_bec :.0f}') print(f' N_th: {N_th :.0f}') print(f' N_ges: {N_bec + N_th :.0f}') print(f' Cond. frac: {N_bec/(N_bec + N_th):.2f}') print('') def return_temperature(self, result, omg, tof, is_print=True, eff_pix=2.472e-6): """Returns temperature of thermal cloud""" R_th = result.best_values['sigma_th'] * eff_pix * np.sqrt(2) print(R_th) T = R_th**2 * 164*const.u/const.k * (1/omg**2 + tof**2)**(-1) if is_print: print(f'Temperature: {T*1e9:.2f} nK') return T def print_bval(self, res_s): """nicely print best fitted values + init values + bounds """ keys = res_s.best_values.keys() bval = res_s.best_values init = res_s.init_params for item in keys: print(f'{item}: {bval[item]:.3f}, (init = {init[item].value:.3f}), bounds = [{init[item].min:.2f} : {init[item].max :.2f}] ') print('') class NewFitModel(Model): def __init__(self, func, independent_vars=['x'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(func, **kwargs) def guess(self, *args, **kwargs): return self.make_params() lmfit_models = {'Constant': ConstantModel, 'Complex Constant': ComplexConstantModel, 'Linear': LinearModel, 'Quadratic': QuadraticModel, 'Polynomial': PolynomialModel, 'Gaussian': GaussianModel, 'Gaussian-2D': Gaussian2dModel, 'Lorentzian': LorentzianModel, 'Split-Lorentzian': SplitLorentzianModel, 'Voigt': VoigtModel, 'PseudoVoigt': PseudoVoigtModel, 'Moffat': MoffatModel, 'Pearson7': Pearson7Model, 'StudentsT': StudentsTModel, 'Breit-Wigner': BreitWignerModel, 'Log-Normal': LognormalModel, 'Damped Oscillator': DampedOscillatorModel, 'Damped Harmonic Oscillator': DampedHarmonicOscillatorModel, 'Exponential Gaussian': ExponentialGaussianModel, 'Skewed Gaussian': SkewedGaussianModel, 'Skewed Voigt': SkewedVoigtModel, 'Thermal Distribution': ThermalDistributionModel, 'Doniach': DoniachModel, 'Power Law': PowerLawModel, 'Exponential': ExponentialModel, 'Step': StepModel, 'Rectangle': RectangleModel, 'Expression': ExpressionModel, 'Gaussian With Offset':GaussianWithOffsetModel, 'Lorentzian With Offset':LorentzianWithOffsetModel, 'Expansion':ExpansionModel, 'Damping Oscillation Model':DampingOscillationModel, 'Two Gaussian-2D':TwoGaussian2dModel, 'Thomas Fermi-2D': ThomasFermi2dModel, 'Density Profile of BEC-2D': DensityProfileBEC2dModel, 'Polylog2-2D': polylog2_2d, } class FitAnalyser(): """This is a class integrated all the functions to do a fit. """ def __init__(self, fitModel, fitDim=1, **kwargs) -> None: """Initialize function :param fitModel: The fitting model of fit function :type fitModel: lmfit Model :param fitDim: The dimension of the fit, defaults to 1 :type fitDim: int, optional """ if isinstance(fitModel, str): self.fitModel = lmfit_models[fitModel](**kwargs) else: self.fitModel = fitModel self.fitDim = fitDim def print_params_set_template(self, params=None): """Print a template to manually set the initial parameters of the fit :param params: An object of Parameters class to print, defaults to None :type params: lmfit Paraemters, optional """ if params is None: params = self.fitModel.make_params() for key in params: res = "params.add(" res += "name=\"" + key + "\", " if not params[key].expr is None: res += "expr=\"" + params[key].expr +"\")" else: res += "value=" + f'{params[key].value:3g}' + ", " if str(params[key].max)=="inf": res += "max=np.inf, " else: res += "max=" + f'{params[key].max:3g}' + ", " if str(params[key].min)=="-inf": res += "min=-np.inf, " else: res += "min=" + f'{params[key].min:3g}' + ", " res += "vary=" + str(params[key].vary) + ")" print(res) def _guess_1D(self, data, x, **kwargs): """Call the guess function of the 1D fit model to guess the initial value. :param data: The data to fit :type data: 1D numpy array :param x: The data of x axis :type x: 1D numpy array :return: The guessed initial parameters for the fit :rtype: lmfit Parameters """ return self.fitModel.guess(data=data, x=x, **kwargs) def _guess_2D(self, data, x, y, **kwargs): """Call the guess function of the 2D fit model to guess the initial value. :param data: The flattened data to fit :type data: 1D numpy array :param x: The flattened data of x axis :type x: 1D numpy array :param y: The flattened data of y axis :type y: 1D numpy array :return: The guessed initial parameters for the fit :rtype: lmfit Parameters """ data = data.flatten(order='F') return self.fitModel.guess(data=data, x=x, y=y, **kwargs) def guess(self, dataArray, x=None, y=None, guess_kwargs={}, input_core_dims=None, dask='parallelized', vectorize=True, keep_attrs=True, daskKwargs=None, **kwargs): """Call the guess function of the fit model to guess the initial value. :param dataArray: The data for the fit :type dataArray: xarray DataArray :param x: The name of x axis in data or the data of x axis, defaults to None :type x: str or numpy array, optional :param y: The name of y axis in data or the data of y axis, defaults to None :type y: str or numpy array, optional :param guess_kwargs: the keyworded arguments to send to the guess function, defaults to {} :type guess_kwargs: dict, optional :param input_core_dims: over write of the same argument in xarray.apply_ufunc, defaults to None :type input_core_dims: list or array like, optional :param dask: over write of the same argument in xarray.apply_ufunc,, defaults to 'parallelized' :type dask: str, optional :param vectorize: over write of the same argument in xarray.apply_ufunc, defaults to True :type vectorize: bool, optional :param keep_attrs: over write of the same argument in xarray.apply_ufunc, defaults to True :type keep_attrs: bool, optional :param daskKwargs: over write of the same argument in xarray.apply_ufunc, defaults to None :type daskKwargs: dict, optional :return: The guessed initial parameters for the fit :rtype: xarray DataArray """ kwargs.update( { "dask": dask, "vectorize": vectorize, "input_core_dims": input_core_dims, 'keep_attrs': keep_attrs, } ) if not daskKwargs is None: kwargs.update({"dask_gufunc_kwargs": daskKwargs}) if input_core_dims is None: kwargs.update( { "input_core_dims": [['x']], } ) if x is None: if 'x' in dataArray.dims: x = dataArray['x'].to_numpy() else: if isinstance(x, str): if input_core_dims is None: kwargs.update( { "input_core_dims": [[x]], } ) x = dataArray[x].to_numpy() if self.fitDim == 1: guess_kwargs.update( { 'x':x, } ) return xr.apply_ufunc(self._guess_1D, dataArray, kwargs=guess_kwargs, output_dtypes=[type(self.fitModel.make_params())], **kwargs ) if self.fitDim == 2: if y is None: if 'y' in dataArray.dims: y = dataArray['y'].to_numpy() if input_core_dims is None: kwargs.update( { "input_core_dims": [['x', 'y']], } ) else: if isinstance(y, str): kwargs["input_core_dims"][0] = np.append(kwargs["input_core_dims"][0], y) y = dataArray[y].to_numpy() elif input_core_dims is None: kwargs.update( { "input_core_dims": [['x', 'y']], } ) _x, _y = np.meshgrid(x, y) _x = _x.flatten() _y = _y.flatten() # dataArray = dataArray.stack(_z=(kwargs["input_core_dims"][0][0], kwargs["input_core_dims"][0][1])) # kwargs["input_core_dims"][0] = ['_z'] guess_kwargs.update( { 'x':_x, 'y':_y, } ) return xr.apply_ufunc(self._guess_2D, dataArray, kwargs=guess_kwargs, output_dtypes=[type(self.fitModel.make_params())], **kwargs ) def _fit_1D(self, data, params, x): """Call the fit function of the 1D fit model to do the fit. :param data: The data to fit :type data: 1D numpy array :param params: The initial paramters of the fit :type params: lmfit Parameters :param x: The data of x axis :type x: 1D numpy array :return: The result of the fit :rtype: lmfit FitResult """ return self.fitModel.fit(data=data, x=x, params=params, nan_policy='omit') def _fit_2D(self, data, params, x, y): """Call the fit function of the 2D fit model to do the fit. :param data: The flattened data to fit :type data: 1D numpy array :param params: The flattened initial paramters of the fit :type params: lmfit Parameters :param x: The flattened data of x axis :type x: 1D numpy array :param y: The flattened data of y axis :type y: 1D numpy array :return: The result of the fit :rtype: lmfit FitResult """ data = data.flatten(order='F') return self.fitModel.fit(data=data, x=x, y=y, params=params, nan_policy='omit') def fit(self, dataArray, paramsArray, x=None, y=None, input_core_dims=None, dask='parallelized', vectorize=True, keep_attrs=True, daskKwargs=None, **kwargs): """Call the fit function of the fit model to do the fit. :param dataArray: The data for the fit :type dataArray: xarray DataArray :param paramsArray: The flattened initial paramters of the fit :type paramsArray: xarray DataArray or lmfit Parameters :param x: The name of x axis in data or the data of x axis, defaults to None :type x: str or numpy array, optional :param y: The name of y axis in data or the data of y axis, defaults to None :type y: str or numpy array, optional :param input_core_dims: over write of the same argument in xarray.apply_ufunc, defaults to None, defaults to None :type input_core_dims: list or array like, optional :param dask: over write of the same argument in xarray.apply_ufunc, defaults to None, defaults to 'parallelized' :type dask: str, optional :param vectorize: over write of the same argument in xarray.apply_ufunc, defaults to None, defaults to True :type vectorize: bool, optional :param keep_attrs: over write of the same argument in xarray.apply_ufunc, defaults to None, defaults to True :type keep_attrs: bool, optional :param daskKwargs: over write of the same argument in xarray.apply_ufunc, defaults to None, defaults to None :type daskKwargs: dict, optional :return: The fit result :rtype: xarray DataArray """ kwargs.update( { "dask": dask, "vectorize": vectorize, "input_core_dims": input_core_dims, 'keep_attrs': keep_attrs, } ) if not daskKwargs is None: kwargs.update({"dask_gufunc_kwargs": daskKwargs}) if isinstance(paramsArray, type(self.fitModel.make_params())): if input_core_dims is None: kwargs.update( { "input_core_dims": [['x']], } ) if x is None: if 'x' in dataArray.dims: x = dataArray['x'].to_numpy() else: if isinstance(x, str): if input_core_dims is None: kwargs.update( { "input_core_dims": [[x]], } ) x = dataArray[x].to_numpy() if self.fitDim == 1: res = xr.apply_ufunc(self._fit_1D, dataArray, kwargs={'params':paramsArray,'x':x}, output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))], **kwargs) if keep_attrs: res.attrs = copy.copy(dataArray.attrs) return res if self.fitDim == 2: if y is None: if 'y' in dataArray.dims: y = dataArray['y'].to_numpy() if input_core_dims is None: kwargs.update( { "input_core_dims": [['x', 'y']], } ) else: if isinstance(y, str): kwargs["input_core_dims"][0] = np.append(kwargs["input_core_dims"][0], y) y = dataArray[y].to_numpy() elif input_core_dims is None: kwargs.update( { "input_core_dims": [['x', 'y']], } ) _x, _y = np.meshgrid(x, y) _x = _x.flatten() _y = _y.flatten() # dataArray = dataArray.stack(_z=(kwargs["input_core_dims"][0][0], kwargs["input_core_dims"][0][1])) # kwargs["input_core_dims"][0] = ['_z'] res = xr.apply_ufunc(self._fit_2D, dataArray, kwargs={'params':paramsArray,'x':_x, 'y':_y}, output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))], **kwargs) if keep_attrs: res.attrs = copy.copy(dataArray.attrs) return res else: if input_core_dims is None: kwargs.update( { "input_core_dims": [['x'], []], } ) if x is None: if 'x' in dataArray.dims: x = dataArray['x'].to_numpy() else: if isinstance(x, str): if input_core_dims is None: kwargs.update( { "input_core_dims": [[x], []], } ) x = dataArray[x].to_numpy() if self.fitDim == 1: return xr.apply_ufunc(self._fit_1D, dataArray, paramsArray, kwargs={'x':x}, output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))], **kwargs) if self.fitDim == 2: if input_core_dims is None: kwargs.update( { "input_core_dims": [['x', 'y'], []], } ) if y is None: if 'y' in dataArray.dims: y = dataArray['y'].to_numpy() else: if isinstance(y, str): y = dataArray[y].to_numpy() kwargs["input_core_dims"][0] = np.append(kwargs["input_core_dims"][0], y) _x, _y = np.meshgrid(x, y) _x = _x.flatten() _y = _y.flatten() # dataArray = dataArray.stack(_z=(kwargs["input_core_dims"][0][0], kwargs["input_core_dims"][0][1])) # kwargs["input_core_dims"][0] = ['_z'] return xr.apply_ufunc(self._fit_2D, dataArray, paramsArray, kwargs={'x':_x, 'y':_y}, output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))], **kwargs) def _eval_1D(self, fitResult, x): """Call the eval function of the 1D fit model to calculate the curve. :param fitResult: The result of fit :type fitResult: lmfit FitResult :param x: The data of x axis :type x: 1D numpy array :return: The curve based on the fit result :rtype: 1D numpy array """ return self.fitModel.eval(x=x, **fitResult.best_values) def _eval_2D(self, fitResult, x, y, shape): """Call the eval function of the 2D fit model to calculate the curve. :param fitResult: The result of fit :type fitResult: lmfit FitResult :param x: The data of x axis :type x: 1D numpy array :param y: The flattened data of y axis :type y: 1D numpy array :param shape: The desired shape for output :type shape: list, tuple or array like :return: The curve based on the fit result :rtype: 2D numpy array """ res = self.fitModel.eval(x=x, y=y, **fitResult.best_values) return res.reshape(shape, order='F') def eval(self, fitResultArray, x=None, y=None, output_core_dims=None, prefix="", dask='parallelized', vectorize=True, daskKwargs=None, **kwargs): """Call the eval function of the fit model to calculate the curve. :param fitResultArray: The result of fit :type fitResultArray: xarray DataArray :param x: The name of x axis in data or the data of x axis, defaults to None :type x: str or numpy array, optional :param y: The name of y axis in data or the data of y axis, defaults to None :type y: str or numpy array, optional :param output_core_dims: over write of the same argument in xarray.apply_ufunc, defaults to None :type output_core_dims: list, optional :param prefix: prefix str of the fit model, defaults to "" :type prefix: str, optional :param dask: over write of the same argument in xarray.apply_ufunc, defaults to 'parallelized' :type dask: str, optional :param vectorize: over write of the same argument in xarray.apply_ufunc, defaults to True :type vectorize: bool, optional :param daskKwargs: over write of the same argument in xarray.apply_ufunc, defaults to None :type daskKwargs: dict, optional :return: The curve based on the fit result :rtype: xarray """ kwargs.update( { "dask": dask, "vectorize": vectorize, "output_core_dims": output_core_dims, } ) if daskKwargs is None: daskKwargs = {} if self.fitDim == 1: if output_core_dims is None: kwargs.update( { "output_core_dims": prefix+'x', } ) output_core_dims = [prefix+'x'] daskKwargs.update( { 'output_sizes': { output_core_dims[0]: np.size(x), }, 'meta': np.ndarray((0,0), dtype=float) } ) kwargs.update( { "dask_gufunc_kwargs": daskKwargs, } ) res = xr.apply_ufunc(self._eval_1D, fitResultArray, kwargs={"x":x}, **kwargs) return res.assign_coords({prefix+'x':np.array(x)}) if self.fitDim == 2: if output_core_dims is None: kwargs.update( { "output_core_dims": [[prefix+'x', prefix+'y']], } ) output_core_dims = [prefix+'x', prefix+'y'] daskKwargs.update( { 'output_sizes': { output_core_dims[0]: np.size(x), output_core_dims[1]: np.size(y), }, 'meta': np.ndarray((0,0), dtype=float) }, ) kwargs.update( { "dask_gufunc_kwargs": daskKwargs, } ) _x, _y = np.meshgrid(x, y) _x = _x.flatten() _y = _y.flatten() res = xr.apply_ufunc(self._eval_2D, fitResultArray, kwargs={"x":_x, "y":_y, "shape":(len(x), len(y))}, **kwargs) return res.assign_coords({prefix+'x':np.array(x), prefix+'y':np.array(y)}) def _get_fit_value_single(self, fitResult, key): """get value of one single parameter from fit result :param fitResult: The result of the fit :type fitResult: lmfit FitResult :param key: The name of the parameter :type key: str :return: The vaule of the parameter :rtype: float """ return fitResult.params[key].value def _get_fit_value(self, fitResult, params): """get value of parameters from fit result :param fitResult: The result of the fit :type fitResult: lmfit FitResult :param params: A list of the name of paramerters to read :type params: list or array like :return: The vaule of the parameter :rtype: 1D numpy array """ func = np.vectorize(self._get_fit_value_single) res = tuple( func(fitResult, key) for key in params ) return res def get_fit_value(self, fitResult, dask='parallelized', **kwargs): """get value of parameters from fit result :param fitResult: The result of the fit :type fitResult: lmfit FitResult :param dask: over write of the same argument in xarray.apply_ufunc, defaults to 'parallelized' :type dask: str, optional :return: The vaule of the parameter :rtype: xarray DataArray """ firstIndex = { key: fitResult[key][0] for key in fitResult.dims } firstFitResult = fitResult.sel(firstIndex).item() params = list(firstFitResult.params.keys()) output_core_dims=[ [] for _ in range(len(params))] kwargs.update( { "dask": dask, "output_core_dims": output_core_dims, } ) value = xr.apply_ufunc(self._get_fit_value, fitResult, kwargs=dict(params=params), **kwargs) value = xr.Dataset( data_vars={ params[i]: value[i] for i in range(len(params)) }, attrs=fitResult.attrs ) return value def _get_fit_std_single(self, fitResult, key): """get standard deviation of one single parameter from fit result :param fitResult: The result of the fit :type fitResult: lmfit FitResult :param key: The name of the parameter :type key: str :return: The vaule of the parameter :rtype: float """ return fitResult.params[key].stderr def _get_fit_std(self, fitResult, params): """get standard deviation of parameters from fit result :param fitResult: The result of the fit :type fitResult: lmfit FitResult :param params: A list of the name of paramerters to read :type params: list or array like :return: The vaule of the parameter :rtype: 1D numpy array """ func = np.vectorize(self._get_fit_std_single) res = tuple( func(fitResult, key) for key in params ) return res def get_fit_std(self, fitResult, dask='parallelized', **kwargs): """get standard deviation of parameters from fit result :param fitResult: The result of the fit :type fitResult: lmfit FitResult :param dask: over write of the same argument in xarray.apply_ufunc, defaults to 'parallelized' :type dask: str, optional :return: The vaule of the parameter :rtype: xarray DataArray """ firstIndex = { key: fitResult[key][0] for key in fitResult.dims } firstFitResult = fitResult.sel(firstIndex).item() params = list(firstFitResult.params.keys()) output_core_dims=[ [] for _ in range(len(params))] kwargs.update( { "dask": dask, "output_core_dims": output_core_dims, } ) value = xr.apply_ufunc(self._get_fit_std, fitResult, kwargs=dict(params=params), **kwargs) value = xr.Dataset( data_vars={ params[i]: value[i] for i in range(len(params)) }, attrs=fitResult.attrs ) return value def _get_fit_full_result_single(self, fitResult, key): """get the full result of one single parameter from fit result :param fitResult: The result of the fit :type fitResult: lmfit FitResult :param key: The name of the parameter :type key: str :return: The vaule of the parameter :rtype: float """ if not fitResult.params[key].value is None: value = fitResult.params[key].value else: value = np.nan if not fitResult.params[key].stderr is None: std = fitResult.params[key].stderr else: std = np.nan return ufloat(value, std) def _get_fit_full_result(self, fitResult, params): """get the full result of parameters from fit result :param fitResult: The result of the fit :type fitResult: lmfit FitResult :param params: A list of the name of paramerters to read :type params: list or array like :return: The vaule of the parameter :rtype: 1D numpy array """ func = np.vectorize(self._get_fit_full_result_single) res = tuple( func(fitResult, key) for key in params ) return res def get_fit_full_result(self, fitResult, dask='parallelized', **kwargs): """get the full result of parameters from fit result :param fitResult: The result of the fit :type fitResult: lmfit FitResult :param dask: over write of the same argument in xarray.apply_ufunc, defaults to 'parallelized' :type dask: str, optional :return: The vaule of the parameter :rtype: xarray DataArray """ firstIndex = { key: fitResult[key][0] for key in fitResult.dims } firstFitResult = fitResult.sel(firstIndex).item() params = list(firstFitResult.params.keys()) output_core_dims=[ [] for _ in range(len(params))] kwargs.update( { "dask": dask, "output_core_dims": output_core_dims, } ) value = xr.apply_ufunc(self._get_fit_full_result, fitResult, kwargs=dict(params=params), **kwargs) value = xr.Dataset( data_vars={ params[i]: value[i] for i in range(len(params)) }, attrs=fitResult.attrs ) return value