import numpy as np from uncertainties import ufloat import lmfit from lmfit.models import (ConstantModel, ComplexConstantModel, LinearModel, QuadraticModel, PolynomialModel, SineModel, GaussianModel, Gaussian2dModel, LorentzianModel, SplitLorentzianModel, VoigtModel, PseudoVoigtModel, MoffatModel, Pearson7Model, StudentsTModel, BreitWignerModel, LognormalModel, DampedOscillatorModel, ExponentialGaussianModel, SkewedGaussianModel, SkewedVoigtModel, ThermalDistributionModel, DoniachModel, PowerLawModel, ExponentialModel, StepModel, RectangleModel, ExpressionModel, DampedHarmonicOscillatorModel) from lmfit.models import (guess_from_peak, guess_from_peak2d, fwhm_expr, height_expr, update_param_vals) from lmfit.lineshapes import (not_zero, breit_wigner, damped_oscillator, dho, doniach, expgaussian, exponential, gaussian, gaussian2d, linear, lognormal, lorentzian, moffat, parabolic, pearson7, powerlaw, pvoigt, rectangle, sine, skewed_gaussian, skewed_voigt, split_lorentzian, step, students_t, thermal_distribution, tiny, voigt) from lmfit import Model import numpy as np from numpy import (arctan, copysign, cos, exp, isclose, isnan, log, pi, real, sin, sqrt, where) from scipy.special import erf, erfc from scipy.special import gamma as gamfcn from scipy.special import wofz from scipy.optimize import curve_fit import xarray as xr log2 = log(2) s2pi = sqrt(2*pi) s2 = sqrt(2.0) def gaussianWithOffset(x, amplitude=1.0, center=0.0, sigma=1.0, offset=0.0): """Return a 1-dimensional Gaussian function with an offset. gaussian(x, amplitude, center, sigma) = (amplitude/(s2pi*sigma)) * exp(-(1.0*x-center)**2 / (2*sigma**2)) """ return ((amplitude/(max(tiny, s2pi*sigma))) * exp(-(1.0*x-center)**2 / max(tiny, (2*sigma**2))) + offset) def lorentzianWithOffset(x, amplitude=1.0, center=0.0, sigma=1.0, offset=0.0): return ((amplitude/(1 + ((1.0*x-center)/max(tiny, sigma))**2)) / max(tiny, (pi*sigma)) + offset) def exponentialWithOffset(x, amplitude=1.0, decay=1.0, offset=0.0): decay = not_zero(decay) return amplitude * exp(-x/decay) + offset def expansion(x, amplitude=1.0, offset=0.0): return np.sqrt(amplitude*x*x + offset) def dampingOscillation(x, center=0, amplitude=1.0, frequency=1.0, decay=1.0, offset=0.0): return amplitude * np.exp(-decay*x)*np.sin(2*np.pi*frequency*(x-center)) + offset def two_gaussian2d(x, y=0.0, A_amplitude=1.0, A_centerx=0.0, A_centery=0.0, A_sigmax=1.0, A_sigmay=1.0, B_amplitude=1.0, B_centerx=0.0, B_centery=0.0, B_sigmax=1.0, B_sigmay=1.0): """Return a 2-dimensional Gaussian function. gaussian2d(x, y, amplitude, centerx, centery, sigmax, sigmay) = amplitude/(2*pi*sigmax*sigmay) * exp(-(x-centerx)**2/(2*sigmax**2) -(y-centery)**2/(2*sigmay**2)) """ z = A_amplitude*(gaussian(x, amplitude=1, center=A_centerx, sigma=A_sigmax) * gaussian(y, amplitude=1, center=A_centery, sigma=A_sigmay)) z += B_amplitude*(gaussian(x, amplitude=1, center=B_centerx, sigma=B_sigmax) * gaussian(y, amplitude=1, center=B_centery, sigma=B_sigmay)) return z class GaussianWithOffsetModel(Model): fwhm_factor = 2*np.sqrt(2*np.log(2)) height_factor = 1./np.sqrt(2*np.pi) def __init__(self, independent_vars=['x'], nan_policy='raise', prefix='', name=None, **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(gaussianWithOffset, **kwargs) self._set_paramhints_prefix() def _set_paramhints_prefix(self): self.set_param_hint('sigma', min=0) self.set_param_hint('fwhm', expr=fwhm_expr(self)) self.set_param_hint('height', expr=height_expr(self)) def guess(self, data, x, negative=False, **kwargs): offset = np.min(data) data = data - offset pars = guess_from_peak(self, data, x, negative) pars.add('offset', value=offset) return update_param_vals(pars, self.prefix, **kwargs) class LorentzianWithOffsetModel(Model): fwhm_factor = 2.0 height_factor = 1./np.pi def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(lorentzianWithOffset, **kwargs) self._set_paramhints_prefix() def _set_paramhints_prefix(self): self.set_param_hint('sigma', min=0) self.set_param_hint('fwhm', expr=fwhm_expr(self)) self.set_param_hint('height', expr=height_expr(self)) def guess(self, data, x, negative=False, **kwargs): """Estimate initial model parameter values from data.""" offset = np.min(data) data = data - offset pars = guess_from_peak(self, data, x, negative, ampscale=1.25) pars.add('offset', value=offset) return update_param_vals(pars, self.prefix, **kwargs) class ExponentialWithOffsetModel(Model): def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(exponentialWithOffset, **kwargs) def guess(self, data, x, **kwargs): """Estimate initial model parameter values from data.""" offset = np.min(data) data = data - offset try: sval, oval = np.polyfit(x, np.log(abs(data)+1.e-15), 1) except TypeError: sval, oval = 1., np.log(abs(max(data)+1.e-9)) pars = self.make_params(amplitude=np.exp(oval), decay=-1.0/sval) pars.add('offset', value=offset) return update_param_vals(pars, self.prefix, **kwargs) class ExpansionModel(Model): def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(expansion, **kwargs) def guess(self, data, x, **kwargs): """Estimate initial model parameter values from data.""" popt1, pcov1 = curve_fit(expansion, x, data) pars = self.make_params(amplitude=popt1[0], offset=popt1[1]) return update_param_vals(pars, self.prefix, **kwargs) class DampingOscillationModel(Model): def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(dampingOscillation, **kwargs) def guess(self, data, x, **kwargs): """Estimate initial model parameter values from data.""" try: popt1, pcov1 = curve_fit(dampingOscillation, x, data, np.array(0, 5, 5e2, 1e3, 16)) pars = self.make_params(center=popt1[0], amplitude=popt1[1], frequency=popt1[2], decay=popt1[3], offset=popt1[4]) except: pars = self.make_params(center=0, amplitude=5.0, frequency=5e2, decay=1.0e3, offset=16.0) return update_param_vals(pars, self.prefix, **kwargs) class TwoGaussian2dModel(Model): fwhm_factor = 2*np.sqrt(2*np.log(2)) height_factor = 1./2*np.pi def __init__(self, independent_vars=['x', 'y'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) self.helperModel = Gaussian2dModel() super().__init__(two_gaussian2d, **kwargs) self._set_paramhints_prefix() def _set_paramhints_prefix(self): self.set_param_hint('delta', value=-1, max=0) self.set_param_hint('A_sigmax', expr=f'{self.prefix}delta + {self.prefix}B_sigmax') def guess(self, data, x, y, negative=False, **kwargs): pars_guess = guess_from_peak2d(self.helperModel, data, x, y, negative) pars = self.make_params(A_amplitude=pars_guess['amplitude'].value, A_centerx=pars_guess['centerx'].value, A_centery=pars_guess['centery'].value, A_sigmax=pars_guess['sigmax'].value, A_sigmay=pars_guess['sigmay'].value, B_amplitude=pars_guess['amplitude'].value, B_centerx=pars_guess['centerx'].value, B_centery=pars_guess['centery'].value, B_sigmax=pars_guess['sigmax'].value, B_sigmay=pars_guess['sigmay'].value) pars[f'{self.prefix}A_sigmax'].set(expr=f'delta + {self.prefix}B_sigmax') pars.add(f'{self.prefix}delta', value=-1, max=0, min=-np.inf, vary=True) pars[f'{self.prefix}A_sigmay'].set(min=0.0) pars[f'{self.prefix}B_sigmax'].set(min=0.0) pars[f'{self.prefix}B_sigmay'].set(min=0.0) return pars class NewFitModel(Model): def __init__(self, func, independent_vars=['x'], prefix='', nan_policy='raise', **kwargs): kwargs.update({'prefix': prefix, 'nan_policy': nan_policy, 'independent_vars': independent_vars}) super().__init__(func, **kwargs) def guess(self, *args, **kwargs): return self.make_params() lmfit_models = {'Constant': ConstantModel, 'Complex Constant': ComplexConstantModel, 'Linear': LinearModel, 'Quadratic': QuadraticModel, 'Polynomial': PolynomialModel, 'Gaussian': GaussianModel, 'Gaussian-2D': Gaussian2dModel, 'Lorentzian': LorentzianModel, 'Split-Lorentzian': SplitLorentzianModel, 'Voigt': VoigtModel, 'PseudoVoigt': PseudoVoigtModel, 'Moffat': MoffatModel, 'Pearson7': Pearson7Model, 'StudentsT': StudentsTModel, 'Breit-Wigner': BreitWignerModel, 'Log-Normal': LognormalModel, 'Damped Oscillator': DampedOscillatorModel, 'Damped Harmonic Oscillator': DampedHarmonicOscillatorModel, 'Exponential Gaussian': ExponentialGaussianModel, 'Skewed Gaussian': SkewedGaussianModel, 'Skewed Voigt': SkewedVoigtModel, 'Thermal Distribution': ThermalDistributionModel, 'Doniach': DoniachModel, 'Power Law': PowerLawModel, 'Exponential': ExponentialModel, 'Step': StepModel, 'Rectangle': RectangleModel, 'Expression': ExpressionModel, 'Gaussian With Offset':GaussianWithOffsetModel, 'Lorentzian With Offset':LorentzianWithOffsetModel, 'Expansion':ExpansionModel, 'Damping Oscillation Model':DampingOscillationModel, 'Two Gaussian-2D':TwoGaussian2dModel, } class FitAnalyser(): def __init__(self, fitModel, fitDim=1, **kwargs) -> None: if isinstance(fitModel, str): self.fitModel = lmfit_models[fitModel](**kwargs) else: self.fitModel = fitModel self.fitDim = fitDim def print_params_set_templat(self, params=None): if params is None: params = self.fitModel.make_params() for key in params: res = "params.add(" res += "name=\"" + key + "\", " if not params[key].expr is None: res += "expr=" + params[key].expr +")" else: res += "value=" + f'{params[key].value:3g}' + ", " if str(params[key].max)=="inf": res += "max=np.inf, " else: res += "max=" + f'{params[key].max:3g}' + ", " if str(params[key].min)=="-inf": res += "min=-np.inf, " else: res += "min=" + f'{params[key].min:3g}' + ", " res += "vary=" + str(params[key].vary) + ")" print(res) def _guess_1D(self, data, x, **kwargs): return self.fitModel.guess(data=data, x=x, **kwargs) def _guess_2D(self, data, x, y, **kwargs): data = data.flatten(order='F') return self.fitModel.guess(data=data, x=x, y=y, **kwargs) def guess(self, dataArray, x=None, y=None, guess_kwargs={}, input_core_dims=None, dask='parallelized', vectorize=True, keep_attrs=True, daskKwargs=None, **kwargs): kwargs.update( { "dask": dask, "vectorize": vectorize, "input_core_dims": input_core_dims, 'keep_attrs': keep_attrs, } ) if not daskKwargs is None: kwargs.update({"dask_gufunc_kwargs": daskKwargs}) if input_core_dims is None: kwargs.update( { "input_core_dims": [['x']], } ) if x is None: if 'x' in dataArray.dims: x = dataArray['x'].to_numpy() else: if isinstance(x, str): if input_core_dims is None: kwargs.update( { "input_core_dims": [[x]], } ) x = dataArray[x].to_numpy() if self.fitDim == 1: guess_kwargs.update( { 'x':x, } ) return xr.apply_ufunc(self._guess_1D, dataArray, kwargs=guess_kwargs, output_dtypes=[type(self.fitModel.make_params())], **kwargs ) if self.fitDim == 2: if y is None: if 'y' in dataArray.dims: y = dataArray['y'].to_numpy() if input_core_dims is None: kwargs.update( { "input_core_dims": [['x', 'y']], } ) else: if isinstance(y, str): kwargs["input_core_dims"][0] = np.append(kwargs["input_core_dims"][0], y) y = dataArray[y].to_numpy() elif input_core_dims is None: kwargs.update( { "input_core_dims": [['x', 'y']], } ) _x, _y = np.meshgrid(x, y) _x = _x.flatten() _y = _y.flatten() # dataArray = dataArray.stack(_z=(kwargs["input_core_dims"][0][0], kwargs["input_core_dims"][0][1])) # kwargs["input_core_dims"][0] = ['_z'] guess_kwargs.update( { 'x':_x, 'y':_y, } ) return xr.apply_ufunc(self._guess_2D, dataArray, kwargs=guess_kwargs, output_dtypes=[type(self.fitModel.make_params())], **kwargs ) def _fit_1D(self, data, params, x): return self.fitModel.fit(data=data, x=x, params=params, nan_policy='omit') def _fit_2D(self, data, params, x, y): data = data.flatten(order='F') return self.fitModel.fit(data=data, x=x, y=y, params=params, nan_policy='omit') def fit(self, dataArray, paramsArray, x=None, y=None, input_core_dims=None, dask='parallelized', vectorize=True, keep_attrs=True, daskKwargs=None, **kwargs): kwargs.update( { "dask": dask, "vectorize": vectorize, "input_core_dims": input_core_dims, 'keep_attrs': keep_attrs, } ) if not daskKwargs is None: kwargs.update({"dask_gufunc_kwargs": daskKwargs}) if isinstance(paramsArray, type(self.fitModel.make_params())): if input_core_dims is None: kwargs.update( { "input_core_dims": [['x']], } ) if x is None: if 'x' in dataArray.dims: x = dataArray['x'].to_numpy() else: if isinstance(x, str): if input_core_dims is None: kwargs.update( { "input_core_dims": [[x]], } ) x = dataArray[x].to_numpy() if self.fitDim == 1: return xr.apply_ufunc(self._fit_1D, dataArray, kwargs={'params':paramsArray,'x':x}, output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))], **kwargs) if self.fitDim == 2: if y is None: if 'y' in dataArray.dims: y = dataArray['y'].to_numpy() if input_core_dims is None: kwargs.update( { "input_core_dims": [['x', 'y']], } ) else: if isinstance(y, str): kwargs["input_core_dims"][0] = np.append(kwargs["input_core_dims"][0], y) y = dataArray[y].to_numpy() elif input_core_dims is None: kwargs.update( { "input_core_dims": [['x', 'y']], } ) _x, _y = np.meshgrid(x, y) _x = _x.flatten() _y = _y.flatten() # dataArray = dataArray.stack(_z=(kwargs["input_core_dims"][0][0], kwargs["input_core_dims"][0][1])) # kwargs["input_core_dims"][0] = ['_z'] return xr.apply_ufunc(self._fit_2D, dataArray, kwargs={'params':paramsArray,'x':_x, 'y':_y}, output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))], **kwargs) else: if input_core_dims is None: kwargs.update( { "input_core_dims": [['x'], []], } ) if x is None: if 'x' in dataArray.dims: x = dataArray['x'].to_numpy() else: if isinstance(x, str): if input_core_dims is None: kwargs.update( { "input_core_dims": [[x], []], } ) x = dataArray[x].to_numpy() if self.fitDim == 1: return xr.apply_ufunc(self._fit_1D, dataArray, paramsArray, kwargs={'x':x}, output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))], **kwargs) if self.fitDim == 2: if input_core_dims is None: kwargs.update( { "input_core_dims": [['x', 'y'], []], } ) if y is None: if 'y' in dataArray.dims: y = dataArray['y'].to_numpy() else: if isinstance(y, str): y = dataArray[y].to_numpy() kwargs["input_core_dims"][0] = np.append(kwargs["input_core_dims"][0], y) _x, _y = np.meshgrid(x, y) _x = _x.flatten() _y = _y.flatten() # dataArray = dataArray.stack(_z=(kwargs["input_core_dims"][0][0], kwargs["input_core_dims"][0][1])) # kwargs["input_core_dims"][0] = ['_z'] return xr.apply_ufunc(self._fit_2D, dataArray, paramsArray, kwargs={'x':_x, 'y':_y}, output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))], **kwargs) def _eval_1D(self, fitResult, x): return self.fitModel.eval(x=x, **fitResult.best_values) def _eval_2D(self, fitResult, x, y, shape): res = self.fitModel.eval(x=x, y=y, **fitResult.best_values) return res.reshape(shape, order='F') def eval(self, fitResultArray, x=None, y=None, output_core_dims=None, prefix="", dask='parallelized', vectorize=True, daskKwargs=None, **kwargs): kwargs.update( { "dask": dask, "vectorize": vectorize, "output_core_dims": output_core_dims, } ) if daskKwargs is None: daskKwargs = {} if self.fitDim == 1: if output_core_dims is None: kwargs.update( { "output_core_dims": prefix+'x', } ) output_core_dims = [prefix+'x'] daskKwargs.update( { 'output_sizes': { output_core_dims[0]: np.size(x), }, 'meta': np.ndarray((0,0), dtype=float) } ) kwargs.update( { "dask_gufunc_kwargs": daskKwargs, } ) res = xr.apply_ufunc(self._eval_1D, fitResultArray, kwargs={"x":x}, **kwargs) return res.assign_coords({prefix+'x':np.array(x)}) if self.fitDim == 2: if output_core_dims is None: kwargs.update( { "output_core_dims": [[prefix+'x', prefix+'y']], } ) output_core_dims = [prefix+'x', prefix+'y'] daskKwargs.update( { 'output_sizes': { output_core_dims[0]: np.size(x), output_core_dims[1]: np.size(y), }, 'meta': np.ndarray((0,0), dtype=float) }, ) kwargs.update( { "dask_gufunc_kwargs": daskKwargs, } ) _x, _y = np.meshgrid(x, y) _x = _x.flatten() _y = _y.flatten() res = xr.apply_ufunc(self._eval_2D, fitResultArray, kwargs={"x":_x, "y":_y, "shape":(len(x), len(y))}, **kwargs) return res.assign_coords({prefix+'x':np.array(x), prefix+'y':np.array(y)}) def _get_fit_value_single(self, fitResult, key): return fitResult.params[key].value def _get_fit_value(self, fitResult, params): func = np.vectorize(self._get_fit_value_single) res = tuple( func(fitResult, key) for key in params ) return res def get_fit_value(self, fitResult, dask='parallelized', **kwargs): firstIndex = { key: fitResult[key][0] for key in fitResult.dims } firstFitResult = fitResult.sel(firstIndex).item() params = list(firstFitResult.params.keys()) output_core_dims=[ [] for _ in range(len(params))] kwargs.update( { "dask": dask, "output_core_dims": output_core_dims, } ) value = xr.apply_ufunc(self._get_fit_value, fitResult, kwargs=dict(params=params), **kwargs) value = xr.Dataset( data_vars={ params[i]: value[i] for i in range(len(params)) }, attrs=fitResult.attrs ) return value def _get_fit_std_single(self, fitResult, key): return fitResult.params[key].stderr def _get_fit_std(self, fitResult, params): func = np.vectorize(self._get_fit_std_single) res = tuple( func(fitResult, key) for key in params ) return res def get_fit_std(self, fitResult, dask='parallelized', **kwargs): firstIndex = { key: fitResult[key][0] for key in fitResult.dims } firstFitResult = fitResult.sel(firstIndex).item() params = list(firstFitResult.params.keys()) output_core_dims=[ [] for _ in range(len(params))] kwargs.update( { "dask": dask, "output_core_dims": output_core_dims, } ) value = xr.apply_ufunc(self._get_fit_std, fitResult, kwargs=dict(params=params), **kwargs) value = xr.Dataset( data_vars={ params[i]: value[i] for i in range(len(params)) }, attrs=fitResult.attrs ) return value def _get_fit_full_result_single(self, fitResult, key): if not fitResult.params[key].value is None: value = fitResult.params[key].value else: value = np.nan if not fitResult.params[key].stderr is None: std = fitResult.params[key].stderr else: std = np.nan return ufloat(value, std) def _get_fit_full_result(self, fitResult, params): func = np.vectorize(self._get_fit_full_result_single) res = tuple( func(fitResult, key) for key in params ) return res def get_fit_full_result(self, fitResult, dask='parallelized', **kwargs): firstIndex = { key: fitResult[key][0] for key in fitResult.dims } firstFitResult = fitResult.sel(firstIndex).item() params = list(firstFitResult.params.keys()) output_core_dims=[ [] for _ in range(len(params))] kwargs.update( { "dask": dask, "output_core_dims": output_core_dims, } ) value = xr.apply_ufunc(self._get_fit_full_result, fitResult, kwargs=dict(params=params), **kwargs) value = xr.Dataset( data_vars={ params[i]: value[i] for i in range(len(params)) }, attrs=fitResult.attrs ) return value