You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

574 lines
19 KiB

import xarray as xr
import dask.dataframe as df
import pandas as pd
import numpy as np
from collections import OrderedDict
from functools import partial
import copy
import glob
import os
from datetime import datetime
def _read_globals_attrs(variable_attrs, context=None):
    """Merge the global attrs of all shots and work out the scan axes.

    This function is handed to ``xr.open_mfdataset`` as ``combine_attrs`` by
    the readers in this file, hence the ``(variable_attrs, context)`` signature.

    An attribute stays untouched when its value is equivalent in every shot
    that defines it. An attribute whose value differs between shots is
    collected, shot by shot, into one flat array. The first attribute with a
    given sequence of values becomes a scan axis; a later attribute with an
    equivalent sequence is stored as the NAME of that scan axis instead.

    :param variable_attrs: The attrs dictionaries to merge, one per shot.
    :type variable_attrs: sequence of dict
    :param context: Not used by this function, defaults to None
    :type context: object, optional
    :return: The merged attrs, extended with ``"scanAxis"`` (names of the scan
        axes) and ``"scanAxisLength"`` (number of collected values per scan
        axis), or None when there is nothing to merge.
    :rtype: dict or None
    """
    if not variable_attrs:
        # Nothing to merge.
        return None

    from xarray.core.utils import equivalent

    # Pass 1: sort every key into "shared" (same value everywhere so far) or
    # "varying" (at least two shots disagree).
    shared = {}
    varying = OrderedDict()
    for shot_attrs in variable_attrs:
        for name, value in shot_attrs.items():
            if name not in shared and name not in varying:
                shared[name] = value
        shared = {
            name: value
            for name, value in shared.items()
            if name not in shot_attrs or equivalent(shot_attrs[name], value)
        }
        for name in shot_attrs:
            if name not in shared:
                varying[name] = []

    # Pass 2: collect the value of every varying key from every shot.
    for shot_attrs in variable_attrs:
        for name in varying:
            varying[name] = np.append(varying[name], shot_attrs[name])

    # Pass 3: keep one scan axis per distinct sequence of values.
    scan_attrs = OrderedDict()
    scan_length = []
    for name, values in varying.items():
        for axis_name, axis_values in scan_attrs.items():
            if equivalent(axis_values, values):
                # Same sequence as an existing scan axis: refer to it by name.
                shared[name] = axis_name
                break
        else:
            scan_attrs[name] = values
            scan_length = np.append(scan_length, len(values))

    shared.update(scan_attrs)
    shared["scanAxis"] = list(scan_attrs.keys())
    shared["scanAxisLength"] = scan_length

    return shared
def _read_shot_number_from_hdf5(x):
    """Attach the shot number, parsed from the source file path, to a shot.

    The shot number is the text between the last underscore of
    ``x.encoding["source"]`` and the first dot that follows it. It is stored
    as a string, without conversion to a number.

    :param x: The data of current shot
    :type x: xarray Dataset
    :return: The data with an additional variable ``shotNum``
    :rtype: xarray Dataset
    """
    source = x.encoding["source"]
    after_last_underscore = source.rsplit("_", 1)[-1]
    shot_number = after_last_underscore.split(".")[0]
    return x.assign(shotNum=shot_number)
def _assign_scan_axis_partial(x, datesetOfGlobal, fullFilePath):
    """Add the shot number and the scan axes to the data of one shot.

    The position of the shot's source file inside ``fullFilePath`` selects,
    for every scan axis, the value belonging to this shot. Each scan axis
    becomes a new dimension whose coordinate is that single value.

    :param x: The data of current shot
    :type x: xarray Dataset
    :param datesetOfGlobal: The xarray DataSet stored the information of global
        parameters; its ``scanAxis`` attribute names the scan axes and its
        attrs hold the values of every scan axis for all shots
    :type datesetOfGlobal: xarray DataSet
    :param fullFilePath: The paths of all files being read; it is compared
        element-wise with the source path of ``x``
    :type fullFilePath: numpy array of str
    :return: The data of current shot with shot number and scan axes
    :rtype: xarray Dataset
    """
    scan_axes = datesetOfGlobal.scanAxis
    source = x.encoding["source"].replace("\\", "/")

    # Index of this shot's file in the list of all files.
    file_index = np.squeeze(np.where(fullFilePath == source))

    # Text between the last "_" and the following "." of the file path.
    shot_number = source.rsplit("_", 1)[-1].split(".")[0]
    shot = x.assign(shotNum=shot_number).expand_dims(list(scan_axes))

    coords = {}
    for axis in scan_axes:
        values_of_all_shots = np.atleast_1d(datesetOfGlobal.attrs[axis])
        coords[axis] = np.atleast_1d(values_of_all_shots[int(file_index)])
    return shot.assign_coords(coords)
