import xarray as xr
import dask.dataframe as df
import pandas as pd
import numpy as np
from collections import OrderedDict
from functools import partial
import copy
import glob
import os
from datetime import datetime


def _read_globals_attrs(variable_attrs, context=None):
    """Merge the attributes of the ``globals`` group of several files.

    Intended as the ``combine_attrs`` callable of :func:`xarray.open_mfdataset`.

    - Attributes whose values are equivalent in every file where they appear
      are copied to the result unchanged.
    - Attributes whose values differ between files become scan axes: the
      result maps the attribute name to the array of its per-file values,
      in file order.
    - If two such attributes have equivalent per-file value arrays, only the
      first becomes a scan axis; the second is stored in the result as the
      *name* of the first (an alias of that axis).

    Two bookkeeping entries are added: ``"scanAxis"`` (list of scan-axis
    names) and ``"scanAxisLength"`` (numpy array with the length of each
    scan axis's value array).

    Parameters
    ----------
    variable_attrs : list of dict
        One attribute dict per opened file.
    context : optional
        Supplied by xarray; unused.

    Returns
    -------
    dict or None
        The merged attributes, or ``None`` when ``variable_attrs`` is empty.
    """
    if not variable_attrs:
        # no attributes to merge
        return None

    # NOTE: private xarray helper; its location may change between versions.
    from xarray.core.utils import equivalent

    result = {}
    dropped_attrs = OrderedDict()
    for attrs in variable_attrs:
        # Take over keys not seen yet (and not already known to vary).
        result.update(
            {
                key: value
                for key, value in attrs.items()
                if key not in result and key not in dropped_attrs
            }
        )
        # Drop keys whose value in this file disagrees with the merged one.
        result = {
            key: value
            for key, value in result.items()
            if key not in attrs or equivalent(attrs[key], value)
        }
        # Every key of this file no longer in ``result`` varies between files.
        dropped_attrs.update({key: [] for key in attrs if key not in result})

    # Collect the per-file values of every varying attribute, in file order.
    # NOTE(review): raises KeyError if some file lacks a varying attribute.
    for attrs in variable_attrs:
        dropped_attrs.update(
            {key: np.append(dropped_attrs[key], attrs[key]) for key in dropped_attrs}
        )

    # Group varying attributes with identical value arrays into one scan axis.
    scan_attrs = OrderedDict()
    scan_length = []
    for attrs_key, values in dropped_attrs.items():
        for key, scan_values in scan_attrs.items():
            if equivalent(scan_values, values):
                # Alias: remember which scan axis this attribute follows.
                result[attrs_key] = key
                break
        else:
            scan_attrs[attrs_key] = values
            scan_length = np.append(scan_length, len(values))

    result.update(scan_attrs)
    result.update(
        {
            "scanAxis": list(scan_attrs.keys()),
            "scanAxisLength": scan_length,
        }
    )
    return result


def _shot_number_from_path(filePath):
    """Return the text after the last ``_`` of *filePath*, up to the next ``.``.

    For a file named ``..._0042.h5`` this is the string ``"0042"``.
    """
    return filePath.split("_")[-1].split(".")[0]


def _normalize_path(path):
    """Return *path* with every backslash replaced by a forward slash."""
    return path.replace("\\", "/")


def _read_shot_number_from_hdf5(x):
    """Preprocess hook: add a ``shotNum`` variable parsed from the source file name."""
    return x.assign(shotNum=_shot_number_from_path(x.encoding["source"]))


def _scan_axis_coords(datesetOfGlobal, scanAxis, fullFilePath, filePath):
    """Return ``{axis: 1-element array}`` holding *filePath*'s value of each scan axis."""
    # Position of this file in the full file list (not its shot number).
    fileIndex = np.squeeze(np.where(fullFilePath == filePath))
    return {
        key: np.atleast_1d(np.atleast_1d(datesetOfGlobal.attrs[key])[int(fileIndex)])
        for key in scanAxis
    }


def _assign_scan_axis_partial(x, datesetOfGlobal, fullFilePath):
    """Preprocess hook: attach the scan-axis coordinates of one file.

    Adds ``shotNum`` (parsed from the file name), inserts one length-1
    dimension per scan axis and sets its coordinate to this file's value of
    that axis, read from ``datesetOfGlobal.attrs[axis]``.

    Parameters
    ----------
    x : xarray.Dataset
        Dataset of a single file, as passed in by ``open_mfdataset``.
    datesetOfGlobal : xarray.Dataset
        Merged globals whose attrs contain ``"scanAxis"`` and one per-file
        value array per scan axis (see ``_read_globals_attrs``).
    fullFilePath : numpy.ndarray of str
        All opened files (forward slashes); the position of ``x``'s source
        file in this array selects its scan values.
    """
    scanAxis = datesetOfGlobal.attrs["scanAxis"]
    filePath = _normalize_path(x.encoding["source"])
    x = x.assign(shotNum=_shot_number_from_path(filePath))
    x = x.expand_dims(list(scanAxis))
    return x.assign_coords(
        _scan_axis_coords(datesetOfGlobal, scanAxis, fullFilePath, filePath)
    )


