108 lines
2.3 KiB
Python
108 lines
2.3 KiB
Python
import glob
|
|
from datetime import date
|
|
|
|
import numpy as np
|
|
from uncertainties import unumpy as unp
|
|
|
|
import xarray as xr
|
|
|
|
|
|
def get_mask(dataArray):
|
|
return np.ones(dataArray.shape, dtype=bool)
|
|
|
|
|
|
def remove_bad_shots(dataArray, **kwargs):
|
|
dataArray.loc[dict(kwargs)] = np.nan
|
|
|
|
|
|
def auto_rechunk(dataSet):
|
|
kwargs = {
|
|
key: "auto"
|
|
for key in dataSet.dims
|
|
}
|
|
return dataSet.chunk(**kwargs)
|
|
|
|
|
|
def copy_chunk(dataSet, dataChunk):
|
|
kwargs = {
|
|
key: dataChunk.chunksizes[key]
|
|
for key in dataChunk.chunksizes
|
|
if key in dataSet.dims
|
|
}
|
|
return dataSet.chunk(**kwargs)
|
|
|
|
|
|
def get_h5_file_path(folderpath, maxFileNum=None, filename='*.h5',):
|
|
filepath = np.sort(glob.glob(folderpath + filename))
|
|
if maxFileNum is None:
|
|
return filepath
|
|
else:
|
|
return filepath[:maxFileNum]
|
|
|
|
|
|
def get_date():
|
|
today = date.today()
|
|
return today.strftime("%Y/%m/%d")
|
|
|
|
|
|
def _combine_uncertainty(value, std):
|
|
return unp.uarray(value, std)
|
|
|
|
|
|
def combine_uncertainty(value, std, dask='parallelized', **kwargs):
|
|
|
|
kwargs.update(
|
|
{
|
|
"dask": dask,
|
|
}
|
|
)
|
|
|
|
return xr.apply_ufunc(_combine_uncertainty, value, std, **kwargs)
|
|
|
|
|
|
def _seperate_uncertainty_single(data):
|
|
return data.n, data.s
|
|
|
|
def _seperate_uncertainty(data):
|
|
func = np.vectorize(_seperate_uncertainty_single)
|
|
return func(data)
|
|
|
|
def seperate_uncertainty(data, dask='parallelized', **kwargs):
|
|
|
|
kwargs.update(
|
|
{
|
|
"dask": dask,
|
|
"output_core_dims": [[], []],
|
|
}
|
|
)
|
|
|
|
return xr.apply_ufunc(_seperate_uncertainty, data, **kwargs)
|
|
|
|
|
|
def get_scanAxis(dataSet):
|
|
res = dataSet.scanAxis
|
|
|
|
if len(res) == 0:
|
|
res = [None, None]
|
|
elif len(res) == 1:
|
|
res = [res[0], None]
|
|
elif len(res) == 2 and res[0] == 'runs':
|
|
res = [res[1], res[0]]
|
|
|
|
return res
|
|
|
|
def print_scanAxis(dataSet):
|
|
scanAxis = dataSet.scanAxis
|
|
|
|
scan = {}
|
|
|
|
for key in scanAxis:
|
|
scanValue = np.array(dataSet[key])
|
|
scanValue, indices = np.unique(scanValue, return_index=True)
|
|
scan.update(
|
|
{
|
|
key: scanValue[indices]
|
|
}
|
|
)
|
|
print("The detected scaning axes and values are: /n")
|
|
print(scan) |