import glob from datetime import date import copy import numpy as np from uncertainties import unumpy as unp import xarray as xr def get_mask(dataArray): """Generate a bool mask array for given dataArray :param dataArray: The given dataArray :type dataArray: xarray DataArray :return: the mask array :rtype: numpy array of bool elements """ return np.ones(dataArray.shape, dtype=bool) def remove_bad_shots(dataArray, **kwargs): """Copy and remove bad shots from the dataArray by setting the value to np.nan. If you want fully delete those nan data, please use the function xarray.DataArray.dropna() (see this link https://docs.xarray.dev/en/stable/generated/xarray.DataArray.dropna.html). Here is an example for indexing the bad shots: remove_bad_shots(dataArray, axis_1 = the value (not index) of axis_1, axis_2 = the value of axis_2, ...) For more detials please read 'Positional indexing' section in this link https://docs.xarray.dev/en/stable/user-guide/indexing.html#positional-indexing. :param dataArray: The given dataArray :type dataArray: xarray DataArray :return: The dataArray after removement :rtype: xarray DataArray """ dataArray = copy.deepcopy(dataArray) dataArray.loc[dict(kwargs)] = np.nan return dataArray def auto_rechunk(dataSet): """Rechunk the dataSet or dataArray using auto rechunk function :param dataSet: The given dataArray or dataSet :type dataSet: xarray DataArray or xarray DataSet :return: The chuncked dataArray or dataSet :rtype: xarray DataArray or xarray DataSet """ kwargs = { key: "auto" for key in dataSet.dims } return dataSet.chunk(**kwargs) def copy_chunk(dataSet, dataChunk): """Copy the chunk and apply to another dataArray or dataSet :param dataSet: The dataArray or dataSet will be chunked :type dataSet: xarray DataArray or xarray DataSet :param dataChunk: The dataArray or dataSet giving the chunk :type dataChunk: xarray DataArray or xarray DataSet :return: The chuncked dataArray or dataSet :rtype: xarray DataArray or xarray DataSet """ kwargs = { key: dataChunk.chunksizes[key] for key in dataChunk.chunksizes if key in dataSet.dims } return dataSet.chunk(**kwargs) def get_h5_file_path(folderpath, maxFileNum=None, filename='*.h5',): """Get all the path of HDF5 files in specific folder :param folderpath: the path of the folder :type folderpath: str :param maxFileNum: the maximal number of returned files, defaults to None :type maxFileNum: int, optional :param filename: a string to specify the type of the file to read, defaults to '*.h5' :type filename: str, optional :return: the found file path :rtype: 1D numpy array """ filepath = np.sort(glob.glob(folderpath + filename)) if maxFileNum is None: return filepath else: return filepath[:maxFileNum] def get_date(): """Return the date of today in a format compatible with file path :return: the date of today in a format compatible with file path :rtype: str """ today = date.today() return today.strftime("%Y/%m/%d") def _combine_uncertainty(value, std): """Give a list of value and standard deviation, combine them to a number with unceuncertainty (ufloat), and return them in another list. See this link https://pythonhosted.org/uncertainties/ :param value: The value :type value: float, or array like :param std: The standard deviation :type std: float, or array like :return: The combined value and standard deviation :rtype: ufloat, or uncertainties uarray """ return unp.uarray(value, std) def combine_uncertainty(value, std, dask='parallelized', **kwargs): """Give a xarray DataArray of value and standard deviation, combine them to a number with unceuncertainty (ufloat), and return them in another xarray DataArray . See this link https://pythonhosted.org/uncertainties/ :param value: The value :type value: xarray DataArray :param std: The standard deviation :type std: xarray DataArray :param dask: over write of the same argument in xarray.apply_ufunc, defaults to 'parallelized' :type dask: str, optional :return: The combined value and standard deviation :rtype: xarray DataArray """ kwargs.update( { "dask": dask, } ) return xr.apply_ufunc(_combine_uncertainty, value, std, **kwargs) def _seperate_uncertainty_single(data): """From a number with unceuncertainty, read out the value and standard deviation :param data: The number with unceuncertainty :type data: ufloat :return: a tuple of (value, standard deviations) :rtype: tuple of two floats """ return data.n, data.s def _seperate_uncertainty(data): """From a list of numbers with unceuncertainty, read out the values and standard deviations :param data: The list of numbers with unceuncertainty :type data: ufloat, or uncertainties uarray :return: a tuple of (a numpy array of value, a numpy array of standard deviations) :rtype: tuple of two numpy arrays """ func = np.vectorize(_seperate_uncertainty_single) return func(data) def seperate_uncertainty(data, dask='parallelized', **kwargs): """From a xarray DataArray of numbers with unceuncertainty, read out the values and standard deviations :param data: The xarray DataArray of numbers with unceuncertainty :type data: xarray DataArray :param dask: over write of the same argument in xarray.apply_ufunc, defaults to 'parallelized' :type dask: str, optional :return: a tuple of (a xarray DataArray of value, a xarray DataArray of standard deviations) :rtype: tuple of two xarray DataArray """ kwargs.update( { "dask": dask, "output_core_dims": [[], []], } ) return xr.apply_ufunc(_seperate_uncertainty, data, **kwargs) def get_scanAxis(dataSet): """Give a numpy array of names of scan axes. :param dataSet: The xarray DataSet stored the data. :type dataSet: xarray DataSet :return: The names of scan axes :rtype: a numpy array """ res = dataSet.scanAxis if len(res) == 0: res = [None, None] elif len(res) == 1: res = [res[0], None] elif len(res) == 2 and res[0] == 'runs': res = [res[1], res[0]] return res def print_scanAxis(dataSet): """Print the names and the values of scan axes. :param dataSet: The xarray DataSet stored the data. :type dataSet: xarray DataSet """ scanAxis = dataSet.scanAxis scan = {} for key in scanAxis: scanValue = np.array(dataSet[key]) scanValue, indices = np.unique(scanValue, return_index=True) scan.update( { key: scanValue[indices] } ) print("The detected scaning axes and values are: \n") print(scan) def print_scanAxis_original(dataSet): # print the original (unsorted) scan axes. pass def calculate_mean(dataSet): """Calculte the mean along repetition axis 'runs' :param dataSet: The xarray DataSet or DataArray stored the data. :type dataSet: xarray DataSet or DataArray :return: The mean value :rtype: xarray DataSet or DataArray """ if 'runs' in dataSet.dims: return dataSet.mean(dim='runs') else: return dataSet def calculate_std(dataSet): """Calculte the standard deviation along repetition axis 'runs' :param dataSet: The xarray DataSet or DataArray stored the data. :type dataSet: xarray DataSet or DataArray :return: The standard deviation :rtype: xarray DataSet or DataArray """ if 'runs' in dataSet.dims: return dataSet.std(dim='runs') else: return None def extract_temperature_from_fit(): # by giving the shot number, read data, fit the center of could and fit with ToF, give the temperature of the clound pass def extract_condensate_fraction_from_fit(): # by giving the shot number, read data, fit the of could, give the condenstate fraction of the clound pass def swap_xy(dataSet): """Swap the x ans y axis. :param dataSet: The xarray DataSet or DataArray stored the data. :type dataSet: xarray DataSet or DataArray :return: The xarray DataSet or DataArray with swapped x and y axis. :rtype: xarray DataSet or DataArray """ dataSet = dataSet.rename_dims(dict(x='__x')) dataSet = dataSet.rename_dims(dict(y='x')) dataSet = dataSet.rename_dims(dict(__x='y')) return dataSet