def _update_globals_attrs(variable_attrs, context=None):
    """Placeholder; not implemented yet (returns None)."""
    pass


def update_hdf5_file():
    """Placeholder; not implemented yet (returns None)."""
    pass


def _get_full_file_paths(filePath, maxFileNum=None):
    """Expand path(s) / glob pattern(s) into an array of matching file paths.

    The patterns are sorted, made absolute and globbed. The matches of each
    pattern are sorted and concatenated in pattern order, and all paths use
    forward slashes. When ``maxFileNum`` is given, only the first
    ``maxFileNum`` paths are kept.

    Raises
    ------
    FileNotFoundError
        If no file is left to open.
    """
    patterns = np.sort(np.atleast_1d(filePath))
    # Build a flat list directly: patterns matching different numbers of
    # files would make ``np.array(list_of_lists)`` ragged (an error in NumPy).
    fullFilePath = [
        _normalize_path(match)
        for pattern in patterns
        for match in sorted(glob.glob(_normalize_path(os.path.abspath(pattern))))
    ]
    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]
    if not fullFilePath:
        raise FileNotFoundError(
            f"no files to open for {[str(p) for p in patterns]!r} "
            f"(maxFileNum={maxFileNum!r})"
        )
    return np.array(fullFilePath)


def _exclude_scan_axes(datesetOfGlobal, excludeAxis):
    """Remove the names in *excludeAxis* from ``datesetOfGlobal.attrs['scanAxis']``."""
    excludeAxis = [] if excludeAxis is None else excludeAxis
    datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(
        datesetOfGlobal.attrs['scanAxis'], excludeAxis
    )
    return datesetOfGlobal


def _open_globals_dataset(fullFilePath, excludeAxis):
    """Open and merge the ``globals`` group of all files (one entry per file along ``fileNum``)."""
    datesetOfGlobal = xr.open_mfdataset(
        fullFilePath,
        group="globals",
        concat_dim="fileNum",
        combine="nested",
        preprocess=_read_shot_number_from_hdf5,
        engine="h5netcdf",
        phony_dims="access",
        combine_attrs=_read_globals_attrs,
        parallel=True,
    )
    return _exclude_scan_axes(datesetOfGlobal, excludeAxis)


def _rename_data_dims(ds, scanAxis):
    """Rename the non-scan dimensions of *ds* to ``x, y, z, a, b, ..., w``.

    The original dimension names are sorted first, so the mapping is
    deterministic. At most 26 dimensions can be renamed.
    """
    newDimKey = ['x', 'y', 'z'] + [chr(i) for i in range(97, 97 + 23)]
    oldDimKey = sorted(key for key in ds.dims if not key in scanAxis)
    if len(oldDimKey) > len(newDimKey):
        raise ValueError(
            f"cannot rename {len(oldDimKey)} dimensions; "
            f"at most {len(newDimKey)} are supported"
        )
    return ds.rename_dims(dict(zip(oldDimKey, newDimKey)))


def _open_hdf5_with_scan_axes(assignScanAxis, filePath, group, datesetOfGlobal,
                              preprocess, join, parallel, engine, phony_dims,
                              excludeAxis, maxFileNum, kwargs):
    """Shared implementation of ``read_hdf5_file`` and ``read_hdf5_run_time``.

    *assignScanAxis* is the default per-file preprocess hook; it is bound to
    the merged globals and the file list with :func:`functools.partial`.
    """
    fullFilePath = _get_full_file_paths(filePath, maxFileNum)

    kwargs.update(
        {
            'join': join,
            'parallel': parallel,
            'engine': engine,
            'phony_dims': phony_dims,
            'group': group,
        }
    )

    if datesetOfGlobal is None:
        datesetOfGlobal = _open_globals_dataset(fullFilePath, excludeAxis)

    if preprocess is None:
        preprocess = partial(assignScanAxis, datesetOfGlobal=datesetOfGlobal,
                             fullFilePath=fullFilePath)
    kwargs['preprocess'] = preprocess

    ds = xr.open_mfdataset(fullFilePath, **kwargs)
    ds = _rename_data_dims(ds, datesetOfGlobal.attrs['scanAxis'])
    ds.attrs = copy.deepcopy(datesetOfGlobal.attrs)
    return ds