def _update_globals_attrs(variable_attrs, context=None):
    """Placeholder reserved for the live plot panel; not implemented yet.

    :param variable_attrs: Not used
    :param context: Not used, defaults to None
    :return: Always None
    """
    return None
def update_hdf5_file():
    """Placeholder reserved for the live plot panel; not implemented yet.

    :return: Always None
    """
    return None
def read_hdf5_file(filePath, group=None, datesetOfGlobal=None, preprocess=None, join="outer", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
    """Read the data from HDF5 files in given path.

    Every pattern in ``filePath`` is expanded with ``glob.glob()``; all matched
    files are opened together with ``xarray.open_mfdataset``. Dimensions that
    are not scan axes are renamed, in sorted order, to 'x', 'y', 'z', 'a',
    'b', ... and the attrs of the result are a deep copy of the global attrs.

    :param filePath: The path(s) of HDF5 files, which python glob.glob() can read. It has to end with '.h5'.
    :type filePath: str or list of str
    :param group: The path of the group in HDF5 file where data is, defaults to None. Please use '/', instead of '\\'
    :type group: str, optional
    :param datesetOfGlobal: A xarray DataSet stored the global parameters of the data, defaults to None.
        If None, it is read from the group "globals" of the same files. Its ``attrs['scanAxis']``
        is overwritten in place with the scan axes remaining after ``excludeAxis`` is removed.
    :type datesetOfGlobal: xarray DataSet, optional
    :param preprocess: The function you want to run for each file after read before combination, defaults to None.
        If None, a function adding the shot number and the scan axes is used; a given function
        replaces that default.
    :type preprocess: a handle to function, optional
    :param join: over write of the same argument in xarray.open_mfdataset, defaults to "outer"
    :type join: str, optional
    :param parallel: over write of the same argument in xarray.open_mfdataset, defaults to True
    :type parallel: bool, optional
    :param engine: The engine to read HDF5 file, defaults to "h5netcdf"
    :type engine: str, optional
    :param phony_dims: Please read the introduction of h5netcdf package, defaults to "access"
    :type phony_dims: str, optional
    :param excludeAxis: The name of axes, whose value changes together with scan axes, defaults to []
    :type excludeAxis: list, optional
    :param maxFileNum: The maximal number of files to read, defaults to None
    :type maxFileNum: int, optional
    :param kwargs: Further keyword arguments forwarded to xarray.open_mfdataset
    :return: A xarray dataSet contain the data read from specified HDF5 file, including scan axes and shot number.
    :rtype: xarray DataSet
    """
    # Expand each glob pattern into its sorted matches and collect everything
    # in ONE flat list before converting to an array. Building one list per
    # pattern and flattening the nested result afterwards only works when all
    # patterns match the same number of files (ragged lists cannot form a 2-D
    # array).
    fullFilePath = np.array(
        [
            matched.replace("\\", "/")
            for pattern in np.sort(np.atleast_1d(filePath))
            for matched in sorted(glob.glob(os.path.abspath(pattern).replace("\\", "/")))
        ]
    )

    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]

    kwargs.update(
        {
            'join': join,
            'parallel': parallel,
            'engine': engine,
            'phony_dims': phony_dims,
            'group': group,
        }
    )

    if datesetOfGlobal is None:
        # The globals are read with fixed settings, independent of the
        # engine/parallel/phony_dims arguments of this function.
        datesetOfGlobal = xr.open_mfdataset(
            fullFilePath,
            group="globals",
            concat_dim="fileNum",
            combine="nested",
            preprocess=_read_shot_number_from_hdf5,
            engine="h5netcdf",
            phony_dims="access",
            combine_attrs=_read_globals_attrs,
            parallel=True,
        )

    # np.setdiff1d also sorts the remaining axis names and removes duplicates.
    datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)

    if preprocess is None:
        preprocess = partial(_assign_scan_axis_partial, datesetOfGlobal=datesetOfGlobal, fullFilePath=fullFilePath)
    kwargs['preprocess'] = preprocess

    ds = xr.open_mfdataset(fullFilePath, **kwargs)

    # New names for the non-scan dimensions: x, y, z, a, b, ..., w.
    newDimKey = np.append(['x', 'y', 'z'], [chr(i) for i in range(97, 97 + 23)])

    oldDimKey = np.sort(
        [
            key
            for key in ds.dims
            if key not in datesetOfGlobal.scanAxis
        ]
    )

    # Dimensions beyond the 26 available names keep their original name.
    renameDict = dict(zip(oldDimKey, newDimKey))

    ds = ds.rename_dims(renameDict)
    ds.attrs = copy.deepcopy(datesetOfGlobal.attrs)

    return ds
