implement reading from csv

This commit is contained in:
Jianshun Gao 2023-05-22 19:35:09 +02:00
parent 9e6d3a8230
commit 43106834a5
2 changed files with 94 additions and 2 deletions

View File

@ -284,7 +284,7 @@ class FitAnalyser():
self.fitDim = fitDim self.fitDim = fitDim
def print_params_set_templat(self, params=None): def print_params_set_template(self, params=None):
if params is None: if params is None:
params = self.fitModel.make_params() params = self.fitModel.make_params()

View File

@ -1,4 +1,6 @@
import xarray as xr import xarray as xr
import dask.dataframe as df
import pandas as pd
import numpy as np import numpy as np
from collections import OrderedDict from collections import OrderedDict
from functools import partial from functools import partial
@ -188,6 +190,7 @@ def read_hdf5_file(filePath, group=None, datesetOfGlobal=None, preprocess=None,
return ds return ds
def _assign_scan_axis_partial_and_remove_everything(x, datesetOfGlobal, fullFilePath): def _assign_scan_axis_partial_and_remove_everything(x, datesetOfGlobal, fullFilePath):
scanAxis = datesetOfGlobal.scanAxis scanAxis = datesetOfGlobal.scanAxis
filePath = x.encoding["source"].replace("\\", "/") filePath = x.encoding["source"].replace("\\", "/")
@ -208,6 +211,7 @@ def _read_run_time_from_hdf5(x):
runTime = datetime.strptime(x.attrs['run time'], '%Y%m%dT%H%M%S') runTime = datetime.strptime(x.attrs['run time'], '%Y%m%dT%H%M%S')
return runTime return runTime
def read_hdf5_run_time(filePath, group=None, datesetOfGlobal=None, preprocess=None, join="outer", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs): def read_hdf5_run_time(filePath, group=None, datesetOfGlobal=None, preprocess=None, join="outer", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
filePath = np.sort(np.atleast_1d(filePath)) filePath = np.sort(np.atleast_1d(filePath))
@ -282,6 +286,7 @@ def read_hdf5_run_time(filePath, group=None, datesetOfGlobal=None, preprocess=No
return ds return ds
def read_hdf5_global(filePath, preprocess=None, join="outer", combine="nested", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs): def read_hdf5_global(filePath, preprocess=None, join="outer", combine="nested", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
filePath = np.sort(np.atleast_1d(filePath)) filePath = np.sort(np.atleast_1d(filePath))
@ -321,4 +326,91 @@ def read_hdf5_global(filePath, preprocess=None, join="outer", combine="nested",
datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis) datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)
return datesetOfGlobal return datesetOfGlobal
def _read_csv_file_pandas(filePath, **kwargs):
    """Load one CSV file with pandas and return its content as a NumPy array.

    Parameters
    ----------
    filePath
        Location of the CSV file, handed unchanged to ``pd.read_csv``.
    **kwargs
        Extra keyword arguments forwarded to ``pd.read_csv``.

    Returns
    -------
    numpy.ndarray
        The table converted through ``xr.Dataset.from_dataframe`` and
        ``Dataset.to_array()``, i.e. all columns stacked into one array.
    """
    frame = pd.read_csv(filePath, **kwargs)
    # Going through an xarray Dataset keeps the array layout identical to the
    # template Dataset that read_csv_file builds from the first file.
    return xr.Dataset.from_dataframe(frame).to_array().to_numpy()
def _read_csv_file_dask(filePath, **kwargs):
    """Load one CSV file with dask.dataframe and return it as a NumPy array.

    Parameters
    ----------
    filePath
        Location of the CSV file, handed unchanged to ``dask.dataframe.read_csv``
        (imported in this module as ``df``).
    **kwargs
        Extra keyword arguments forwarded to ``dask.dataframe.read_csv``.

    Returns
    -------
    numpy.ndarray
        The table converted through ``xr.Dataset.from_dataframe`` and
        ``Dataset.to_array()``, i.e. all columns stacked into one array.
    """
    frame = df.read_csv(filePath, **kwargs)
    # NOTE(review): xr.Dataset.from_dataframe is documented for pandas
    # DataFrames; confirm that a (lazy) dask DataFrame is accepted here.
    return xr.Dataset.from_dataframe(frame).to_array().to_numpy()
def read_csv_file(filePath, maxFileNum=None, dask='parallelized', vectorize=True, csvEngine='pandas', daskKwargs=None, csvKwargs=None, **kwargs):
    """Read one or more CSV files into a single xarray Dataset.

    Every entry of ``filePath`` is made absolute and expanded with ``glob``.
    The first matching file is read eagerly and serves as the template for the
    variable names, dimensions and coordinates. All files are then read
    through ``xr.apply_ufunc`` and laid out along a ``fileIndex`` dimension.

    Parameters
    ----------
    filePath : str or sequence of str
        Path(s) or glob pattern(s) of the CSV files.
    maxFileNum : int, optional
        If given, only the first ``maxFileNum`` matched files are read.
    dask : str, optional
        Forwarded to ``xr.apply_ufunc`` as its ``dask`` argument.
    vectorize : bool, optional
        Forwarded to ``xr.apply_ufunc`` as its ``vectorize`` argument.
    csvEngine : {'pandas', 'dask'}, optional
        Library whose ``read_csv`` is used to parse the files.
    daskKwargs : dict, optional
        Forwarded to ``xr.apply_ufunc`` as ``dask_gufunc_kwargs``.
    csvKwargs : dict, optional
        Keyword arguments forwarded to the selected ``read_csv``.
    **kwargs
        Further keyword arguments for ``xr.apply_ufunc``. The keys ``dask``,
        ``vectorize``, ``output_core_dims`` and ``dask_gufunc_kwargs`` are
        always overwritten by this function.

    Returns
    -------
    xarray.Dataset
        One data variable per column of the first file, with the coordinates
        of the first file attached.
        NOTE(review): this presumably requires every file to have the same
        columns and shape as the first one -- confirm against real data.

    Raises
    ------
    ValueError
        If ``csvEngine`` is neither ``'pandas'`` nor ``'dask'``.
    FileNotFoundError
        If no file matches ``filePath`` (or ``maxFileNum`` leaves none).
    """
    # Validate up front: an unknown engine used to fall through both branches
    # and fail later with an unrelated "unbound local variable" error.
    if csvEngine not in ('pandas', 'dask'):
        raise ValueError(
            "csvEngine must be 'pandas' or 'dask', got {!r}".format(csvEngine)
        )

    # Use fresh dicts instead of shared mutable default arguments.
    daskKwargs = {} if daskKwargs is None else daskKwargs
    csvKwargs = {} if csvKwargs is None else csvKwargs

    patterns = [
        os.path.abspath(path).replace("\\", "/")
        for path in np.sort(np.atleast_1d(filePath))
    ]

    # Flatten while expanding. np.array(nested).flatten() does not flatten
    # when the patterns match different numbers of files (ragged input).
    matchedFiles = [
        match.replace("\\", "/")
        for pattern in patterns
        for match in sorted(glob.glob(pattern))
    ]

    if maxFileNum is not None:
        matchedFiles = matchedFiles[:int(maxFileNum)]

    if not matchedFiles:
        raise FileNotFoundError(
            "No CSV file found for: {}".format(", ".join(patterns))
        )

    # The first file defines variable names, dimensions and coordinates.
    if csvEngine == 'pandas':
        res_first = pd.read_csv(matchedFiles[0], **csvKwargs)
        readOneFile = _read_csv_file_pandas
    else:
        res_first = df.read_csv(matchedFiles[0], **csvKwargs)
        readOneFile = _read_csv_file_dask
    res_first = xr.Dataset.from_dataframe(res_first)

    data_vars = list(res_first.keys())
    # Tuple-valued column names (e.g. from a multi-row header) form a 2-D
    # structure; join each tuple into one string so it can label a coordinate.
    if len(np.shape(data_vars)) > 1:
        data_vars = np.array([''.join(name) for name in data_vars])

    fileArray = xr.DataArray(
        data=np.array(matchedFiles),
        dims=['fileIndex'],
    )

    # Each file yields an array shaped (data_vars, *dims of the first file).
    outputDims = ['data_vars'] + list(res_first.dims)

    kwargs.update(
        {
            'dask': dask,
            'vectorize': vectorize,
            'output_core_dims': [outputDims],
            'dask_gufunc_kwargs': daskKwargs,
        }
    )

    res = xr.apply_ufunc(readOneFile, fileArray, kwargs=csvKwargs, **kwargs)

    # Split the stacked 'data_vars' axis back into named data variables.
    res = res.assign_coords({'data_vars': data_vars})
    res = res.to_dataset(dim='data_vars')

    for key in res_first.coords:
        res = res.assign_coords({key: res_first[key]})

    return res