import copy
import glob
from datetime import date

import numpy as np
import xarray as xr
from uncertainties import unumpy as unp


def get_mask(dataArray):
    """generate a bool mask array (initially all True) for a given dataArray

    :param dataArray: The given dataArray
    :type dataArray: xarray DataArray
    :return: the mask array
    :rtype: numpy array of bool elements
    """
    return np.ones(dataArray.shape, dtype=bool)


def remove_bad_shots(dataArray, **kwargs):
    """copy the dataArray and mask the given bad shots with NaN

    :param dataArray: The given dataArray
    :type dataArray: xarray DataArray
    :param kwargs: the coordinate selections identifying the bad shots
    :return: The dataArray after removal
    :rtype: xarray DataArray
    """
    dataArray = copy.deepcopy(dataArray)
    dataArray.loc[dict(kwargs)] = np.nan
    return dataArray


def auto_rechunk(dataSet):
    """rechunk the dataSet or dataArray using the auto rechunk function

    :param dataSet: The given dataArray or dataSet
    :type dataSet: xarray DataArray or xarray DataSet
    :return: The chunked dataArray or dataSet
    :rtype: xarray DataArray or xarray DataSet
    """
    kwargs = {key: "auto" for key in dataSet.dims}
    return dataSet.chunk(**kwargs)


def copy_chunk(dataSet, dataChunk):
    """copy the chunk sizes of one dataArray or dataSet and apply them to another

    :param dataSet: The dataArray or dataSet to be chunked
    :type dataSet: xarray DataArray or xarray DataSet
    :param dataChunk: The dataArray or dataSet giving the chunk sizes
    :type dataChunk: xarray DataArray or xarray DataSet
    :return: The chunked dataArray or dataSet
    :rtype: xarray DataArray or xarray DataSet
    """
    kwargs = {
        key: dataChunk.chunksizes[key]
        for key in dataChunk.chunksizes
        if key in dataSet.dims
    }
    return dataSet.chunk(**kwargs)


def get_h5_file_path(folderpath, maxFileNum=None, filename='*.h5'):
    """collect the paths of HDF5 files in a folder, sorted by file name

    :param folderpath: the path of the folder containing the files
    :type folderpath: str
    :param maxFileNum: the maximal number of files to return, defaults to None (no limit)
    :type maxFileNum: int, optional
    :param filename: the glob pattern of the file names, defaults to '*.h5'
    :type filename: str, optional
    :return: the sorted file paths
    :rtype: numpy array of str
    """
    filepath = np.sort(glob.glob(folderpath + filename))
    if maxFileNum is None:
        return filepath
    else:
        return filepath[:maxFileNum]


def get_date():
    """return today's date as a 'YYYY/MM/DD' string"""
    today = date.today()
    return today.strftime("%Y/%m/%d")


def _combine_uncertainty(value, std):
    return unp.uarray(value, std)


def combine_uncertainty(value, std, dask='parallelized', **kwargs):
    """combine a value array and a standard deviation array into a single
    uncertainties array via xarray.apply_ufunc"""
    kwargs.update(
        {
            "dask": dask,
        }
    )
    return xr.apply_ufunc(_combine_uncertainty, value, std, **kwargs)


def _seperate_uncertainty_single(data):
    return data.n, data.s


def _seperate_uncertainty(data):
    func = np.vectorize(_seperate_uncertainty_single)
    return func(data)


def seperate_uncertainty(data, dask='parallelized', **kwargs):
    """split an uncertainties array back into its nominal values and
    standard deviations via xarray.apply_ufunc"""
    kwargs.update(
        {
            "dask": dask,
            "output_core_dims": [[], []],
        }
    )
    return xr.apply_ufunc(_seperate_uncertainty, data, **kwargs)


def get_scanAxis(dataSet):
    """return the scan axes of the dataSet as a two-element list, padding
    with None and always putting 'runs' second"""
    res = dataSet.scanAxis

    if len(res) == 0:
        res = [None, None]
    elif len(res) == 1:
        res = [res[0], None]
    elif len(res) == 2 and res[0] == 'runs':
        res = [res[1], res[0]]

    return res


def print_scanAxis(dataSet):
    """print the detected scan axes and their unique values"""
    scanAxis = dataSet.scanAxis

    scan = {}

    for key in scanAxis:
        # np.unique already returns the sorted unique values; re-indexing them
        # with the return_index indices (as before) pointed into the original
        # array and could scramble or overrun the unique values
        scanValue = np.unique(np.array(dataSet[key]))
        scan.update(
            {
                key: scanValue
            }
        )

    print("The detected scanning axes and values are: \n")
    print(scan)


def calculate_mean(dataSet):
    """average the dataSet over the 'runs' dimension if it exists"""
    if 'runs' in dataSet.dims:
        return dataSet.mean(dim='runs')
    else:
        return dataSet


def calculate_std(dataSet):
    """take the standard deviation of the dataSet over the 'runs' dimension,
    or return None if there is no 'runs' dimension"""
    if 'runs' in dataSet.dims:
        return dataSet.std(dim='runs')
    else:
        return None


def extract_temperature_from_fit():
    # not yet implemented
    pass


def extract_condensate_fraction_from_fit():
    # not yet implemented
    pass
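
# Illustrative sketch (not part of the original module): a round trip through
# the uncertainty helpers on small, made-up, numpy-backed DataArrays.
def demo_uncertainty_roundtrip():
    value = xr.DataArray([1.0, 2.0, 3.0], dims="x")
    std = xr.DataArray([0.1, 0.2, 0.3], dims="x")
    data = combine_uncertainty(value, std)  # object-dtype array of ufloats
    nominal, sigma = seperate_uncertainty(data)  # recover values and stds
    return nominal, sigma
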
def swap_xy(dataSet):
    """swap the x and y dimensions of the dataSet or dataArray

    The swap goes through a temporary name because rename_dims refuses to
    rename a dimension to a name that already exists.
    """
    dataSet = dataSet.rename_dims(dict(x='__x'))
    dataSet = dataSet.rename_dims(dict(y='x'))
    dataSet = dataSet.rename_dims(dict(__x='y'))
    return dataSet
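

# A minimal end-to-end sketch, assuming a hypothetical folder "./data/" of
# per-shot HDF5 files that xarray can open, concatenated along a "runs"
# dimension; adapt the path, pattern, and open_mfdataset options to the
# actual recording format.
if __name__ == "__main__":
    filePaths = get_h5_file_path("./data/", maxFileNum=5)
    print(get_date() + ": found %d shot files" % len(filePaths))

    dataSet = xr.open_mfdataset(
        filePaths, combine="nested", concat_dim="runs"
    ).load()  # load into memory so the uncertainty helpers see numpy arrays

    meanData = calculate_mean(dataSet)
    stdData = calculate_std(dataSet)
    if stdData is not None:
        # attach the shot-to-shot scatter to the averaged data
        result = combine_uncertainty(meanData, stdData)
        print(result)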