You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

417 lines
12 KiB

import xarray as xr
import dask.dataframe as df
import pandas as pd
import numpy as np
from collections import OrderedDict
from functools import partial
import copy
import glob
import os
from datetime import datetime
def _read_globals_attrs(variable_attrs, context=None):
"""Combine attributes from different variables according to combine_attrs"""
if not variable_attrs:
# no attributes to merge
return None
from xarray.core.utils import equivalent
result = {}
dropped_attrs = OrderedDict()
for attrs in variable_attrs:
result.update(
{
key: value
for key, value in attrs.items()
if key not in result and key not in dropped_attrs.keys()
}
)
result = {
key: value
for key, value in result.items()
if key not in attrs or equivalent(attrs[key], value)
}
dropped_attrs.update(
{
key: []
for key in attrs if key not in result
}
)
for attrs in variable_attrs:
dropped_attrs.update(
{
key: np.append(dropped_attrs[key], attrs[key])
for key in dropped_attrs.keys()
}
)
scan_attrs = OrderedDict()
scan_length = []
for attrs_key in dropped_attrs.keys():
flag = True
for key in scan_attrs.keys():
if equivalent(scan_attrs[key], dropped_attrs[attrs_key]):
flag = False
result.update({attrs_key: key})
break
if flag:
scan_attrs.update({
attrs_key: dropped_attrs[attrs_key]
})
scan_length = np.append(scan_length, len(dropped_attrs[attrs_key]))
result.update(
{
key: value
for key, value in scan_attrs.items()
}
)
result.update(
{
"scanAxis": list(scan_attrs.keys()),
"scanAxisLength": scan_length,
}
)
# if result['scanAxis'] == []:
# result['scanAxis'] = ['runs',]
return result
def _read_shot_number_from_hdf5(x):
filePath = x.encoding["source"]
shotNum = filePath.split("_")[-1].split("_")[-1].split(".")[0]
return x.assign(shotNum=shotNum)
def _assign_scan_axis_partial(x, datesetOfGlobal, fullFilePath):
    """Place a single-file dataset at its position along the scan axes.

    Parameters
    ----------
    x : dataset opened from one file; ``x.encoding["source"]`` is its path.
    datesetOfGlobal : object exposing ``scanAxis`` and, in ``attrs``, the
        per-file values of every scan axis.
    fullFilePath : array of file paths compared element-wise against the
        source path of *x* to find the file's index.

    Returns
    -------
    *x* with a ``shotNum`` variable, one new dimension per scan axis and
    the matching scan-axis coordinate values for this file.
    """
    scan_axes = datesetOfGlobal.scanAxis
    source = x.encoding["source"].replace("\\", "/")

    # Position of this file in the ordered file list.
    file_index = np.squeeze(np.where(fullFilePath == source))

    shot_label = source.rpartition("_")[2].partition(".")[0]
    x = x.assign(shotNum=shot_label)
    x = x.expand_dims(list(scan_axes))

    coords = {}
    for key in scan_axes:
        per_file_values = np.atleast_1d(datesetOfGlobal.attrs[key])
        coords[key] = np.atleast_1d(per_file_values[int(file_index)])
    return x.assign_coords(coords)
def _update_globals_attrs(variable_attrs, context=None):
pass
def update_hdf5_file():
    """Placeholder: not implemented; does nothing and returns None."""
    return None