def _assign_scan_axis_partial_and_remove_everything(x, datesetOfGlobal, fullFilePath):
    """Replace the data of one shot by its run time, labelled with the scan axes.

    Everything read from the file is dropped; the returned dataset holds the
    single variable ``runTime``. Each scan axis becomes a dimension whose
    coordinate is the value belonging to this shot, selected by the position
    of the shot's source file inside ``fullFilePath``.

    :param x: The data of current shot
    :type x: xarray Dataset
    :param datesetOfGlobal: The xarray DataSet stored the information of global
        parameters; its ``scanAxis`` attribute names the scan axes and its
        attrs hold the values of every scan axis for all shots
    :type datesetOfGlobal: xarray DataSet
    :param fullFilePath: The paths of all files being read; it is compared
        element-wise with the source path of ``x``
    :type fullFilePath: numpy array of str
    :return: A dataset containing only ``runTime``, with scan axes
    :rtype: xarray Dataset
    """
    scan_axes = datesetOfGlobal.scanAxis
    source = x.encoding["source"].replace("\\", "/")

    # Index of this shot's file in the list of all files.
    file_index = np.squeeze(np.where(fullFilePath == source))

    run_time = _read_run_time_from_hdf5(x)
    shot = xr.Dataset(data_vars={'runTime': run_time}).expand_dims(list(scan_axes))

    coords = {}
    for axis in scan_axes:
        values_of_all_shots = np.atleast_1d(datesetOfGlobal.attrs[axis])
        coords[axis] = np.atleast_1d(values_of_all_shots[int(file_index)])
    return shot.assign_coords(coords)
def _read_run_time_from_hdf5(x):
    """Parse the run time stored in the attrs of a shot.

    :param x: The data of current shot; ``x.attrs['run time']`` must be a
        string of the form ``YYYYmmddTHHMMSS``
    :type x: xarray Dataset
    :return: The run time of the shot
    :rtype: datetime.datetime
    """
    return datetime.strptime(x.attrs['run time'], '%Y%m%dT%H%M%S')