def read_hdf5_file(filePath, group=None, datesetOfGlobal=None, preprocess=None,
                   join="outer", parallel=True, engine="h5netcdf",
                   phony_dims="access", excludeAxis=None, maxFileNum=None,
                   **kwargs):
    """Open HDF5 files as one dataset with one dimension per scan axis.

    The ``globals`` groups of all files are merged first (see
    ``_read_globals_attrs``) to find the scan axes, i.e. the globals that vary
    between files. Every file's *group* is then opened with
    :func:`xarray.open_mfdataset`, using a preprocess hook that adds
    ``shotNum`` and the file's scan-axis coordinates. Finally, the remaining
    (non-scan) dimensions are renamed to ``x, y, z, a, b, ...`` in sorted
    order of their original names.

    Parameters
    ----------
    filePath : str or sequence of str
        Path(s) or glob pattern(s) of the HDF5 files.
    group : str, optional
        HDF5 group to read from each file.
    datesetOfGlobal : xarray.Dataset, optional
        Already merged globals (e.g. from ``read_hdf5_global``). When None
        they are opened here, and ``excludeAxis`` is applied to them.
    preprocess : callable, optional
        Replaces the default scan-axis preprocess hook.
    join, parallel, engine, phony_dims :
        Forwarded to ``xr.open_mfdataset``.
    excludeAxis : sequence of str, optional
        Globals to remove from the scan axes.
    maxFileNum : int, optional
        Keep at most this many files (after sorting).
    **kwargs :
        Further keyword arguments for ``xr.open_mfdataset``.

    Returns
    -------
    xarray.Dataset
        The combined data, with attrs deep-copied from the merged globals.
    """
    return _open_hdf5_with_scan_axes(
        _assign_scan_axis_partial, filePath, group, datesetOfGlobal, preprocess,
        join, parallel, engine, phony_dims, excludeAxis, maxFileNum, kwargs,
    )


def _assign_scan_axis_partial_and_remove_everything(x, datesetOfGlobal, fullFilePath):
    """Preprocess hook: replace a file's data by its run time, indexed by scan axes.

    Returns a new dataset holding only the variable ``runTime``, parsed from
    ``x.attrs['run time']``, with one length-1 dimension per scan axis whose
    coordinate is this file's value of that axis.
    """
    scanAxis = datesetOfGlobal.attrs["scanAxis"]
    # Read the source path before ``x`` is replaced (the new dataset has no encoding).
    filePath = _normalize_path(x.encoding["source"])
    x = xr.Dataset(data_vars={'runTime': _read_run_time_from_hdf5(x)})
    x = x.expand_dims(list(scanAxis))
    return x.assign_coords(
        _scan_axis_coords(datesetOfGlobal, scanAxis, fullFilePath, filePath)
    )


def _read_run_time_from_hdf5(x):
    """Parse ``x.attrs['run time']`` (format ``%Y%m%dT%H%M%S``) into a naive datetime."""
    return datetime.strptime(x.attrs['run time'], '%Y%m%dT%H%M%S')


def read_hdf5_run_time(filePath, group=None, datesetOfGlobal=None, preprocess=None,
                       join="outer", parallel=True, engine="h5netcdf",
                       phony_dims="access", excludeAxis=None, maxFileNum=None,
                       **kwargs):
    """Like ``read_hdf5_file``, but each file contributes only its run time.

    The default preprocess hook discards the file's data and keeps a single
    ``runTime`` variable, parsed from the ``'run time'`` attribute of the
    opened group, indexed by the scan axes. All parameters have the same
    meaning as in ``read_hdf5_file``.
    """
    return _open_hdf5_with_scan_axes(
        _assign_scan_axis_partial_and_remove_everything, filePath, group,
        datesetOfGlobal, preprocess, join, parallel, engine, phony_dims,
        excludeAxis, maxFileNum, kwargs,
    )


def read_hdf5_global(filePath, preprocess=None, join="outer", combine="nested",
                     parallel=True, engine="h5netcdf", phony_dims="access",
                     excludeAxis=None, maxFileNum=None, **kwargs):
    """Open and merge only the ``globals`` group of HDF5 files.

    Files are concatenated along ``fileNum``. A ``shotNum`` variable parsed
    from each file name is added, and attributes are merged by
    ``_read_globals_attrs``, so ``attrs['scanAxis']`` lists the globals that
    vary between files (minus ``excludeAxis``).

    Parameters
    ----------
    filePath : str or sequence of str
        Path(s) or glob pattern(s) of the HDF5 files.
    preprocess : callable, optional
        Extra per-file hook, applied after ``shotNum`` has been added.
    join, combine, parallel, engine, phony_dims :
        Forwarded to ``xr.open_mfdataset``.
    excludeAxis : sequence of str, optional
        Globals to remove from the scan axes.
    maxFileNum : int, optional
        Keep at most this many files (after sorting).
    **kwargs :
        Further keyword arguments for ``xr.open_mfdataset``.
    """
    fullFilePath = _get_full_file_paths(filePath, maxFileNum)

    if preprocess is None:
        globalsPreprocess = _read_shot_number_from_hdf5
    else:
        def globalsPreprocess(x):
            # Keep the shotNum assignment, then apply the caller's hook.
            return preprocess(_read_shot_number_from_hdf5(x))

    kwargs.update(
        {
            'join': join,
            'parallel': parallel,
            'engine': engine,
            'phony_dims': phony_dims,
            'group': "globals",
            'preprocess': globalsPreprocess,
            'combine_attrs': _read_globals_attrs,
            'combine': combine,
            'concat_dim': "fileNum",
        }
    )

    datesetOfGlobal = xr.open_mfdataset(fullFilePath, **kwargs)
    return _exclude_scan_axes(datesetOfGlobal, excludeAxis)