def read_hdf5_file(filePath, group=None, datesetOfGlobal=None, preprocess=None, join="outer", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
    """Open HDF5 files matching one or more glob patterns as a single dataset.

    Parameters
    ----------
    filePath : str or sequence of str
        Path(s) or glob pattern(s).  Patterns are sorted, made absolute and
        expanded; the matches of each pattern are sorted as well.
    group : str, optional
        HDF5 group passed to ``xr.open_mfdataset``.
    datesetOfGlobal : optional
        Already loaded "globals" dataset.  When ``None`` it is read from the
        ``"globals"`` group of the matched files (always with the h5netcdf
        engine, ``phony_dims="access"`` and ``parallel=True``).  Its
        ``attrs['scanAxis']`` entry is overwritten in place, see
        ``excludeAxis``.
    preprocess : callable, optional
        Per-file preprocess function.  When ``None`` each file is positioned
        along the scan axes with ``_assign_scan_axis_partial``.
    join, parallel, engine, phony_dims
        Forwarded to ``xr.open_mfdataset`` for the main read.
    excludeAxis : sequence of str
        Names removed from the scan-axis list (via ``np.setdiff1d``).  The
        default list is never mutated.
    maxFileNum : int, optional
        Keep only the first ``maxFileNum`` matched files.
    **kwargs
        Further keyword arguments for ``xr.open_mfdataset``.

    Returns
    -------
    Dataset whose non-scan dimensions are renamed, in sorted order of their
    original names, to ``x, y, z, a, b, ...`` and whose attributes are a
    deep copy of the globals attributes.

    Raises
    ------
    FileNotFoundError
        If no file matches any of the patterns.
    """
    patterns = [
        os.path.abspath(pattern).replace("\\", "/")
        for pattern in np.sort(np.atleast_1d(filePath))
    ]
    # Flatten pattern by pattern: building a 2-D array from the per-pattern
    # lists fails as soon as two patterns match a different number of files.
    matched = [
        path.replace("\\", "/")
        for pattern in patterns
        for path in sorted(glob.glob(pattern))
    ]
    if not matched:
        raise FileNotFoundError(f"no files match {patterns}")

    # Kept as an array: the preprocess step compares it element-wise.
    fullFilePath = np.array(matched)
    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]

    kwargs.update(
        {
            'join': join,
            'parallel': parallel,
            'engine': engine,
            'phony_dims': phony_dims,
            'group': group,
        }
    )

    if datesetOfGlobal is None:
        datesetOfGlobal = xr.open_mfdataset(
            fullFilePath,
            group="globals",
            concat_dim="fileNum",
            combine="nested",
            preprocess=_read_shot_number_from_hdf5,
            engine="h5netcdf",
            phony_dims="access",
            combine_attrs=_read_globals_attrs,
            parallel=True,
        )

    datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)

    if preprocess is None:
        preprocess = partial(_assign_scan_axis_partial, datesetOfGlobal=datesetOfGlobal, fullFilePath=fullFilePath)
    kwargs['preprocess'] = preprocess

    ds = xr.open_mfdataset(fullFilePath, **kwargs)

    # Give the remaining (non-scan) dimensions short canonical names.
    newDimKey = ['x', 'y', 'z'] + [chr(i) for i in range(97, 97 + 23)]
    oldDimKey = sorted(
        key
        for key in ds.dims
        if key not in datesetOfGlobal.scanAxis
    )
    renameDict = {old: newDimKey[j] for j, old in enumerate(oldDimKey)}

    ds = ds.rename_dims(renameDict)
    ds.attrs = copy.deepcopy(datesetOfGlobal.attrs)
    return ds
def _assign_scan_axis_partial_and_remove_everything(x, datesetOfGlobal, fullFilePath):
    """Replace a single-file dataset by its run time, placed on the scan axes.

    Everything in *x* is discarded except the run time read by
    ``_read_run_time_from_hdf5``; the result is a fresh dataset with one
    ``runTime`` variable, one dimension per scan axis and the scan-axis
    coordinate values that belong to this file.

    Parameters
    ----------
    x : dataset opened from one file; ``x.encoding["source"]`` is its path.
    datesetOfGlobal : object exposing ``scanAxis`` and, in ``attrs``, the
        per-file values of every scan axis.
    fullFilePath : array of file paths compared element-wise against the
        source path of *x* to find the file's index.
    """
    scan_axes = datesetOfGlobal.scanAxis
    source = x.encoding["source"].replace("\\", "/")

    # Position of this file in the ordered file list.
    file_index = np.squeeze(np.where(fullFilePath == source))

    run_time_only = xr.Dataset(data_vars={'runTime': _read_run_time_from_hdf5(x)})
    run_time_only = run_time_only.expand_dims(list(scan_axes))

    coords = {}
    for key in scan_axes:
        per_file_values = np.atleast_1d(datesetOfGlobal.attrs[key])
        coords[key] = np.atleast_1d(per_file_values[int(file_index)])
    return run_time_only.assign_coords(coords)