def read_hdf5_run_time(filePath, group=None, datesetOfGlobal=None, preprocess=None, join="outer", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
    """Read the run time from HDF5 files in given path.

    Every pattern in ``filePath`` is expanded with ``glob.glob()``; all matched
    files are opened together with ``xarray.open_mfdataset``. With the default
    ``preprocess`` each shot is reduced to its run time, labelled with the
    scan axes. The attrs of the result are a deep copy of the global attrs.

    :param filePath: The path(s) of HDF5 files, which python glob.glob() can read. It has to end with '.h5'.
    :type filePath: str or list of str
    :param group: The path of the group in HDF5 file where run time is, defaults to None. Please use '/', instead of '\\'
    :type group: str, optional
    :param datesetOfGlobal: A xarray DataSet stored the global parameters of the data, defaults to None.
        If None, it is read from the group "globals" of the same files. Its ``attrs['scanAxis']``
        is overwritten in place with the scan axes remaining after ``excludeAxis`` is removed.
    :type datesetOfGlobal: xarray DataSet, optional
    :param preprocess: The function you want to run for each file after read before combination, defaults to None.
        If None, a function keeping only the run time and adding the scan axes is used; a given
        function replaces that default.
    :type preprocess: a handle to function, optional
    :param join: over write of the same argument in xarray.open_mfdataset, defaults to "outer"
    :type join: str, optional
    :param parallel: over write of the same argument in xarray.open_mfdataset, defaults to True
    :type parallel: bool, optional
    :param engine: The engine to read HDF5 file, defaults to "h5netcdf"
    :type engine: str, optional
    :param phony_dims: Please read the introduction of h5netcdf package, defaults to "access"
    :type phony_dims: str, optional
    :param excludeAxis: The name of axes, whose value changes together with scan axes, defaults to []
    :type excludeAxis: list, optional
    :param maxFileNum: The maximal number of files to read, defaults to None
    :type maxFileNum: int, optional
    :param kwargs: Further keyword arguments forwarded to xarray.open_mfdataset
    :return: A xarray dataSet contain the data read from specified HDF5 file.
    :rtype: xarray DataSet
    """
    # Expand each glob pattern into its sorted matches and collect everything
    # in ONE flat list before converting to an array. Building one list per
    # pattern and flattening the nested result afterwards only works when all
    # patterns match the same number of files (ragged lists cannot form a 2-D
    # array).
    fullFilePath = np.array(
        [
            matched.replace("\\", "/")
            for pattern in np.sort(np.atleast_1d(filePath))
            for matched in sorted(glob.glob(os.path.abspath(pattern).replace("\\", "/")))
        ]
    )

    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]

    kwargs.update(
        {
            'join': join,
            'parallel': parallel,
            'engine': engine,
            'phony_dims': phony_dims,
            'group': group,
        }
    )

    if datesetOfGlobal is None:
        # The globals are read with fixed settings, independent of the
        # engine/parallel/phony_dims arguments of this function.
        datesetOfGlobal = xr.open_mfdataset(
            fullFilePath,
            group="globals",
            concat_dim="fileNum",
            combine="nested",
            preprocess=_read_shot_number_from_hdf5,
            engine="h5netcdf",
            phony_dims="access",
            combine_attrs=_read_globals_attrs,
            parallel=True,
        )

    # np.setdiff1d also sorts the remaining axis names and removes duplicates.
    datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)

    if preprocess is None:
        preprocess = partial(
            _assign_scan_axis_partial_and_remove_everything,
            datesetOfGlobal=datesetOfGlobal,
            fullFilePath=fullFilePath,
        )
    kwargs['preprocess'] = preprocess

    ds = xr.open_mfdataset(fullFilePath, **kwargs)

    # New names for the non-scan dimensions: x, y, z, a, b, ..., w.
    newDimKey = np.append(['x', 'y', 'z'], [chr(i) for i in range(97, 97 + 23)])

    oldDimKey = np.sort(
        [
            key
            for key in ds.dims
            if key not in datesetOfGlobal.scanAxis
        ]
    )

    # Dimensions beyond the 26 available names keep their original name.
    renameDict = dict(zip(oldDimKey, newDimKey))

    ds = ds.rename_dims(renameDict)
    ds.attrs = copy.deepcopy(datesetOfGlobal.attrs)

    return ds