def _read_csv_file_pandas(filePath, **kwargs):
    """Read one CSV file with pandas and return its columns as a numpy array.

    The columns are stacked along a new leading axis (one entry per column),
    via ``xr.Dataset.from_dataframe(...).to_array()``.
    """
    res = pd.read_csv(filePath, **kwargs)
    return xr.Dataset.from_dataframe(res).to_array().to_numpy()


def _read_csv_file_dask(filePath, **kwargs):
    """Read one CSV file with dask; same output as ``_read_csv_file_pandas``.

    The dask DataFrame is computed to pandas first, because
    ``xr.Dataset.from_dataframe`` expects a pandas DataFrame.
    """
    res = df.read_csv(filePath, **kwargs).compute()
    return xr.Dataset.from_dataframe(res).to_array().to_numpy()


def read_csv_file(filePath, maxFileNum=None, dask='parallelized', vectorize=True,
                  csvEngine='pandas', daskKwargs=None, csvKwargs=None, **kwargs):
    """Read many CSV files with the same layout into one xarray.Dataset.

    The first file is read eagerly to learn the column names and index.
    Every file is then read through :func:`xarray.apply_ufunc` along a new
    ``fileIndex`` dimension. Each CSV column becomes a data variable, and the
    index coordinates of the first file are attached to the result.

    Parameters
    ----------
    filePath : str or sequence of str
        Path(s) or glob pattern(s).
    maxFileNum : int, optional
        Keep at most this many files (after sorting).
    dask, vectorize :
        Forwarded to ``xr.apply_ufunc``.
    csvEngine : {'pandas', 'dask'}
        Which ``read_csv`` implementation parses the files.
    daskKwargs : dict, optional
        Forwarded as ``dask_gufunc_kwargs`` to ``xr.apply_ufunc``.
    csvKwargs : dict, optional
        Forwarded to ``read_csv``.
    **kwargs :
        Further keyword arguments for ``xr.apply_ufunc``.

    Raises
    ------
    ValueError
        If ``csvEngine`` is neither ``'pandas'`` nor ``'dask'``.
    FileNotFoundError
        If no file matches ``filePath``.
    """
    daskKwargs = {} if daskKwargs is None else daskKwargs
    csvKwargs = {} if csvKwargs is None else csvKwargs

    readers = {'pandas': _read_csv_file_pandas, 'dask': _read_csv_file_dask}
    if csvEngine not in readers:
        raise ValueError(f"csvEngine must be 'pandas' or 'dask', got {csvEngine!r}")

    fullFilePath = _get_full_file_paths(filePath, maxFileNum)

    # Read the first file eagerly to learn the variable names and the index.
    if csvEngine == 'pandas':
        firstFrame = pd.read_csv(fullFilePath[0], **csvKwargs)
    else:
        firstFrame = df.read_csv(fullFilePath[0], **csvKwargs).compute()
    res_first = xr.Dataset.from_dataframe(firstFrame)

    data_vars = list(res_first.keys())
    if len(np.shape(data_vars)) > 1:
        # MultiIndex columns produce tuple names; join them into flat strings.
        data_vars = np.array([''.join(name) for name in data_vars])

    fileIndex = xr.DataArray(data=fullFilePath, dims=['fileIndex'])

    # Each reader returns (column, *index dims) for one file.
    outputCoreDims = ['data_vars', *res_first.sizes]

    kwargs.update(
        {
            'dask': dask,
            'vectorize': vectorize,
            'output_core_dims': [outputCoreDims],
            "dask_gufunc_kwargs": daskKwargs,
        }
    )

    res = xr.apply_ufunc(readers[csvEngine], fileIndex, kwargs=csvKwargs, **kwargs)
    res = res.assign_coords({'data_vars': data_vars})
    res = res.to_dataset(dim='data_vars')
    for key in res_first.coords:
        res = res.assign_coords({key: res_first[key]})
    return res