def _read_run_time_from_hdf5(x):
runTime = datetime.strptime(x.attrs['run time'], '%Y%m%dT%H%M%S')
return runTime
def read_hdf5_run_time(filePath, group=None, datesetOfGlobal=None, preprocess=None, join="outer", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
    """Open HDF5 files and keep only each file's run time along the scan axes.

    Parameters
    ----------
    filePath : str or sequence of str
        Path(s) or glob pattern(s).  Patterns are sorted, made absolute and
        expanded; the matches of each pattern are sorted as well.
    group : str, optional
        HDF5 group passed to ``xr.open_mfdataset``.
    datesetOfGlobal : optional
        Already loaded "globals" dataset.  When ``None`` it is read from the
        ``"globals"`` group of the matched files (always with the h5netcdf
        engine, ``phony_dims="access"`` and ``parallel=True``).  Its
        ``attrs['scanAxis']`` entry is overwritten in place, see
        ``excludeAxis``.
    preprocess : callable, optional
        Per-file preprocess function.  When ``None`` each file is reduced to
        its run time with
        ``_assign_scan_axis_partial_and_remove_everything``.
    join, parallel, engine, phony_dims
        Forwarded to ``xr.open_mfdataset`` for the main read.
    excludeAxis : sequence of str
        Names removed from the scan-axis list (via ``np.setdiff1d``).  The
        default list is never mutated.
    maxFileNum : int, optional
        Keep only the first ``maxFileNum`` matched files.
    **kwargs
        Further keyword arguments for ``xr.open_mfdataset``.

    Returns
    -------
    Dataset whose non-scan dimensions are renamed, in sorted order of their
    original names, to ``x, y, z, a, b, ...`` and whose attributes are a
    deep copy of the globals attributes.

    Raises
    ------
    FileNotFoundError
        If no file matches any of the patterns.
    """
    patterns = [
        os.path.abspath(pattern).replace("\\", "/")
        for pattern in np.sort(np.atleast_1d(filePath))
    ]
    # Flatten pattern by pattern: building a 2-D array from the per-pattern
    # lists fails as soon as two patterns match a different number of files.
    matched = [
        path.replace("\\", "/")
        for pattern in patterns
        for path in sorted(glob.glob(pattern))
    ]
    if not matched:
        raise FileNotFoundError(f"no files match {patterns}")

    # Kept as an array: the preprocess step compares it element-wise.
    fullFilePath = np.array(matched)
    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]

    kwargs.update(
        {
            'join': join,
            'parallel': parallel,
            'engine': engine,
            'phony_dims': phony_dims,
            'group': group,
        }
    )

    if datesetOfGlobal is None:
        datesetOfGlobal = xr.open_mfdataset(
            fullFilePath,
            group="globals",
            concat_dim="fileNum",
            combine="nested",
            preprocess=_read_shot_number_from_hdf5,
            engine="h5netcdf",
            phony_dims="access",
            combine_attrs=_read_globals_attrs,
            parallel=True,
        )

    datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)

    if preprocess is None:
        preprocess = partial(
            _assign_scan_axis_partial_and_remove_everything,
            datesetOfGlobal=datesetOfGlobal,
            fullFilePath=fullFilePath,
        )
    kwargs['preprocess'] = preprocess

    ds = xr.open_mfdataset(fullFilePath, **kwargs)

    # Give the remaining (non-scan) dimensions short canonical names.
    newDimKey = ['x', 'y', 'z'] + [chr(i) for i in range(97, 97 + 23)]
    oldDimKey = sorted(
        key
        for key in ds.dims
        if key not in datesetOfGlobal.scanAxis
    )
    renameDict = {old: newDimKey[j] for j, old in enumerate(oldDimKey)}

    ds = ds.rename_dims(renameDict)
    ds.attrs = copy.deepcopy(datesetOfGlobal.attrs)
    return ds
def read_hdf5_global(filePath, preprocess=None, join="outer", combine="nested", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
    """Open the ``"globals"`` group of all matching HDF5 files as one dataset.

    Files are concatenated along a new ``fileNum`` dimension and their
    attributes are merged with ``_read_globals_attrs``, which also
    determines the scan axes.

    Parameters
    ----------
    filePath : str or sequence of str
        Path(s) or glob pattern(s).  Patterns are sorted, made absolute and
        expanded; the matches of each pattern are sorted as well.
    preprocess : optional
        Accepted but ignored: ``_read_shot_number_from_hdf5`` is always used
        as the preprocess function.
    join, combine, parallel, engine, phony_dims
        Forwarded to ``xr.open_mfdataset``.
    excludeAxis : sequence of str
        Names removed from the scan-axis list (via ``np.setdiff1d``).  The
        default list is never mutated.
    maxFileNum : int, optional
        Keep only the first ``maxFileNum`` matched files.
    **kwargs
        Further keyword arguments for ``xr.open_mfdataset``.

    Returns
    -------
    The combined globals dataset with ``attrs['scanAxis']`` reduced by
    ``excludeAxis``.

    Raises
    ------
    FileNotFoundError
        If no file matches any of the patterns.
    """
    patterns = [
        os.path.abspath(pattern).replace("\\", "/")
        for pattern in np.sort(np.atleast_1d(filePath))
    ]
    # Flatten pattern by pattern: building a 2-D array from the per-pattern
    # lists fails as soon as two patterns match a different number of files.
    matched = [
        path.replace("\\", "/")
        for pattern in patterns
        for path in sorted(glob.glob(pattern))
    ]
    if not matched:
        raise FileNotFoundError(f"no files match {patterns}")

    fullFilePath = np.array(matched)
    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]

    kwargs.update(
        {
            'join': join,
            'parallel': parallel,
            'engine': engine,
            'phony_dims': phony_dims,
            'group': "globals",
            'preprocess': _read_shot_number_from_hdf5,
            'combine_attrs': _read_globals_attrs,
            'combine': combine,
            'concat_dim': "fileNum",
        }
    )

    datesetOfGlobal = xr.open_mfdataset(fullFilePath, **kwargs)
    datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)
    return datesetOfGlobal