def read_hdf5_global(filePath, preprocess=None, join="outer", combine="nested", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
    """Read the global parameters and find scan axes, from HDF5 files in given path.

    Every pattern in ``filePath`` is expanded with ``glob.glob()``; the group
    "globals" of all matched files is opened with ``xarray.open_mfdataset``
    and concatenated along the dimension "fileNum". The attrs of all files
    are merged by ``_read_globals_attrs``, which also determines the scan axes.

    :param filePath: The path(s) of HDF5 files, which python glob.glob() can read. It has to end with '.h5'.
    :type filePath: str or list of str
    :param preprocess: The function you want to run for each file after read before combination, defaults to None.
        NOTE(review): this argument is currently not used; ``_read_shot_number_from_hdf5``
        is always applied instead. Confirm whether that is intended.
    :type preprocess: a handle to function, optional
    :param join: over write of the same argument in xarray.open_mfdataset, defaults to "outer"
    :type join: str, optional
    :param combine: over write of the same argument in xarray.open_mfdataset, defaults to "nested"
    :type combine: str, optional
    :param parallel: over write of the same argument in xarray.open_mfdataset, defaults to True
    :type parallel: bool, optional
    :param engine: The engine to read HDF5 file, defaults to "h5netcdf"
    :type engine: str, optional
    :param phony_dims: Please read the introduction of h5netcdf package, defaults to "access"
    :type phony_dims: str, optional
    :param excludeAxis: The name of axes, whose value changes together with scan axes, defaults to []
    :type excludeAxis: list, optional
    :param maxFileNum: The maximal number of files to read, defaults to None
    :type maxFileNum: int, optional
    :param kwargs: Further keyword arguments forwarded to xarray.open_mfdataset
    :return: A xarray dataSet contain the global parameters read from specified HDF5 file.
    :rtype: xarray DataSet
    """
    # Expand each glob pattern into its sorted matches and collect everything
    # in ONE flat list before converting to an array. Building one list per
    # pattern and flattening the nested result afterwards only works when all
    # patterns match the same number of files (ragged lists cannot form a 2-D
    # array).
    fullFilePath = np.array(
        [
            matched.replace("\\", "/")
            for pattern in np.sort(np.atleast_1d(filePath))
            for matched in sorted(glob.glob(os.path.abspath(pattern).replace("\\", "/")))
        ]
    )

    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]

    kwargs.update(
        {
            'join': join,
            'parallel': parallel,
            'engine': engine,
            'phony_dims': phony_dims,
            'group': "globals",
            'preprocess': _read_shot_number_from_hdf5,
            'combine_attrs': _read_globals_attrs,
            'combine': combine,
            'concat_dim': "fileNum",
        }
    )

    datesetOfGlobal = xr.open_mfdataset(fullFilePath, **kwargs)

    # np.setdiff1d also sorts the remaining axis names and removes duplicates.
    datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)

    return datesetOfGlobal
def _read_csv_file_pandas(filePath, **kwargs):
    """Read one csv file with pandas.read_csv() and return its content as an array.

    The table is converted to an xarray Dataset, its data variables are
    stacked into one array, and the plain NumPy values are returned.

    :param filePath: The path of the csv file.
    :type filePath: str
    :param kwargs: Keyword arguments forwarded to pandas.read_csv()
    :return: The content of the csv file
    :rtype: numpy.ndarray
    """
    frame = pd.read_csv(filePath, **kwargs)
    dataset = xr.Dataset.from_dataframe(frame)
    return dataset.to_array().to_numpy()
def _read_csv_file_dask(filePath, **kwargs):
    """Read one csv file with dask.dataframe.read_csv() and return its content as an array.

    The table is converted to an xarray Dataset, its data variables are
    stacked into one array, and the plain NumPy values are returned.

    NOTE(review): xr.Dataset.from_dataframe is documented for pandas
    DataFrames; confirm that the dask DataFrame returned here is accepted.

    :param filePath: The path of the csv file.
    :type filePath: str
    :param kwargs: Keyword arguments forwarded to dask.dataframe.read_csv()
    :return: The content of the csv file
    :rtype: numpy.ndarray
    """
    frame = df.read_csv(filePath, **kwargs)
    dataset = xr.Dataset.from_dataframe(frame)
    return dataset.to_array().to_numpy()
