finish of fitting

This commit is contained in:
Jianshun Gao 2023-05-04 13:47:33 +02:00
parent 8aa180c276
commit c049919d9d
6 changed files with 1827 additions and 173 deletions

490
Analyser/FitAnalyser.py Normal file
View File

@ -0,0 +1,490 @@
import numpy as np
import lmfit
from lmfit.models import (ConstantModel, ComplexConstantModel, LinearModel, QuadraticModel,
PolynomialModel, SineModel, GaussianModel, Gaussian2dModel, LorentzianModel,
SplitLorentzianModel, VoigtModel, PseudoVoigtModel, MoffatModel,
Pearson7Model, StudentsTModel, BreitWignerModel, LognormalModel,
DampedOscillatorModel, ExponentialGaussianModel, SkewedGaussianModel,
SkewedVoigtModel, ThermalDistributionModel, DoniachModel, PowerLawModel,
ExponentialModel, StepModel, RectangleModel, ExpressionModel, DampedHarmonicOscillatorModel)
from lmfit.models import (guess_from_peak, guess_from_peak2d, fwhm_expr, height_expr,
update_param_vals)
from lmfit.lineshapes import (not_zero, breit_wigner, damped_oscillator, dho, doniach,
expgaussian, exponential, gaussian, gaussian2d,
linear, lognormal, lorentzian, moffat, parabolic,
pearson7, powerlaw, pvoigt, rectangle, sine,
skewed_gaussian, skewed_voigt, split_lorentzian, step,
students_t, thermal_distribution, tiny, voigt)
from lmfit import Model
import numpy as np
from numpy import (arctan, copysign, cos, exp, isclose, isnan, log, pi, real,
sin, sqrt, where)
from scipy.special import erf, erfc
from scipy.special import gamma as gamfcn
from scipy.special import wofz
from scipy.optimize import curve_fit
import xarray as xr
log2 = log(2)
s2pi = sqrt(2*pi)
s2 = sqrt(2.0)
def gaussianWithOffset(x, amplitude=1.0, center=0.0, sigma=1.0, offset=0.0):
"""Return a 1-dimensional Gaussian function with an offset.
gaussian(x, amplitude, center, sigma) =
(amplitude/(s2pi*sigma)) * exp(-(1.0*x-center)**2 / (2*sigma**2))
"""
return ((amplitude/(max(tiny, s2pi*sigma)))
* exp(-(1.0*x-center)**2 / max(tiny, (2*sigma**2))) + offset)
def lorentzianWithOffset(x, amplitude=1.0, center=0.0, sigma=1.0, offset=0.0):
return ((amplitude/(1 + ((1.0*x-center)/max(tiny, sigma))**2))
/ max(tiny, (pi*sigma)) + offset)
def exponentialWithOffset(x, amplitude=1.0, decay=1.0, offset=0.0):
decay = not_zero(decay)
return amplitude * exp(-x/decay) + offset
def expansion(x, amplitude=1.0, offset=0.0):
return np.sqrt(amplitude*x*x + offset)
def dampingOscillation(x, center=0, amplitude=1.0, frequency=1.0, decay=1.0, offset=0.0):
return amplitude * np.exp(-decay*x)*np.sin(2*np.pi*frequency*(x-center)) + offset
def double_structure(x, x1=0.25, x2=0.75, amplitude=1.0, center=0.0, sigma=1.0, a=-1.0, b=0, c=0):
y = np.zeros(x.shape)
return ((amplitude/(max(tiny, s2pi*sigma)))
* exp(-(1.0*x-center)**2 / max(tiny, (2*sigma**2))))
class GaussianWithOffsetModel(Model):
fwhm_factor = 2*np.sqrt(2*np.log(2))
height_factor = 1./np.sqrt(2*np.pi)
def __init__(self, independent_vars=['x'], nan_policy='raise', prefix='', name=None, **kwargs):
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
'independent_vars': independent_vars})
super().__init__(gaussianWithOffset, **kwargs)
self._set_paramhints_prefix()
def _set_paramhints_prefix(self):
self.set_param_hint('sigma', min=0)
self.set_param_hint('fwhm', expr=fwhm_expr(self))
self.set_param_hint('height', expr=height_expr(self))
def guess(self, data, x, negative=False, **kwargs):
offset = np.min(data)
data = data - offset
pars = guess_from_peak(self, data, x, negative)
pars.add('offset', value=offset)
return update_param_vals(pars, self.prefix, **kwargs)
class LorentzianWithOffsetModel(Model):
fwhm_factor = 2.0
height_factor = 1./np.pi
def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise',
**kwargs):
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
'independent_vars': independent_vars})
super().__init__(lorentzianWithOffset, **kwargs)
self._set_paramhints_prefix()
def _set_paramhints_prefix(self):
self.set_param_hint('sigma', min=0)
self.set_param_hint('fwhm', expr=fwhm_expr(self))
self.set_param_hint('height', expr=height_expr(self))
def guess(self, data, x, negative=False, **kwargs):
"""Estimate initial model parameter values from data."""
offset = np.min(data)
data = data - offset
pars = guess_from_peak(self, data, x, negative, ampscale=1.25)
pars.add('offset', value=offset)
return update_param_vals(pars, self.prefix, **kwargs)
class ExponentialWithOffsetModel(Model):
def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise',
**kwargs):
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
'independent_vars': independent_vars})
super().__init__(exponentialWithOffset, **kwargs)
def guess(self, data, x, **kwargs):
"""Estimate initial model parameter values from data."""
offset = np.min(data)
data = data - offset
try:
sval, oval = np.polyfit(x, np.log(abs(data)+1.e-15), 1)
except TypeError:
sval, oval = 1., np.log(abs(max(data)+1.e-9))
pars = self.make_params(amplitude=np.exp(oval), decay=-1.0/sval)
pars.add('offset', value=offset)
return update_param_vals(pars, self.prefix, **kwargs)
class ExpansionModel(Model):
def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise',
**kwargs):
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
'independent_vars': independent_vars})
super().__init__(expansion, **kwargs)
def guess(self, data, x, **kwargs):
"""Estimate initial model parameter values from data."""
popt1, pcov1 = curve_fit(expansion, x, data)
pars = self.make_params(amplitude=popt1[0], offset=popt1[1])
return update_param_vals(pars, self.prefix, **kwargs)
class DampingOscillationModel(Model):
def __init__(self, independent_vars=['x'], prefix='', nan_policy='raise',
**kwargs):
kwargs.update({'prefix': prefix, 'nan_policy': nan_policy,
'independent_vars': independent_vars})
super().__init__(dampingOscillation, **kwargs)
def guess(self, data, x, **kwargs):
"""Estimate initial model parameter values from data."""
try:
popt1, pcov1 = curve_fit(dampingOscillation, x, data, np.array(0, 5, 5e2, 1e3, 16))
pars = self.make_params(center=popt1[0], amplitude=popt1[1], frequency=popt1[2], decay=popt1[3], offset=popt1[4])
except:
pars = self.make_params(center=0, amplitude=5.0, frequency=5e2, decay=1.0e3, offset=16.0)
return update_param_vals(pars, self.prefix, **kwargs)
lmfit_models = {'Constant': ConstantModel,
'Complex Constant': ComplexConstantModel,
'Linear': LinearModel,
'Quadratic': QuadraticModel,
'Polynomial': PolynomialModel,
'Gaussian': GaussianModel,
'Gaussian-2D': Gaussian2dModel,
'Lorentzian': LorentzianModel,
'Split-Lorentzian': SplitLorentzianModel,
'Voigt': VoigtModel,
'PseudoVoigt': PseudoVoigtModel,
'Moffat': MoffatModel,
'Pearson7': Pearson7Model,
'StudentsT': StudentsTModel,
'Breit-Wigner': BreitWignerModel,
'Log-Normal': LognormalModel,
'Damped Oscillator': DampedOscillatorModel,
'Damped Harmonic Oscillator': DampedHarmonicOscillatorModel,
'Exponential Gaussian': ExponentialGaussianModel,
'Skewed Gaussian': SkewedGaussianModel,
'Skewed Voigt': SkewedVoigtModel,
'Thermal Distribution': ThermalDistributionModel,
'Doniach': DoniachModel,
'Power Law': PowerLawModel,
'Exponential': ExponentialModel,
'Step': StepModel,
'Rectangle': RectangleModel,
'Expression': ExpressionModel,
'Gaussian With Offset':GaussianWithOffsetModel,
'Lorentzian With Offset':LorentzianWithOffsetModel,
'Expansion':ExpansionModel,
'Damping Oscillation Model':DampingOscillationModel
}
class FitAnalyser():
def __init__(self, fitModel, fitDim=1, **kwargs) -> None:
if isinstance(fitModel, str):
self.fitModel = lmfit_models[fitModel](**kwargs)
else:
self.fitModel = fitModel
self.fitDim = fitDim
def _guess_1D(self, data, x):
return self.fitModel.guess(data=data, x=x)
def _guess_2D(self, data, x, y):
return self.fitModel.guess(data=data, x=x, y=y)
def guess(self, dataArray, x=None, y=None, input_core_dims=None, dask='parallelized', vectorize=True, **kwargs):
kwargs.update(
{
"dask": dask,
"vectorize": vectorize,
"input_core_dims": input_core_dims
}
)
if input_core_dims is None:
kwargs.update(
{
"input_core_dims": [['x']],
}
)
if x is None:
if 'x' in dataArray.dims:
x = dataArray['x'].to_numpy()
else:
if isinstance(x, str):
x = dataArray[x].to_numpy()
if input_core_dims is None:
kwargs.update(
{
"input_core_dims": [[x]],
}
)
if self.fitDim == 1:
return xr.apply_ufunc(self._guess_1D, dataArray, kwargs={'x':x},
output_dtypes=[type(self.fitModel.make_params())],
**kwargs
)
if self.fitDim == 2:
if y is None:
if 'y' in dataArray.dims:
y = dataArray['y'].to_numpy()
if input_core_dims is None:
kwargs.update(
{
"input_core_dims": [['x', 'y']],
}
)
else:
if isinstance(y, str):
y = dataArray[y].to_numpy()
kwargs["input_core_dims"][0] = np.append(kwargs["input_core_dims"][0], y)
elif input_core_dims is None:
kwargs.update(
{
"input_core_dims": [['x', 'y']],
}
)
_x, _y = np.meshgrid(x, y)
_x = _x.flatten()
_y = _y.flatten()
dataArray = dataArray.stack(_z=(kwargs["input_core_dims"][0][0], kwargs["input_core_dims"][0][1]))
kwargs["input_core_dims"][0] = ['_z']
return xr.apply_ufunc(self._guess_2D, dataArray, kwargs={'x':_x, 'y':_y},
output_dtypes=[type(self.fitModel.make_params())],
**kwargs
)
def _fit_1D(self, data, params, x):
# try:
return self.fitModel.fit(data=data, x=x, params=params)
def _fit_2D(self, data, params, x, y):
return self.fitModel.fit(data=data, x=x, y=y, params=params)
def fit(self, dataArray, paramsArray, x=None, y=None, input_core_dims=None, dask='parallelized', vectorize=True, **kwargs):
kwargs.update(
{
"dask": dask,
"vectorize": vectorize,
"input_core_dims": input_core_dims,
}
)
if input_core_dims is None:
kwargs.update(
{
"input_core_dims": [['x'], []],
}
)
if x is None:
if 'x' in dataArray.dims:
x = dataArray['x'].to_numpy()
else:
if isinstance(x, str):
x = dataArray[x].to_numpy()
if input_core_dims is None:
kwargs.update(
{
"input_core_dims": [[x], []],
}
)
if isinstance(paramsArray, type(self.fitModel.make_params())):
if self.fitDim == 1:
return xr.apply_ufunc(self._fit_1D, dataArray, kwargs={'params':paramsArray,'x':x},
output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))],
**kwargs)
if self.fitDim == 2:
if y is None:
if 'y' in dataArray.dims:
y = dataArray['y'].to_numpy()
if input_core_dims is None:
kwargs.update(
{
"input_core_dims": [['x', 'y'], []],
}
)
else:
if isinstance(y, str):
y = dataArray[y].to_numpy()
kwargs["input_core_dims"][0] = np.append(kwargs["input_core_dims"][0], y)
elif input_core_dims is None:
kwargs.update(
{
"input_core_dims": [['x', 'y'], []],
}
)
_x, _y = np.meshgrid(x, y)
_x = _x.flatten()
_y = _y.flatten()
dataArray = dataArray.stack(_z=(kwargs["input_core_dims"][0][0], kwargs["input_core_dims"][0][1]))
kwargs["input_core_dims"][0] = ['_z']
return xr.apply_ufunc(self._fit_2D, dataArray, kwargs={'params':paramsArray,'x':_x, 'y':_y},
output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))],
**kwargs)
else:
if self.fitDim == 1:
return xr.apply_ufunc(self._fit_1D, dataArray, paramsArray, kwargs={'x':x},
output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))],
**kwargs)
if self.fitDim == 2:
if input_core_dims is None:
kwargs.update(
{
"input_core_dims": [['x', 'y'], []],
}
)
if y is None:
if 'y' in dataArray.dims:
y = dataArray['y'].to_numpy()
else:
if isinstance(y, str):
y = dataArray[y].to_numpy()
kwargs["input_core_dims"][0] = np.append(kwargs["input_core_dims"][0], y)
_x, _y = np.meshgrid(x, y)
_x = _x.flatten()
_y = _y.flatten()
dataArray = dataArray.stack(_z=(kwargs["input_core_dims"][0][0], kwargs["input_core_dims"][0][1]))
kwargs["input_core_dims"][0] = ['_z']
return xr.apply_ufunc(self._fit_2D, dataArray, paramsArray, kwargs={'x':_x, 'y':_y},
output_dtypes=[type(lmfit.model.ModelResult(self.fitModel, self.fitModel.make_params()))],
**kwargs)
def _eval_1D(self, fitResult, x):
return self.fitModel.eval(x=x, **fitResult.best_values)
def _eval_2D(self, fitResult, x, y, shape):
res = self.fitModel.eval(x=x, y=y, **fitResult.best_values)
return res.reshape(shape)
def eval(self, fitResultArray, x=None, y=None, output_core_dims=None, prefix="", dask='parallelized', vectorize=True, **kwargs):
kwargs.update(
{
"dask": dask,
"vectorize": vectorize,
"output_core_dims": output_core_dims,
}
)
if self.fitDim == 1:
if output_core_dims is None:
kwargs.update(
{
"output_core_dims": [[prefix+'x']],
"output_dtypes": float,
}
)
output_core_dims = [prefix+'x']
kwargs.update(
{
"dask_gufunc_kwargs": {
'output_sizes': {
output_core_dims[0]: np.size(x),
},
},
}
)
return xr.apply_ufunc(self._eval_1D, fitResultArray, kwargs={"x":x}, **kwargs)
if self.fitDim == 2:
if output_core_dims is None:
kwargs.update(
{
"output_core_dims": [[prefix+'x', prefix+'y']],
"output_dtypes": float,
}
)
output_core_dims = [prefix+'x', prefix+'y']
kwargs.update(
{
"dask_gufunc_kwargs": {
'output_sizes': {
output_core_dims[0]: np.size(x),
output_core_dims[1]: np.size(y),
},
# 'output_dtypes': {
# output_core_dims[0]: float,
# output_core_dims[1]: float,
# },
},
}
)
_x, _y = np.meshgrid(x, y)
_x = _x.flatten()
_y = _y.flatten()
return xr.apply_ufunc(self._eval_2D, fitResultArray, kwargs={"x":_x, "y":_y, "shape":(len(x), len(y))}, **kwargs)

