import xarray as xr
import dask.dataframe as df
import pandas as pd
import numpy as np

from collections import OrderedDict
from functools import partial
import copy
import glob
import os
from datetime import datetime


def _read_globals_attrs(variable_attrs, context=None):
    """Combine attributes from different variables according to combine_attrs.

    Attributes that are identical across all files are kept as plain values;
    attributes that differ from file to file are collected into arrays and
    reported as scan axes via the 'scanAxis' and 'scanAxisLength' entries of
    the result.
    """
    if not variable_attrs:
        # no attributes to merge
        return None

    from xarray.core.utils import equivalent

    result = {}
    dropped_attrs = OrderedDict()
    for attrs in variable_attrs:
        # keep attributes seen so far, then drop every attribute whose value
        # differs between files
        result.update(
            {
                key: value
                for key, value in attrs.items()
                if key not in result and key not in dropped_attrs.keys()
            }
        )
        result = {
            key: value
            for key, value in result.items()
            if key not in attrs or equivalent(attrs[key], value)
        }
        dropped_attrs.update(
            {
                key: []
                for key in attrs if key not in result
            }
        )

    # collect the per-file values of every dropped (i.e. varying) attribute
    for attrs in variable_attrs:
        dropped_attrs.update(
            {
                key: np.append(dropped_attrs[key], attrs[key])
                for key in dropped_attrs.keys()
            }
        )

    # deduplicate varying attributes: attributes whose value sequences are
    # equivalent share a single scan axis
    scan_attrs = OrderedDict()
    scan_length = []
    for attrs_key in dropped_attrs.keys():
        flag = True
        for key in scan_attrs.keys():
            if equivalent(scan_attrs[key], dropped_attrs[attrs_key]):
                flag = False
                result.update({attrs_key: key})
                break
        if flag:
            scan_attrs.update({
                attrs_key: dropped_attrs[attrs_key]
            })
            scan_length = np.append(scan_length, len(dropped_attrs[attrs_key]))

    result.update(
        {
            key: value
            for key, value in scan_attrs.items()
        }
    )

    result.update(
        {
            "scanAxis": list(scan_attrs.keys()),
            "scanAxisLength": scan_length,
        }
    )

    # if result['scanAxis'] == []:
    #     result['scanAxis'] = ['runs',]

    return result


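# Hedged illustration of _read_globals_attrs (example values only, not executed):
# given the globals of two shots, {'power': 5.0, 't_wait': 1.0} and
# {'power': 5.0, 't_wait': 2.0}, the shared 'power' attribute is kept as a plain
# value, while the differing 't_wait' values are collected into array([1.0, 2.0])
# and reported via result['scanAxis'] == ['t_wait'] and
# result['scanAxisLength'] == array([2.0]).

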
def _read_shot_number_from_hdf5(x):
    """Derive the shot number from the file name of the dataset's source."""
    filePath = x.encoding["source"]
    # the shot number is the part of the file name between the last '_' and
    # the file extension
    shotNum = filePath.split("_")[-1].split(".")[0]
    return x.assign(shotNum=shotNum)


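# Hedged example of the file-name convention this assumes (path is hypothetical):
# for a source file ".../2023_04_07_scan_0012.h5", the part after the last '_'
# and before the extension is taken, so shotNum == "0012" (a string).

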
def _assign_scan_axis_partial(x, datesetOfGlobal, fullFilePath):
    """Expand a single-shot dataset along the scan axes and attach the
    coordinate values that belong to this file."""
    scanAxis = datesetOfGlobal.scanAxis
    filePath = x.encoding["source"].replace("\\", "/")
    # the shot number is the position of this file in the sorted file list
    shotNum = np.where(fullFilePath == filePath)
    shotNum = np.squeeze(shotNum)
    # shotNum = filePath.split("_")[-1].split("_")[-1].split(".")[0]
    x = x.assign(shotNum=shotNum)
    x = x.expand_dims(list(scanAxis))

    return x.assign_coords(
        {
            key: np.atleast_1d(np.atleast_1d(datesetOfGlobal.attrs[key])[int(shotNum)])
            for key in scanAxis
        }
    )


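# Hedged illustration (example values only, not executed): with
# scanAxis == ['t_wait'] and datesetOfGlobal.attrs['t_wait'] == array([1.0, 2.0, 3.0]),
# the dataset of shot 1 gains a length-1 't_wait' dimension with coordinate
# value 2.0, so xr.open_mfdataset can later align all shots along 't_wait'.

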
def _update_globals_attrs(variable_attrs, context=None):
    # not implemented yet
    pass


def update_hdf5_file():
    # not implemented yet
    pass


def read_hdf5_file(filePath, group=None, datesetOfGlobal=None, preprocess=None, join="outer", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
    """Read a set of HDF5 shot files into one xarray Dataset, using the
    'globals' group to work out which attributes define the scan axes."""

    filePath = np.sort(np.atleast_1d(filePath))

    # resolve the given path(s) or glob pattern(s) into a sorted, flat list of
    # absolute file paths with forward slashes
    filePathAbs = []
    for i in range(len(filePath)):
        filePathAbs.append(os.path.abspath(filePath[i]).replace("\\", "/"))

    fullFilePath = []
    for i in range(len(filePathAbs)):
        fullFilePath.append(list(np.sort(glob.glob(filePathAbs[i]))))
    fullFilePath = np.array(fullFilePath).flatten()

    for i in range(len(fullFilePath)):
        fullFilePath[i] = fullFilePath[i].replace("\\", "/")

    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]

    kwargs.update(
        {
            'join': join,
            'parallel': parallel,
            'engine': engine,
            'phony_dims': phony_dims,
            'group': group
        }
    )

    # read the 'globals' group first to find the scan axes
    if datesetOfGlobal is None:
        datesetOfGlobal = xr.open_mfdataset(
            fullFilePath,
            group="globals",
            concat_dim="fileNum",
            combine="nested",
            preprocess=_read_shot_number_from_hdf5,
            engine="h5netcdf",
            phony_dims="access",
            combine_attrs=_read_globals_attrs,
            parallel=True,
        )

    datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)

    _assign_scan_axis = partial(_assign_scan_axis_partial, datesetOfGlobal=datesetOfGlobal, fullFilePath=fullFilePath)

    if preprocess is None:
        kwargs.update({'preprocess': _assign_scan_axis})
    else:
        kwargs.update({'preprocess': preprocess})

    ds = xr.open_mfdataset(fullFilePath, **kwargs)

    # rename the phony HDF5 dimensions to 'x', 'y', 'z', 'a', 'b', ...
    newDimKey = np.append(['x', 'y', 'z'], [chr(i) for i in range(97, 97 + 23)])

    oldDimKey = np.sort(
        [
            key
            for key in ds.dims
            if key not in datesetOfGlobal.scanAxis
        ]
    )

    renameDict = {
        oldDimKey[j]: newDimKey[j]
        for j in range(len(oldDimKey))
    }

    ds = ds.rename_dims(renameDict)

    ds.attrs = copy.deepcopy(datesetOfGlobal.attrs)

    return ds


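# Minimal usage sketch for read_hdf5_file (hypothetical path and group name,
# kept as a comment so importing this module has no side effects):
#
#     ds = read_hdf5_file("//path/to/run/2023/*.h5", group="images")
#     ds.attrs["scanAxis"]   # e.g. ['t_wait'] if 't_wait' varied between shots
#     ds["shotNum"]          # position of each file in the sorted file list

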
def _assign_scan_axis_partial_and_remove_everything(x, datesetOfGlobal, fullFilePath):
    """Like _assign_scan_axis_partial, but keep only the run time of each shot."""
    scanAxis = datesetOfGlobal.scanAxis
    filePath = x.encoding["source"].replace("\\", "/")
    shotNum = np.where(fullFilePath == filePath)
    shotNum = np.squeeze(shotNum)
    runTime = _read_run_time_from_hdf5(x)
    # drop all data variables and keep only the run time
    x = xr.Dataset(data_vars={'runTime': runTime})
    x = x.expand_dims(list(scanAxis))

    return x.assign_coords(
        {
            key: np.atleast_1d(np.atleast_1d(datesetOfGlobal.attrs[key])[int(shotNum)])
            for key in scanAxis
        }
    )


def _read_run_time_from_hdf5(x):
    """Parse the 'run time' attribute of a dataset into a datetime object."""
    runTime = datetime.strptime(x.attrs['run time'], '%Y%m%dT%H%M%S')
    return runTime


def read_hdf5_run_time(filePath, group=None, datesetOfGlobal=None, preprocess=None, join="outer", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
    """Read only the run time of every shot, arranged along the scan axes.

    The file handling mirrors read_hdf5_file, but the default preprocess step
    drops all data variables except 'runTime'."""

    filePath = np.sort(np.atleast_1d(filePath))

    filePathAbs = []
    for i in range(len(filePath)):
        filePathAbs.append(os.path.abspath(filePath[i]).replace("\\", "/"))

    fullFilePath = []
    for i in range(len(filePathAbs)):
        fullFilePath.append(list(np.sort(glob.glob(filePathAbs[i]))))
    fullFilePath = np.array(fullFilePath).flatten()

    for i in range(len(fullFilePath)):
        fullFilePath[i] = fullFilePath[i].replace("\\", "/")

    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]

    kwargs.update(
        {
            'join': join,
            'parallel': parallel,
            'engine': engine,
            'phony_dims': phony_dims,
            'group': group
        }
    )

    # read the 'globals' group first to find the scan axes
    if datesetOfGlobal is None:
        datesetOfGlobal = xr.open_mfdataset(
            fullFilePath,
            group="globals",
            concat_dim="fileNum",
            combine="nested",
            preprocess=_read_shot_number_from_hdf5,
            engine="h5netcdf",
            phony_dims="access",
            combine_attrs=_read_globals_attrs,
            parallel=True,
        )

    datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)

    _assign_scan_axis = partial(_assign_scan_axis_partial_and_remove_everything, datesetOfGlobal=datesetOfGlobal, fullFilePath=fullFilePath)

    if preprocess is None:
        kwargs.update({'preprocess': _assign_scan_axis})
    else:
        kwargs.update({'preprocess': preprocess})

    ds = xr.open_mfdataset(fullFilePath, **kwargs)

    # rename the phony HDF5 dimensions to 'x', 'y', 'z', 'a', 'b', ...
    newDimKey = np.append(['x', 'y', 'z'], [chr(i) for i in range(97, 97 + 23)])

    oldDimKey = np.sort(
        [
            key
            for key in ds.dims
            if key not in datesetOfGlobal.scanAxis
        ]
    )

    renameDict = {
        oldDimKey[j]: newDimKey[j]
        for j in range(len(oldDimKey))
    }

    ds = ds.rename_dims(renameDict)

    ds.attrs = copy.deepcopy(datesetOfGlobal.attrs)

    return ds


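# Minimal usage sketch for read_hdf5_run_time (hypothetical path, kept as a comment):
#
#     runTimes = read_hdf5_run_time("//path/to/run/2023/*.h5")
#     runTimes["runTime"]    # datetime of every shot, arranged along the scan axes

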
def read_hdf5_global(filePath, preprocess=None, join="outer", combine="nested", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
    """Read only the 'globals' group of every shot file and combine its
    attributes into scan-axis information."""

    filePath = np.sort(np.atleast_1d(filePath))

    filePathAbs = []
    for i in range(len(filePath)):
        filePathAbs.append(os.path.abspath(filePath[i]).replace("\\", "/"))

    fullFilePath = []
    for i in range(len(filePathAbs)):
        fullFilePath.append(list(np.sort(glob.glob(filePathAbs[i]))))
    fullFilePath = np.array(fullFilePath).flatten()

    for i in range(len(fullFilePath)):
        fullFilePath[i] = fullFilePath[i].replace("\\", "/")

    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]

    kwargs.update(
        {
            'join': join,
            'parallel': parallel,
            'engine': engine,
            'phony_dims': phony_dims,
            'group': "globals",
            'preprocess': _read_shot_number_from_hdf5,
            'combine_attrs': _read_globals_attrs,
            'combine': combine,
            'concat_dim': "fileNum",
        }
    )

    datesetOfGlobal = xr.open_mfdataset(fullFilePath, **kwargs)

    datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)

    return datesetOfGlobal


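# Minimal usage sketch for read_hdf5_global (hypothetical path, kept as a comment):
#
#     dsGlobal = read_hdf5_global("//path/to/run/2023/*.h5")
#     dsGlobal.attrs["scanAxis"]        # attributes that varied across shots
#     dsGlobal.attrs["scanAxisLength"]  # number of values per scan axis
#
# The result can be passed back in as read_hdf5_file(..., datesetOfGlobal=dsGlobal)
# to avoid opening the 'globals' group a second time.

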
def _read_csv_file_pandas(filePath, **kwargs):
    """Read one CSV file with pandas and return its columns as a numpy array."""
    res = pd.read_csv(filePath, **kwargs)
    res = xr.Dataset.from_dataframe(res).to_array().to_numpy()
    return res


def _read_csv_file_dask(filePath, **kwargs):
    """Read one CSV file with dask.dataframe and return its columns as a numpy array."""
    res = df.read_csv(filePath, **kwargs)
    res = xr.Dataset.from_dataframe(res).to_array().to_numpy()
    return res


def read_csv_file(filePath, maxFileNum=None, dask='parallelized', vectorize=True, csvEngine='pandas', daskKwargs={}, csvKwargs={}, **kwargs):
    """Read a set of CSV files into one xarray Dataset, one file per 'fileIndex'."""
    filePath = np.sort(np.atleast_1d(filePath))

    filePathAbs = []
    for i in range(len(filePath)):
        filePathAbs.append(os.path.abspath(filePath[i]).replace("\\", "/"))

    fullFilePath = []
    for i in range(len(filePathAbs)):
        fullFilePath.append(list(np.sort(glob.glob(filePathAbs[i]))))
    fullFilePath = np.array(fullFilePath).flatten()

    for i in range(len(fullFilePath)):
        fullFilePath[i] = fullFilePath[i].replace("\\", "/")

    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]

    # read the first file eagerly to learn the column names and dimensions
    if csvEngine == 'pandas':
        res_first = pd.read_csv(fullFilePath[0], **csvKwargs)
    elif csvEngine == 'dask':
        res_first = df.read_csv(fullFilePath[0], **csvKwargs)

    res_first = xr.Dataset.from_dataframe(res_first)

    data_vars = list(res_first.keys())

    # if the CSV has a multi-level header, join each column tuple into one name
    if len(np.shape(data_vars)) > 1:
        data_vars = np.array(
            [
                ''.join(data_vars[i])
                for i in range(np.shape(data_vars)[0])
            ]
        )

    fullFilePath = xr.DataArray(
        data=fullFilePath,
        dims=['fileIndex']
    )

    newDimKey = np.append(['data_vars'], list(res_first.dims.keys()))
    newDimKey = np.append(newDimKey, ['x', 'y', 'z'])
    newDimKey = np.append(newDimKey, [chr(i) for i in range(97, 97 + 23)])

    kwargs.update(
        {
            'dask': dask,
            'vectorize': vectorize,
            'output_core_dims': [newDimKey[0:len(res_first.dims) + 1]],
            "dask_gufunc_kwargs": daskKwargs,
        }
    )

    # read every file lazily; each call returns an array of shape
    # (number of columns, number of rows)
    if csvEngine == 'pandas':
        res = xr.apply_ufunc(_read_csv_file_pandas, fullFilePath, kwargs=csvKwargs, **kwargs)
    elif csvEngine == 'dask':
        res = xr.apply_ufunc(_read_csv_file_dask, fullFilePath, kwargs=csvKwargs, **kwargs)

    res = res.assign_coords({'data_vars': data_vars})

    # turn the 'data_vars' dimension back into individual data variables
    res = res.to_dataset(dim='data_vars')

    for key in list(res_first.coords.keys()):
        res = res.assign_coords({key: res_first[key]})

    return res
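

# Minimal usage sketch for read_csv_file (hypothetical path and column name,
# kept as a comment):
#
#     data = read_csv_file("//path/to/run/2023/*.csv", csvKwargs={'sep': ','})
#     data["signal"]    # one data variable per CSV column, stacked along 'fileIndex'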