def read_csv_file(filePath, maxFileNum=None, dask='parallelized', vectorize=True, csvEngine='pandas', daskKwargs={}, csvKwargs={}, **kwargs):
    """Read the data from csv files in given path.

    Every pattern in ``filePath`` is expanded with ``glob.glob()``. The first
    matched file is read once to learn the column names, dimensions and
    coordinates; all files are then read through ``xarray.apply_ufunc`` along
    a new dimension 'fileIndex', and every column becomes one data variable.

    :param filePath: The path(s) of csv files, which python glob.glob() can read. It has to end with '.csv'.
    :type filePath: str or list of str
    :param maxFileNum: The maximal number of files to read, defaults to None
    :type maxFileNum: int, optional
    :param dask: over write of the same argument in xarray.apply_ufunc, defaults to 'parallelized'
    :type dask: str, optional
    :param vectorize: over write of the same argument in xarray.apply_ufunc, defaults to True
    :type vectorize: bool, optional
    :param csvEngine: The engine to read csv file, 'pandas' or 'dask', defaults to 'pandas'
    :type csvEngine: str, optional
    :param daskKwargs: passed to xarray.apply_ufunc as ``dask_gufunc_kwargs``, defaults to {}
    :type daskKwargs: dict, optional
    :param csvKwargs: The kwargs send to csvEngine, defaults to {}
    :type csvKwargs: dict, optional
    :param kwargs: Further keyword arguments forwarded to xarray.apply_ufunc
    :raises ValueError: If ``csvEngine`` is neither 'pandas' nor 'dask'
    :return: A xarray DataSet stored the data
    :rtype: xarray DataSet
    """
    # Pick the reader for the first file and the per-file reader up front, so
    # an unknown engine fails with a clear message instead of an unbound
    # local variable further down.
    if csvEngine == 'pandas':
        read_first, read_each = pd.read_csv, _read_csv_file_pandas
    elif csvEngine == 'dask':
        read_first, read_each = df.read_csv, _read_csv_file_dask
    else:
        raise ValueError(
            "csvEngine must be 'pandas' or 'dask', got {!r}".format(csvEngine)
        )

    # Expand each glob pattern into its sorted matches and collect everything
    # in ONE flat list before converting to an array. Building one list per
    # pattern and flattening the nested result afterwards only works when all
    # patterns match the same number of files (ragged lists cannot form a 2-D
    # array).
    fullFilePath = np.array(
        [
            matched.replace("\\", "/")
            for pattern in np.sort(np.atleast_1d(filePath))
            for matched in sorted(glob.glob(os.path.abspath(pattern).replace("\\", "/")))
        ]
    )

    if maxFileNum is not None:
        fullFilePath = fullFilePath[0:int(maxFileNum)]

    # The first file serves as the template for all the others.
    res_first = xr.Dataset.from_dataframe(read_first(fullFilePath[0], **csvKwargs))

    data_vars = list(res_first.keys())

    if len(np.shape(data_vars)) > 1:
        # Each name is itself a sequence of strings (the name list is 2-D):
        # join the parts of every name into one string.
        data_vars = np.array([''.join(name) for name in data_vars])

    fullFilePath = xr.DataArray(
        data=fullFilePath,
        dims=['fileIndex']
    )

    # Every file yields an array with the stacked data variables first,
    # followed by the dimensions of the table.
    coreDims = ['data_vars'] + list(res_first.dims)

    kwargs.update(
        {
            'dask': dask,
            'vectorize': vectorize,
            'output_core_dims': [coreDims],
            "dask_gufunc_kwargs": daskKwargs,
        }
    )

    res = xr.apply_ufunc(read_each, fullFilePath, kwargs=csvKwargs, **kwargs)

    res = res.assign_coords({'data_vars': data_vars})
    res = res.to_dataset(dim='data_vars')

    # Copy the coordinates of the first file onto the combined result.
    for key in list(res_first.coords.keys()):
        res = res.assign_coords({key: res_first[key]})

    return res