130
Analyser/ImagingAnalyser.py Normal file
View File

@ -0,0 +1,130 @@
import numpy as np
import xarray as xr
class ImageAnalyser():
def __init__(self) -> None:
self._image_name = {
'atoms': 'atoms',
'background': 'background',
'dark': 'dark',
'OD':'OD',
}
self._center = None
self._span = None
self._fraction = None
@property
def image_name(self):
return self._image_name
@image_name.setter
def image_name(self, value):
self._image_name.update(value)
@property
def center(self):
return self._center
@center.setter
def center(self, value):
self._center = value
@property
def span(self):
return self._span
@span.setter
def span(self, value):
self._span = value
@property
def fraction(self):
return self._fraction
@fraction.setter
def fraction(self, value):
self._fraction = value
def get_offset_from_corner(self, dataArray, x_fraction=None, y_fraction=None, fraction=None, xAxisName='x', yAxisName='y'):
if fraction is None:
if x_fraction is None:
x_fraction = self._fraction[0]
if y_fraction is None:
y_fraction = self._fraction[1]
else:
x_fraction = fraction[0]
y_fraction = fraction[1]
x_number = dataArray[xAxisName].shape[0]
y_number = dataArray[yAxisName].shape[0]
mean = dataArray.isel(x=slice(0, int(x_number * x_fraction)), y=slice(0 , int(y_number * y_fraction))).mean(dim=[xAxisName, yAxisName])
mean += dataArray.isel(x=slice(0, int(x_number * x_fraction)), y=slice(int(y_number - y_number * y_fraction) , int(y_number))).mean(dim=[xAxisName, yAxisName])
mean += dataArray.isel(x=slice(int(x_number - x_number * x_fraction) , int(x_number)), y=slice(0 , int(y_number * y_fraction))).mean(dim=[xAxisName, yAxisName])
mean += dataArray.isel(x=slice(int(x_number - x_number * x_fraction) , int(x_number)), y=slice(int(y_number - y_number * y_fraction) , int(y_number))).mean(dim=[xAxisName, yAxisName])
return mean / 4
def substract_offset(self, dataArray, **kwargs):
return dataArray - self.get_offset_from_corner(dataArray, **kwargs)
def crop_image(self, dataset, center=None, span=None):
if center is None:
center = self._center
if span is None:
span = self._span
x_start = int(center[0] - span[0] / 2)
x_end = int(center[0] + span[0] / 2)
y_end = int(center[1] + span[1] / 2)
y_start = int(center[1] - span[1] / 2)
return dataset.isel(x=slice(x_start, x_end), y=slice(y_start, y_end))
def get_OD(self, imageAtom, imageBackground, imageDrak):
numerator = np.atleast_1d(imageBackground - imageDrak)
denominator = np.atleast_1d(imageAtom - imageDrak)
numerator[numerator == 0] = 1
denominator[denominator == 0] = 1
imageOD = np.abs(np.divide(denominator, numerator))
imageOD= -np.log(imageOD)
if len(imageOD) == 1:
return imageOD[0]
else:
return imageOD
def get_Ncount(self, imageOD):
return np.sum(imageOD)
def get_absorption_images(self, dataset, dask='allowed', **kwargs):
kwargs.update(
{'dask': dask}
)
dataset = dataset.assign(
{
self._image_name['OD']: xr.apply_ufunc(self.get_OD, dataset[self._image_name['atoms']], dataset[self._image_name['background']], dataset[self._image_name['dark']], **kwargs)
}
)
return dataset
def remove_background(self, dataset, dask='allowed', **kwargs):
kwargs.update(
{'dask': dask}
)
xr.apply_ufunc(self.get_OD, dataset[self._image_name['atoms']], dataset[self._image_name['background']], dataset[self._image_name['dark']], **kwargs)

View File

@ -96,6 +96,14 @@ def _assign_scan_axis_partial(x, datesetOfGlobal):
for key in scanAxis
}
)
def _update_globals_attrs(variable_attrs, context=None):
pass
def update_hdf5_file():
pass
def read_hdf5_file(filePath, group=None, datesetOfGlobal=None, preprocess=None, join="outer", parallel=True, engine="h5netcdf", phony_dims="access", **kwargs):

1354
test.ipynb

File diff suppressed because one or more lines are too long

18
test.py Normal file
View File

@ -0,0 +1,18 @@
from DataContainer.ReadData import read_hdf5_file
# filepath = "//DyLabNAS/Data/Evaporative_Cooling/2023/04/18/0003/*.h5"
# filepath = "//DyLabNAS/Data/Evaporative_Cooling/2023/04/18/0003/2023-04-18_0003_Evaporative_Cooling_000.h5"
filepath = "//DyLabNAS/Data/Repetition_scan/2023/04/21/0000/*.h5"
groupList = [
"images/MOT_3D_Camera/in_situ_absorption",
"images/ODT_1_Axis_Camera/in_situ_absorption"
]
prefix = [
"camera_0",
"camera_1"
]
ds = read_hdf5_file(filepath, groupList)