def _read_csv_file_pandas(filePath, **kwargs):
    """Read one CSV file with pandas and return its content as a NumPy array.

    The data frame is converted to an xarray dataset, stacked into a single
    array over its variables and returned via ``to_numpy()``.  ``kwargs``
    are forwarded to ``pd.read_csv``.
    """
    frame = pd.read_csv(filePath, **kwargs)
    return xr.Dataset.from_dataframe(frame).to_array().to_numpy()
def _read_csv_file_dask(filePath, **kwargs):
    """Read one CSV file with dask.dataframe and return it as a NumPy array.

    The data frame is converted to an xarray dataset, stacked into a single
    array over its variables and returned via ``to_numpy()``.  ``kwargs``
    are forwarded to the dask ``read_csv``.
    """
    # NOTE(review): the frame comes from dask.dataframe, not pandas -
    # confirm that Dataset.from_dataframe accepts it without .compute().
    frame = df.read_csv(filePath, **kwargs)
    return xr.Dataset.from_dataframe(frame).to_array().to_numpy()
def read_csv_file(filePath, maxFileNum=None, dask='parallelized', vectorize=True, csvEngine='pandas', daskKwargs={}, csvKwargs={}, **kwargs):
    """Read all matching CSV files into one dataset indexed by ``fileIndex``.

    The first file is read eagerly to discover the column names, dimensions
    and coordinates; every file is then loaded through ``xr.apply_ufunc``
    applied over the array of file paths.

    Parameters
    ----------
    filePath : str or sequence of str
        Path(s) or glob pattern(s).  Patterns are sorted, made absolute and
        expanded; the matches of each pattern are sorted as well.
    maxFileNum : int, optional
        Keep only the first ``maxFileNum`` matched files.
    dask, vectorize
        Forwarded to ``xr.apply_ufunc``.
    csvEngine : {'pandas', 'dask'}
        Library used to parse the CSV files.
    daskKwargs : dict
        Passed to ``xr.apply_ufunc`` as ``dask_gufunc_kwargs``.
    csvKwargs : dict
        Keyword arguments for the ``read_csv`` call of the chosen engine.
    **kwargs
        Further keyword arguments for ``xr.apply_ufunc``.

    Returns
    -------
    Dataset with one variable per CSV column and the coordinates taken from
    the first file.

    Raises
    ------
    ValueError
        If ``csvEngine`` is not ``'pandas'`` or ``'dask'``.
    FileNotFoundError
        If no file matches any of the patterns.
    """
    # Fail early: an unknown engine would otherwise surface later as an
    # unbound local variable.
    if csvEngine not in ('pandas', 'dask'):
        raise ValueError(
            f"csvEngine must be 'pandas' or 'dask', got {csvEngine!r}"
        )

    patterns = [
        os.path.abspath(pattern).replace("\\", "/")
        for pattern in np.sort(np.atleast_1d(filePath))
    ]
    # Flatten pattern by pattern: building a 2-D array from the per-pattern
    # lists fails as soon as two patterns match a different number of files.
    matched = [
        path.replace("\\", "/")
        for pattern in patterns
        for path in sorted(glob.glob(pattern))
    ]
    if not matched:
        raise FileNotFoundError(f"no files match {patterns}")

    fullFilePath = np.array(matched)
    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]

    if csvEngine == 'pandas':
        read_csv = pd.read_csv
        load_one_file = _read_csv_file_pandas
    else:
        read_csv = df.read_csv
        load_one_file = _read_csv_file_dask

    # The first file serves as the template for names, dims and coords.
    res_first = xr.Dataset.from_dataframe(read_csv(fullFilePath[0], **csvKwargs))

    data_vars = list(res_first.keys())
    if len(np.shape(data_vars)) > 1:
        # Multi-part column names: join the parts into a single string.
        data_vars = np.array([''.join(name) for name in data_vars])

    fullFilePath = xr.DataArray(
        data=fullFilePath,
        dims=['fileIndex']
    )

    # Each loaded file has the variables first, then the template's dims.
    output_dims = ['data_vars'] + list(res_first.dims)

    kwargs.update(
        {
            'dask': dask,
            'vectorize': vectorize,
            'output_core_dims': [output_dims],
            "dask_gufunc_kwargs": daskKwargs,
        }
    )

    res = xr.apply_ufunc(load_one_file, fullFilePath, kwargs=csvKwargs, **kwargs)

    res = res.assign_coords({'data_vars': data_vars})
    res = res.to_dataset(dim='data_vars')

    for key in list(res_first.coords):
        res = res.assign_coords({key: res_first[key]})

    return res