905 lines
34 KiB
Python
905 lines
34 KiB
Python
import numpy as np
|
|
from uncertainties import ufloat
|
|
|
|
import lmfit
|
|
from lmfit.models import (ConstantModel, ComplexConstantModel, LinearModel, QuadraticModel,
|
|
PolynomialModel, SineModel, GaussianModel, Gaussian2dModel, LorentzianModel,
|
|
SplitLorentzianModel, VoigtModel, PseudoVoigtModel, MoffatModel,
|
|
Pearson7Model, StudentsTModel, BreitWignerModel, LognormalModel,
|
|
DampedOscillatorModel, ExponentialGaussianModel, SkewedGaussianModel,
|
|
SkewedVoigtModel, ThermalDistributionModel, DoniachModel, PowerLawModel,
|
|
ExponentialModel, StepModel, RectangleModel, ExpressionModel, DampedHarmonicOscillatorModel)
|
|
from lmfit.models import (guess_from_peak, guess_from_peak2d, fwhm_expr, height_expr,
|
|
update_param_vals)
|
|
from lmfit.lineshapes import (not_zero, breit_wigner, damped_oscillator, dho, doniach,
|
|
expgaussian, exponential, gaussian, gaussian2d,
|
|
linear, lognormal, lorentzian, moffat, parabolic,
|
|
pearson7, powerlaw, pvoigt, rectangle, sine,
|
|
skewed_gaussian, skewed_voigt, split_lorentzian, step,
|
|
students_t, thermal_distribution, tiny, voigt)
|
|
from lmfit import Model
|
|
import numpy as np
|
|
from numpy import (arctan, copysign, cos, exp, isclose, isnan, log, pi, real,
|
|
sin, sqrt, where)
|
|
from scipy.special import erf, erfc
|
|
from scipy.special import gamma as gamfcn
|
|
from scipy.special import wofz
|
|
from scipy.optimize import curve_fit
|
|
|
|
import xarray as xr
|
|
|
|
|
|
log2 = log(2)
|
|
s2pi = sqrt(2*pi)
|
|
s2 = sqrt(2.0)
|
|
|
|
|
|
def gaussianWithOffset(x, amplitude=1.0, center=0.0, sigma=1.0, offset=0.0):
|
|
"""Return a 1-dimensional Gaussian function with an offset.
|
|
|
|
gaussian(x, amplitude, center, sigma) =
|
|
(amplitude/(s2pi*sigma)) * exp(-(1.0*x-center)**2 / (2*sigma**2))
|
|
|
|
"""
|
|
return ((amplitude/(max(tiny, s2pi*sigma)))
|
|
* exp(-(1.0*x-center)**2 / max(tiny, (2*sigma**2))) + offset)
|
|
|
|
|
|
def lorentzianWithOffset(x, amplitude=1.0, center=0.0, sigma=1.0, offset=0.0):
|
|
return ((amplitude/(1 + ((1.0*x-center)/max(tiny, sigma))**2))
|
|
/ max(tiny, (pi*sigma)) + offset)
|
|
|
|
|
|
def exponentialWithOffset(x, amplitude=1.0, decay=1.0, offset=0.0):
|
|
decay = not_zero(decay)
|
|
return amplitude * exp(-x/decay) + offset
|
|
|
|
|
|
def expansion(x, amplitude=1.0, offset=0.0):
|
|
return np.sqrt(amplitude*x*x + offset)
|
|
|
|
|
|
def dampingOscillation(x, center=0, amplitude=1.0, frequency=1.0, decay=1.0, offset=0.0):
|
|
return amplitude * np.exp(-decay*x)*np.sin(2*np.pi*frequency*(x-center)) + offset
|
|
|
|
|
|
def two_gaussian2d(x, y=0.0, A_amplitude=1.0, A_centerx=0.0, A_centery=0.0, A_sigmax=1.0,
|
|
A_sigmay=1.0, B_amplitude=1.0, B_centerx=0.0, B_centery=0.0, B_sigmax=1.0,
|
|
B_sigmay=1.0):
|
|
"""Return a 2-dimensional Gaussian function.
|
|
|
|
gaussian2d(x, y, amplitude, centerx, centery, sigmax, sigmay) =
|
|
amplitude/(2*pi*sigmax*sigmay) * exp(-(x-centerx)**2/(2*sigmax**2)
|
|
-(y-centery)**2/(2*sigmay**2))
|
|
|
|
"""
|
|
z = A_amplitude*(gaussian(x, amplitude=1, center=A_centerx, sigma=A_sigmax) *
|
|
gaussian(y, amplitude=1, center=A_centery, sigma=A_sigmay))
|
|
z += B_amplitude*(gaussian(x, amplitude=1, center=B_centerx, sigma=B_sigmax) *
|
|
gaussian(y, amplitude=1, center=B_centery, sigma=B_sigmay))
|
|
return z
|
|
|
|
|
|
def ThomasFermi_2d(x, y=0.0, centerx=0.0, centery=0.0, amplitude=1.0, sigmax=1.0, sigmay=1.0):
|
|
res = (1- ((x-centerx)/(sigmax))**2 - ((y-centery)/(sigmay))**2)**(3 / 2)
|
|
return amplitude * 5 / 2 / np.pi / max(tiny, sigmax * sigmay) * np.where(res > 0, res, 0)
|
|
|
|
|
|
def polylog(power, numerator):
|
|
|
|
dataShape = numerator.shape
|
|
numerator = np.tile(numerator, (20, 1))
|
|
|
|
denominator = np.arange(1, 21)
|
|
denominator = np.tile(denominator, (dataShape[0], 1))
|
|
denominator = denominator.T
|
|
|
|
data = numerator / denominator
|
|
|
|
return np.sum(np.power(data, power), axis=0)
|
|
|
|
|
|
def polylog2_2d(x, y=0.0, centerx=0.0, centery=0.0, amplitude=1.0, sigmax=1.0, sigmay=1.0):
|
|
## Approximation of the polylog function with 2D gaussian as argument. -> discribes the thermal part of the cloud
|
|
return amplitude / np.pi / 1.59843 / max(tiny, sigmax * sigmay) * polylog(2, np.exp( -((x-centerx)**2/(2 * (sigmax)**2))-((y-centery)**2/( 2 * (sigmay)**2)) ))
|
|
|
|
|
|
def density_profile_BEC_2d(x, y=0.0, BEC_amplitude=1.0, thermal_amplitude=1.0, BEC_centerx=0.0, BEC_centery=0.0, thermal_centerx=0.0, thermal_centery=0.0,
|
|
BEC_sigmax=1.0, BEC_sigmay=1.0, thermal_sigmax=1.0, thermal_sigmay=1.0):
|
|
|
|
return ThomasFermi_2d(x=x, y=y, centerx=BEC_centerx, centery=BEC_centery,
|
|
amplitude=BEC_amplitude, sigmax=BEC_sigmax, sigmay=BEC_sigmay
|
|
) + polylog2_2d(x=x, y=y, centerx=thermal_centerx, centery=thermal_centery,
|
|
amplitude=thermal_amplitude, sigmax=thermal_sigmax, sigmay=thermal_sigmay)
|
|
|
|
|
|
class GaussianWithOffsetModel(Model):
|
|
|
|
fwhm_factor = 2*np.sqrt(2*np.log(2))
|
|
height_factor = 1./np.sqrt(2*np.pi)
|
|
|
|
def __init__(self, independent_vars=['x'], nan_policy='raise', prefix='', name=None, **kwargs):
|
|
|
|
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
|
|
'independent_vars': independent_vars})
|
|
super().__init__(gaussianWithOffset, **kwargs)
|
|
self._set_paramhints_prefix()
|
|
|
|
def _set_paramhints_prefix(self):
|
|
self.set_param_hint('sigma', min=0)
|
|
self.set_param_hint('fwhm', expr=fwhm_expr(self))
|
|
self.set_param_hint('height', expr=height_expr(self))
|
|
|
|
def guess(self, data, x, negative=False, **kwargs):
|
|
offset = np.min(data)
|
|
data = data - offset
|
|
pars = guess_from_peak(self, data, x, negative)
|
|
pars.add('offset', value=offset)
|
|
return update_param_vals(pars, self.prefix, **kwargs)
|
|
|
|
|
|
class LorentzianWithOffsetModel(Model):
|
|
|
|
fwhm_factor = 2.0
|
|
height_factor = 1./np.pi
|
|
|
|
def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise',
|
|
**kwargs):
|
|
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
|
|
'independent_vars': independent_vars})
|
|
super().__init__(lorentzianWithOffset, **kwargs)
|
|
self._set_paramhints_prefix()
|
|
|
|
def _set_paramhints_prefix(self):
|
|
self.set_param_hint('sigma', min=0)
|
|
self.set_param_hint('fwhm', expr=fwhm_expr(self))
|
|
self.set_param_hint('height', expr=height_expr(self))
|
|
|
|
def guess(self, data, x, negative=False, **kwargs):
|
|
"""Estimate initial model parameter values from data."""
|
|
offset = np.min(data)
|
|
data = data - offset
|
|
pars = guess_from_peak(self, data, x, negative, ampscale=1.25)
|
|
pars.add('offset', value=offset)
|
|
return update_param_vals(pars, self.prefix, **kwargs)
|
|
|
|
|
|
class ExponentialWithOffsetModel(Model):
|
|
|
|
def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise',
|
|
**kwargs):
|
|
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
|
|
'independent_vars': independent_vars})
|
|
super().__init__(exponentialWithOffset, **kwargs)
|
|
|
|
def guess(self, data, x, **kwargs):
|
|
"""Estimate initial model parameter values from data."""
|
|
offset = np.min(data)
|
|
data = data - offset
|
|
try:
|
|
sval, oval = np.polyfit(x, np.log(abs(data)+1.e-15), 1)
|
|
except TypeError:
|
|
sval, oval = 1., np.log(abs(max(data)+1.e-9))
|
|
pars = self.make_params(amplitude=np.exp(oval), decay=-1.0/sval)
|
|
pars.add('offset', value=offset)
|
|
return update_param_vals(pars, self.prefix, **kwargs)
|
|
|
|
|
|
class ExpansionModel(Model):
|
|
|
|
def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise',
|
|
**kwargs):
|
|
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
|
|
'independent_vars': independent_vars})
|
|
super().__init__(expansion, **kwargs)
|
|
|
|
def guess(self, data, x, **kwargs):
|
|
"""Estimate initial model parameter values from data."""
|
|
|
|
popt1, pcov1 = curve_fit(expansion, x, data)
|
|
pars = self.make_params(amplitude=popt1[0], offset=popt1[1])
|
|
|
|
return update_param_vals(pars, self.prefix, **kwargs)
|
|
|
|
|
|
class DampingOscillationModel(Model):
|
|
|
|
def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise',
|
|
**kwargs):
|
|
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
|
|
'independent_vars': independent_vars})
|
|
super().__init__(dampingOscillation, **kwargs)
|
|
|
|
def guess(self, data, x, **kwargs):
|
|
"""Estimate initial model parameter values from data."""
|
|
try:
|
|
popt1, pcov1 = curve_fit(dampingOscillation, x, data, np.array(0, 5, 5e2, 1e3, 16))
|
|
pars = self.make_params(center=popt1[0], amplitude=popt1[1], frequency=popt1[2], decay=popt1[3], offset=popt1[4])
|
|
except:
|
|
pars = self.make_params(center=0, amplitude=5.0, frequency=5e2, decay=1.0e3, offset=16.0)
|
|
|
|
return update_param_vals(pars, self.prefix, **kwargs)
|
|
|
|
|
|
class TwoGaussian2dModel(Model):
|
|
|
|
fwhm_factor = 2*np.sqrt(2*np.log(2))
|
|
height_factor = 1./2*np.pi
|
|
|
|
def __init__(self, independent_vars=['x', 'y'], prefix='', nan_policy='raise',
|
|
**kwargs):
|
|
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
|
|
'independent_vars': independent_vars})
|
|
|
|
self.helperModel = Gaussian2dModel()
|
|
|
|
super().__init__(two_gaussian2d, **kwargs)
|
|
|
|
self._set_paramhints_prefix()
|
|
|
|
def _set_paramhints_prefix(self):
|
|
self.set_param_hint('delta', value=-1, max=0)
|
|
self.set_param_hint('A_sigmax', expr=f'{self.prefix}delta + {self.prefix}B_sigmax')
|
|
|
|
def guess(self, data, x, y, negative=False, **kwargs):
|
|
pars_guess = guess_from_peak2d(self.helperModel, data, x, y, negative)
|
|
pars = self.make_params(A_amplitude=pars_guess['amplitude'].value, A_centerx=pars_guess['centerx'].value, A_centery=pars_guess['centery'].value,
|
|
A_sigmax=pars_guess['sigmax'].value, A_sigmay=pars_guess['sigmay'].value,
|
|
B_amplitude=pars_guess['amplitude'].value, B_centerx=pars_guess['centerx'].value, B_centery=pars_guess['centery'].value,
|
|
B_sigmax=pars_guess['sigmax'].value, B_sigmay=pars_guess['sigmay'].value)
|
|
|
|
pars[f'{self.prefix}A_sigmax'].set(expr=f'delta + {self.prefix}B_sigmax')
|
|
pars.add(f'{self.prefix}delta', value=-1, max=0, min=-np.inf, vary=True)
|
|
pars[f'{self.prefix}A_sigmay'].set(min=0.0)
|
|
pars[f'{self.prefix}B_sigmax'].set(min=0.0)
|
|
pars[f'{self.prefix}B_sigmay'].set(min=0.0)
|
|
|
|
return pars
|
|
|
|
|
|
class Polylog22dModel(Model):
|
|
|
|
fwhm_factor = 2*np.sqrt(2*np.log(2))
|
|
height_factor = 1./2*np.pi
|
|
|
|
def __init__(self, independent_vars=['x', 'y'], prefix='', nan_policy='raise',
|
|
**kwargs):
|
|
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
|
|
'independent_vars': independent_vars})
|
|
super().__init__(polylog2_2d, **kwargs)
|
|
self._set_paramhints_prefix()
|
|
|
|
def _set_paramhints_prefix(self):
|
|
self.set_param_hint('Rx', min=0)
|
|
self.set_param_hint('Ry', min=0)
|
|
|
|
def guess(self, data, x, y, negative=False, **kwargs):
|
|
"""Estimate initial model parameter values from data."""
|
|
pars = guess_from_peak2d(self, data, x, y, negative)
|
|
return update_param_vals(pars, self.prefix, **kwargs)
|
|
|
|
|
|
class ThomasFermi2dModel(Model):
|
|
|
|
fwhm_factor = 1
|
|
height_factor = 0.5
|
|
|
|
def __init__(self, independent_vars=['x', 'y'], prefix='', nan_policy='raise',
|
|
**kwargs):
|
|
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
|
|
'independent_vars': independent_vars})
|
|
super().__init__(ThomasFermi_2d, **kwargs)
|
|
self._set_paramhints_prefix()
|
|
|
|
def _set_paramhints_prefix(self):
|
|
self.set_param_hint('Rx', min=0)
|
|
self.set_param_hint('Ry', min=0)
|
|
|
|
def guess(self, data, x, y, negative=False, **kwargs):
|
|
"""Estimate initial model parameter values from data."""
|
|
pars = guess_from_peak2d(self, data, x, y, negative)
|
|
|
|
# amplitude = pars['amplitude'].value
|
|
# simgax = pars['sigmax'].value
|
|
# sigmay = pars['sigmay'].value
|
|
|
|
# pars['amplitude'].set(value=amplitude/s2pi/simgax/sigmay)
|
|
|
|
simgax = pars['sigmax'].value
|
|
sigmay = pars['sigmay'].value
|
|
pars['simgax'].set(value=simgax / 2.355)
|
|
pars['sigmay'].set(value=sigmay / 2.355)
|
|
|
|
return update_param_vals(pars, self.prefix, **kwargs)
|
|
|
|
|
|
class DensityProfileBEC2dModel(Model):
|
|
|
|
fwhm_factor = 2*np.sqrt(2*np.log(2))
|
|
height_factor = 1./2*np.pi
|
|
|
|
def __init__(self, independent_vars=['x', 'y'], prefix='', nan_policy='raise',
|
|
**kwargs):
|
|
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
|
|
'independent_vars': independent_vars})
|
|
super().__init__(density_profile_BEC_2d, **kwargs)
|
|
self._set_paramhints_prefix()
|
|
|
|
def _set_paramhints_prefix(self):
|
|
self.set_param_hint('BEC_sigmax', min=0)
|
|
self.set_param_hint('BEC_sigmay', min=0)
|
|
self.set_param_hint('thermal_sigmax', min=0)
|
|
# self.set_param_hint('thermal_sigmay', min=0)
|
|
self.set_param_hint('BEC_amplitude', min=0)
|
|
self.set_param_hint('thermal_amplitude', min=0)
|
|
|
|
self.set_param_hint('thermalAspectRatio', min=0.8, max=1.2)
|
|
self.set_param_hint('thermal_sigmay', expr=f'{self.prefix}thermalAspectRatio * {self.prefix}thermal_sigmax')
|
|
|
|
self.set_param_hint('condensate_fraction', expr=f'{self.prefix}BEC_amplitude / ({self.prefix}BEC_amplitude + {self.prefix}thermal_amplitude)')
|
|
|
|
def guess(self, data, x, y, negative=False, pureBECThreshold=0.5, **kwargs):
|
|
"""Estimate initial model parameter values from data."""
|
|
fitModel = TwoGaussian2dModel()
|
|
pars = fitModel.guess(data, x=x, y=y, negative=negative)
|
|
fitResult = fitModel.fit(data, x=x, y=y, params=pars, **kwargs)
|
|
pars_guess = fitResult.params
|
|
|
|
BEC_amplitude = pars_guess['A_amplitude'].value
|
|
thermal_amplitude = pars_guess['B_amplitude'].value
|
|
|
|
pars = self.make_params(BEC_amplitude=BEC_amplitude,
|
|
thermal_amplitude=thermal_amplitude,
|
|
BEC_centerx=pars_guess['A_centerx'].value, BEC_centery=pars_guess['A_centery'].value,
|
|
BEC_sigmax=(pars_guess['A_sigmax'].value / 2.355), BEC_sigmay=(pars_guess['A_sigmay'].value / 2.355),
|
|
thermal_centerx=pars_guess['B_centerx'].value, thermal_centery=pars_guess['B_centery'].value,
|
|
thermal_sigmax=(pars_guess['B_sigmax'].value * s2),
|
|
thermalAspectRatio=(pars_guess['B_sigmax'].value * s2) / (pars_guess['B_sigmay'].value * s2)
|
|
# thermal_sigmay=(pars_guess['B_sigmay'].value * s2)
|
|
)
|
|
|
|
if BEC_amplitude / (thermal_amplitude + BEC_amplitude) > pureBECThreshold:
|
|
pars[f'{self.prefix}thermal_amplitude'].set(value=0)
|
|
pars[f'{self.prefix}BEC_amplitude'].set(value=(thermal_amplitude + BEC_amplitude))
|
|
|
|
return update_param_vals(pars, self.prefix, **kwargs)
|
|
|
|
|
|
class NewFitModel(Model):
|
|
|
|
def __init__(self, func, independent_vars=['x'], prefix='', nan_policy='raise',
|
|
**kwargs):
|
|
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
|
|
'independent_vars': independent_vars})
|
|
|
|
super().__init__(func, **kwargs)
|
|
|
|
def guess(self, *args, **kwargs):
|
|
return self.make_params()
|
|
|
|
|
|
lmfit_models = {'Constant': ConstantModel,
|
|
'Complex Constant': ComplexConstantModel,
|
|
'Linear': LinearModel,
|
|
'Quadratic': QuadraticModel,
|
|
'Polynomial': PolynomialModel,
|
|
'Gaussian': GaussianModel,
|
|
'Gaussian-2D': Gaussian2dModel,
|
|
'Lorentzian': LorentzianModel,
|
|
'Split-Lorentzian': SplitLorentzianModel,
|
|
'Voigt': VoigtModel,
|
|
'PseudoVoigt': PseudoVoigtModel,
|
|
'Moffat': MoffatModel,
|
|
'Pearson7': Pearson7Model,
|
|
'StudentsT': StudentsTModel,
|
|
'Breit-Wigner': BreitWignerModel,
|
|
'Log-Normal': LognormalModel,
|
|
'Damped Oscillator': DampedOscillatorModel,
|
|
'Damped Harmonic Oscillator': DampedHarmonicOscillatorModel,
|
|
'Exponential Gaussian': ExponentialGaussianModel,
|
|
'Skewed Gaussian': SkewedGaussianModel,
|
|
'Skewed Voigt': SkewedVoigtModel,
|
|
'Thermal Distribution': ThermalDistributionModel,
|
|
'Doniach': DoniachModel,
|
|
'Power Law': PowerLawModel,
|
|
'Exponential': ExponentialModel,
|
|
'Step': StepModel,
|
|
'Rectangle': RectangleModel,
|
|
'Expression': ExpressionModel,
|
|
'Gaussian With Offset':GaussianWithOffsetModel,
|
|
'Lorentzian With Offset':LorentzianWithOffsetModel,
|
|
'Expansion':ExpansionModel,
|
|
'Damping Oscillation Model':DampingOscillationModel,
|
|
'Two Gaussian-2D':TwoGaussian2dModel,
|
|
}
|
|
|
|
|
|
class FitAnalyser():
|
|
|
|
def __init__(self, fitModel, fitDim=1, **kwargs) -> None:
|
|
|
|
if isinstance(fitModel, str):
|
|
self.fitModel = lmfit_models[fitModel](**kwargs)
|
|
else:
|
|
self.fitModel = fitModel
|
|
|
|
self.fitDim = fitDim
|
|
|
|
def print_params_set_template(self, params=None):
|
|
|
|
if params is None:
|
|
params = self.fitModel.make_params()
|
|
|
|
for key in params:
|
|
res = "params.add("
|
|
res += "name=\"" + key + "\", "
|
|
if not params[key].expr is None:
|
|
res += "expr=" + params[key].expr +")"
|
|
else:
|
|
res += "value=" + f'{params[key].value:3g}' + ", "
|
|
if str(params[key].max)=="inf":
|
|
res += "max=np.inf, "
|
|
else:
|
|
res += "max=" + f'{params[key].max:3g}' + ", "
|
|
if str(params[key].min)=="-inf":
|
|
res += "min=-np.inf, "
|
|
else:
|
|
res += "min=" + f'{params[key].min:3g}' + ", "
|
|
res += "vary=" + str(params[key].vary) + ")"
|
|
print(res)
|
|
|
|
def _guess_1D(self, data, x, **kwargs):
|
|
return self.fitModel.guess(data=data, x=x, **kwargs)
|
|
|
|
def _guess_2D(self, data, x, y, **kwargs):
|
|
data = data.flatten(order='F')
|
|
return self.fitModel.guess(data=data, x=x, y=y, **kwargs)
|
|
|
|
def guess(self, dataArray, x=None, y=None, guess_kwargs={}, input_core_dims=None, dask='parallelized', vectorize=True, keep_attrs=True, daskKwargs=None, **kwargs):
|
|
|
|
kwargs.update(
|
|
{
|
|
"dask": dask,
|
|
"vectorize": vectorize,
|
|
"input_core_dims": input_core_dims,
|
|
'keep_attrs': keep_attrs,
|
|
|
|
}
|
|
)
|
|
|
|
if not daskKwargs is None:
|
|
kwargs.update({"dask_gufunc_kwargs": daskKwargs})
|
|
|
|
if input_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"input_core_dims": [['x']],
|
|
}
|
|
)
|
|
|
|
if x is None:
|
|
if 'x' in dataArray.dims:
|
|
x = dataArray['x'].to_numpy()
|
|
else:
|
|
if isinstance(x, str):
|
|
if input_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"input_core_dims": [[x]],
|
|
}
|
|
)
|
|
x = dataArray[x].to_numpy()
|
|
|
|
if self.fitDim == 1:
|
|
|
|
guess_kwargs.update(
|
|
{
|
|
'x':x,
|
|
}
|
|
)
|
|
|
|
return xr.apply_ufunc(self._guess_1D, dataArray, kwargs=guess_kwargs,
|
|
output_dtypes=[type(self.fitModel.make_params())],
|
|
**kwargs
|
|
)
|
|
|
|
if self.fitDim == 2:
|
|
|
|
if y is None:
|
|
if 'y' in dataArray.dims:
|
|
y = dataArray['y'].to_numpy()
|
|
if input_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"input_core_dims": [['x', 'y']],
|
|
}
|
|
)
|
|
else:
|
|
if isinstance(y, str):
|
|
kwargs["input_core_dims"][0] = np.append(kwargs["input_core_dims"][0], y)
|
|
y = dataArray[y].to_numpy()
|
|
elif input_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"input_core_dims": [['x', 'y']],
|
|
}
|
|
)
|
|
|
|
_x, _y = np.meshgrid(x, y)
|
|
_x = _x.flatten()
|
|
_y = _y.flatten()
|
|
|
|
# dataArray = dataArray.stack(_z=(kwargs["input_core_dims"][0][0], kwargs["input_core_dims"][0][1]))
|
|
|
|
# kwargs["input_core_dims"][0] = ['_z']
|
|
|
|
guess_kwargs.update(
|
|
{
|
|
'x':_x,
|
|
'y':_y,
|
|
}
|
|
)
|
|
|
|
return xr.apply_ufunc(self._guess_2D, dataArray, kwargs=guess_kwargs,
|
|
output_dtypes=[type(self.fitModel.make_params())],
|
|
**kwargs
|
|
)
|
|
|
|
def _fit_1D(self, data, params, x):
|
|
return self.fitModel.fit(data=data, x=x, params=params, nan_policy='omit')
|
|
|
|
def _fit_2D(self, data, params, x, y):
|
|
data = data.flatten(order='F')
|
|
return self.fitModel.fit(data=data, x=x, y=y, params=params, nan_policy='omit')
|
|
|
|
def fit(self, dataArray, paramsArray, x=None, y=None, input_core_dims=None, dask='parallelized', vectorize=True, keep_attrs=True, daskKwargs=None, **kwargs):
|
|
|
|
kwargs.update(
|
|
{
|
|
"dask": dask,
|
|
"vectorize": vectorize,
|
|
"input_core_dims": input_core_dims,
|
|
'keep_attrs': keep_attrs,
|
|
}
|
|
)
|
|
|
|
if not daskKwargs is None:
|
|
kwargs.update({"dask_gufunc_kwargs": daskKwargs})
|
|
|
|
if isinstance(paramsArray, type(self.fitModel.make_params())):
|
|
|
|
if input_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"input_core_dims": [['x']],
|
|
}
|
|
)
|
|
|
|
if x is None:
|
|
if 'x' in dataArray.dims:
|
|
x = dataArray['x'].to_numpy()
|
|
else:
|
|
if isinstance(x, str):
|
|
if input_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"input_core_dims": [[x]],
|
|
}
|
|
)
|
|
x = dataArray[x].to_numpy()
|
|
|
|
if self.fitDim == 1:
|
|
return xr.apply_ufunc(self._fit_1D, dataArray, kwargs={'params':paramsArray,'x':x},
|
|
output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))],
|
|
**kwargs)
|
|
|
|
if self.fitDim == 2:
|
|
|
|
if y is None:
|
|
if 'y' in dataArray.dims:
|
|
y = dataArray['y'].to_numpy()
|
|
if input_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"input_core_dims": [['x', 'y']],
|
|
}
|
|
)
|
|
else:
|
|
if isinstance(y, str):
|
|
kwargs["input_core_dims"][0] = np.append(kwargs["input_core_dims"][0], y)
|
|
y = dataArray[y].to_numpy()
|
|
elif input_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"input_core_dims": [['x', 'y']],
|
|
}
|
|
)
|
|
|
|
_x, _y = np.meshgrid(x, y)
|
|
_x = _x.flatten()
|
|
_y = _y.flatten()
|
|
|
|
# dataArray = dataArray.stack(_z=(kwargs["input_core_dims"][0][0], kwargs["input_core_dims"][0][1]))
|
|
|
|
# kwargs["input_core_dims"][0] = ['_z']
|
|
|
|
return xr.apply_ufunc(self._fit_2D, dataArray, kwargs={'params':paramsArray,'x':_x, 'y':_y},
|
|
output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))],
|
|
**kwargs)
|
|
|
|
else:
|
|
|
|
if input_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"input_core_dims": [['x'], []],
|
|
}
|
|
)
|
|
|
|
if x is None:
|
|
if 'x' in dataArray.dims:
|
|
x = dataArray['x'].to_numpy()
|
|
else:
|
|
if isinstance(x, str):
|
|
if input_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"input_core_dims": [[x], []],
|
|
}
|
|
)
|
|
x = dataArray[x].to_numpy()
|
|
|
|
if self.fitDim == 1:
|
|
return xr.apply_ufunc(self._fit_1D, dataArray, paramsArray, kwargs={'x':x},
|
|
output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))],
|
|
**kwargs)
|
|
|
|
if self.fitDim == 2:
|
|
|
|
if input_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"input_core_dims": [['x', 'y'], []],
|
|
}
|
|
)
|
|
|
|
if y is None:
|
|
if 'y' in dataArray.dims:
|
|
y = dataArray['y'].to_numpy()
|
|
else:
|
|
if isinstance(y, str):
|
|
y = dataArray[y].to_numpy()
|
|
kwargs["input_core_dims"][0] = np.append(kwargs["input_core_dims"][0], y)
|
|
|
|
_x, _y = np.meshgrid(x, y)
|
|
_x = _x.flatten()
|
|
_y = _y.flatten()
|
|
|
|
# dataArray = dataArray.stack(_z=(kwargs["input_core_dims"][0][0], kwargs["input_core_dims"][0][1]))
|
|
|
|
# kwargs["input_core_dims"][0] = ['_z']
|
|
|
|
return xr.apply_ufunc(self._fit_2D, dataArray, paramsArray, kwargs={'x':_x, 'y':_y},
|
|
output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))],
|
|
**kwargs)
|
|
|
|
def _eval_1D(self, fitResult, x):
|
|
return self.fitModel.eval(x=x, **fitResult.best_values)
|
|
|
|
def _eval_2D(self, fitResult, x, y, shape):
|
|
res = self.fitModel.eval(x=x, y=y, **fitResult.best_values)
|
|
return res.reshape(shape, order='F')
|
|
|
|
def eval(self, fitResultArray, x=None, y=None, output_core_dims=None, prefix="", dask='parallelized', vectorize=True, daskKwargs=None, **kwargs):
|
|
|
|
kwargs.update(
|
|
{
|
|
"dask": dask,
|
|
"vectorize": vectorize,
|
|
"output_core_dims": output_core_dims,
|
|
}
|
|
)
|
|
|
|
if daskKwargs is None:
|
|
daskKwargs = {}
|
|
|
|
if self.fitDim == 1:
|
|
|
|
if output_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"output_core_dims": prefix+'x',
|
|
}
|
|
)
|
|
output_core_dims = [prefix+'x']
|
|
|
|
daskKwargs.update(
|
|
{
|
|
'output_sizes': {
|
|
output_core_dims[0]: np.size(x),
|
|
},
|
|
'meta': np.ndarray((0,0), dtype=float)
|
|
}
|
|
)
|
|
|
|
kwargs.update(
|
|
{
|
|
"dask_gufunc_kwargs": daskKwargs,
|
|
}
|
|
)
|
|
|
|
res = xr.apply_ufunc(self._eval_1D, fitResultArray, kwargs={"x":x}, **kwargs)
|
|
return res.assign_coords({prefix+'x':np.array(x)})
|
|
|
|
if self.fitDim == 2:
|
|
if output_core_dims is None:
|
|
kwargs.update(
|
|
{
|
|
"output_core_dims": [[prefix+'x', prefix+'y']],
|
|
}
|
|
)
|
|
output_core_dims = [prefix+'x', prefix+'y']
|
|
|
|
daskKwargs.update(
|
|
{
|
|
'output_sizes': {
|
|
output_core_dims[0]: np.size(x),
|
|
output_core_dims[1]: np.size(y),
|
|
},
|
|
'meta': np.ndarray((0,0), dtype=float)
|
|
},
|
|
)
|
|
|
|
kwargs.update(
|
|
{
|
|
"dask_gufunc_kwargs": daskKwargs,
|
|
}
|
|
)
|
|
|
|
_x, _y = np.meshgrid(x, y)
|
|
_x = _x.flatten()
|
|
_y = _y.flatten()
|
|
|
|
res = xr.apply_ufunc(self._eval_2D, fitResultArray, kwargs={"x":_x, "y":_y, "shape":(len(x), len(y))}, **kwargs)
|
|
return res.assign_coords({prefix+'x':np.array(x), prefix+'y':np.array(y)})
|
|
|
|
def _get_fit_value_single(self, fitResult, key):
|
|
return fitResult.params[key].value
|
|
|
|
def _get_fit_value(self, fitResult, params):
|
|
func = np.vectorize(self._get_fit_value_single)
|
|
res = tuple(
|
|
func(fitResult, key)
|
|
for key in params
|
|
)
|
|
|
|
return res
|
|
|
|
def get_fit_value(self, fitResult, dask='parallelized', **kwargs):
|
|
firstIndex = {
|
|
key: fitResult[key][0]
|
|
for key in fitResult.dims
|
|
}
|
|
firstFitResult = fitResult.sel(firstIndex).item()
|
|
|
|
params = list(firstFitResult.params.keys())
|
|
|
|
output_core_dims=[ [] for _ in range(len(params))]
|
|
|
|
kwargs.update(
|
|
{
|
|
"dask": dask,
|
|
"output_core_dims": output_core_dims,
|
|
}
|
|
)
|
|
|
|
value = xr.apply_ufunc(self._get_fit_value, fitResult, kwargs=dict(params=params), **kwargs)
|
|
|
|
value = xr.Dataset(
|
|
data_vars={
|
|
params[i]: value[i]
|
|
for i in range(len(params))
|
|
},
|
|
attrs=fitResult.attrs
|
|
)
|
|
|
|
return value
|
|
|
|
def _get_fit_std_single(self, fitResult, key):
|
|
return fitResult.params[key].stderr
|
|
|
|
def _get_fit_std(self, fitResult, params):
|
|
func = np.vectorize(self._get_fit_std_single)
|
|
res = tuple(
|
|
func(fitResult, key)
|
|
for key in params
|
|
)
|
|
|
|
return res
|
|
|
|
def get_fit_std(self, fitResult, dask='parallelized', **kwargs):
|
|
firstIndex = {
|
|
key: fitResult[key][0]
|
|
for key in fitResult.dims
|
|
}
|
|
firstFitResult = fitResult.sel(firstIndex).item()
|
|
|
|
params = list(firstFitResult.params.keys())
|
|
|
|
output_core_dims=[ [] for _ in range(len(params))]
|
|
|
|
kwargs.update(
|
|
{
|
|
"dask": dask,
|
|
"output_core_dims": output_core_dims,
|
|
}
|
|
)
|
|
|
|
value = xr.apply_ufunc(self._get_fit_std, fitResult, kwargs=dict(params=params), **kwargs)
|
|
|
|
value = xr.Dataset(
|
|
data_vars={
|
|
params[i]: value[i]
|
|
for i in range(len(params))
|
|
},
|
|
attrs=fitResult.attrs
|
|
)
|
|
|
|
return value
|
|
|
|
def _get_fit_full_result_single(self, fitResult, key):
|
|
|
|
if not fitResult.params[key].value is None:
|
|
value = fitResult.params[key].value
|
|
else:
|
|
value = np.nan
|
|
|
|
if not fitResult.params[key].stderr is None:
|
|
std = fitResult.params[key].stderr
|
|
else:
|
|
std = np.nan
|
|
|
|
return ufloat(value, std)
|
|
|
|
def _get_fit_full_result(self, fitResult, params):
|
|
func = np.vectorize(self._get_fit_full_result_single)
|
|
res = tuple(
|
|
func(fitResult, key)
|
|
for key in params
|
|
)
|
|
|
|
return res
|
|
|
|
def get_fit_full_result(self, fitResult, dask='parallelized', **kwargs):
|
|
firstIndex = {
|
|
key: fitResult[key][0]
|
|
for key in fitResult.dims
|
|
}
|
|
firstFitResult = fitResult.sel(firstIndex).item()
|
|
|
|
params = list(firstFitResult.params.keys())
|
|
|
|
output_core_dims=[ [] for _ in range(len(params))]
|
|
|
|
kwargs.update(
|
|
{
|
|
"dask": dask,
|
|
"output_core_dims": output_core_dims,
|
|
}
|
|
)
|
|
|
|
value = xr.apply_ufunc(self._get_fit_full_result, fitResult, kwargs=dict(params=params), **kwargs)
|
|
|
|
value = xr.Dataset(
|
|
data_vars={
|
|
params[i]: value[i]
|
|
for i in range(len(params))
|
|
},
|
|
attrs=fitResult.attrs
|
|
)
|
|
|
|
return value
|
|
|
|
|
|
|