You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

573 lines
19 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. import xarray as xr
  2. import dask.dataframe as df
  3. import pandas as pd
  4. import numpy as np
  5. from collections import OrderedDict
  6. from functools import partial
  7. import copy
  8. import glob
  9. import os
  10. from datetime import datetime
  11. def _read_globals_attrs(variable_attrs, context=None):
  12. """Find global parameters of shots, including scan axes.
  13. :param variable_attrs: The attrs of current shot.
  14. :type variable_attrs: dict
  15. :param context: _description_, defaults to None
  16. :type context: _type_, optional
  17. :return: The globals attrs of the whole shot.
  18. :rtype: dict
  19. """
  20. # Combine attributes from different variables according to combine_attrs
  21. if not variable_attrs:
  22. # no attributes to merge
  23. return None
  24. from xarray.core.utils import equivalent
  25. result = {}
  26. dropped_attrs = OrderedDict()
  27. for attrs in variable_attrs:
  28. result.update(
  29. {
  30. key: value
  31. for key, value in attrs.items()
  32. if key not in result and key not in dropped_attrs.keys()
  33. }
  34. )
  35. result = {
  36. key: value
  37. for key, value in result.items()
  38. if key not in attrs or equivalent(attrs[key], value)
  39. }
  40. dropped_attrs.update(
  41. {
  42. key: []
  43. for key in attrs if key not in result
  44. }
  45. )
  46. for attrs in variable_attrs:
  47. dropped_attrs.update(
  48. {
  49. key: np.append(dropped_attrs[key], attrs[key])
  50. for key in dropped_attrs.keys()
  51. }
  52. )
  53. scan_attrs = OrderedDict()
  54. scan_length = []
  55. for attrs_key in dropped_attrs.keys():
  56. flag = True
  57. for key in scan_attrs.keys():
  58. if equivalent(scan_attrs[key], dropped_attrs[attrs_key]):
  59. flag = False
  60. result.update({attrs_key: key})
  61. break
  62. if flag:
  63. scan_attrs.update({
  64. attrs_key: dropped_attrs[attrs_key]
  65. })
  66. scan_length = np.append(scan_length, len(dropped_attrs[attrs_key]))
  67. result.update(
  68. {
  69. key: value
  70. for key, value in scan_attrs.items()
  71. }
  72. )
  73. result.update(
  74. {
  75. "scanAxis": list(scan_attrs.keys()),
  76. "scanAxisLength": scan_length,
  77. }
  78. )
  79. # if result['scanAxis'] == []:
  80. # result['scanAxis'] = ['runs',]
  81. return result
  82. def _read_shot_number_from_hdf5(x):
  83. """Add the current shot number to the data read from HDF5 file.
  84. :param x: The data of current shot
  85. :type x: xarray DataArray
  86. :return: The data with current shot number
  87. :rtype: xarray DataArray
  88. """
  89. filePath = x.encoding["source"]
  90. shotNum = filePath.split("_")[-1].split("_")[-1].split(".")[0]
  91. return x.assign(shotNum=shotNum)
  92. def _assign_scan_axis_partial(x, datesetOfGlobal, fullFilePath):
  93. """Find and add the scan axes to the data read from HDF5 file.
  94. :param x: The data of current shot
  95. :type x: xarray DataArray
  96. :param datesetOfGlobal: The xarray DataSet stored the information of global parameters
  97. :type datesetOfGlobal: xarray DataSet
  98. :param fullFilePath: The full and absolute file path of current shot
  99. :type fullFilePath: str
  100. :return: The data of current shot with scan axes
  101. :rtype: xarray DataArray
  102. """
  103. scanAxis = datesetOfGlobal.scanAxis
  104. filePath = x.encoding["source"].replace("\\", "/")
  105. shotNum = np.where(fullFilePath==filePath)
  106. shotNum = np.squeeze(shotNum)
  107. # shotNum = filePath.split("_")[-1].split("_")[-1].split(".")[0]
  108. x = x.assign(shotNum=filePath.split("_")[-1].split("_")[-1].split(".")[0])
  109. x = x.expand_dims(list(scanAxis))
  110. return x.assign_coords(
  111. {
  112. key: np.atleast_1d(np.atleast_1d(datesetOfGlobal.attrs[key])[int(shotNum)])
  113. for key in scanAxis
  114. }
  115. )
def _update_globals_attrs(variable_attrs, context=None):
    # Placeholder for the live-plot panel: intended to update the merged
    # global attrs incrementally as new shots arrive.  Not implemented yet.
    pass
def update_hdf5_file():
    # Placeholder for the live-plot panel: intended to refresh data from the
    # HDF5 files as they change on disk.  Not implemented yet.
    pass
  122. def read_hdf5_file(filePath, group=None, datesetOfGlobal=None, preprocess=None, join="outer", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
  123. """Read the data from HDF5 files in given path.
  124. :param filePath: The path of HDF5 files, which python glob.glob() can read. It has to end with '.h5'.
  125. :type filePath: str
  126. :param group: The path of the group in HDF5 file where data is, defaults to None. Please use '/', instead of '\\'
  127. :type group: str, optional
  128. :param datesetOfGlobal: A xarry dataSet stored the global parameters of the data, defaults to None
  129. :type datesetOfGlobal: xarry DataSet, optional
  130. :param preprocess: The function you want to run for each file after read before combination, defaults to None
  131. :type preprocess: a handle to function, optional
  132. :param join: over write of the same argument in xarray.open_mfdataset, defaults to "outer"
  133. :type join: str, optional
  134. :param parallel: over write of the same argument in xarray.open_mfdataset, defaults to True
  135. :type parallel: bool, optional
  136. :param engine: The engine to read HDF5 file, defaults to "h5netcdf"
  137. :type engine: str, optional
  138. :param phony_dims: Please read the introduction of h5netcdf package, defaults to "access"
  139. :type phony_dims: str, optional
  140. :param excludeAxis: The name of axes, whose value changes together with scan axes, defaults to []
  141. :type excludeAxis: list, optional
  142. :param maxFileNum: The maximal number of files to read, defaults to None
  143. :type maxFileNum: int, optional
  144. :return: A xarray dataSet contain the data read from specified HDF5 file, including scan axes and shot number.
  145. :rtype: xarray DataSet
  146. """
  147. filePath = np.sort(np.atleast_1d(filePath))
  148. filePathAbs = []
  149. for i in range(len(filePath)):
  150. filePathAbs.append(os.path.abspath(filePath[i]).replace("\\", "/"))
  151. fullFilePath = []
  152. for i in range(len(filePathAbs)):
  153. fullFilePath.append(list(np.sort(glob.glob(filePathAbs[i]))))
  154. fullFilePath = np.array(fullFilePath).flatten()
  155. for i in range(len(fullFilePath)):
  156. fullFilePath[i] = fullFilePath[i].replace("\\", "/")
  157. if not maxFileNum is None:
  158. fullFilePath = fullFilePath[0:int(maxFileNum)]
  159. kwargs.update(
  160. {
  161. 'join': join,
  162. 'parallel': parallel,
  163. 'engine': engine,
  164. 'phony_dims': phony_dims,
  165. 'group': group
  166. }
  167. )
  168. if datesetOfGlobal is None:
  169. datesetOfGlobal = xr.open_mfdataset(
  170. fullFilePath,
  171. group="globals",
  172. concat_dim="fileNum",
  173. combine="nested",
  174. preprocess=_read_shot_number_from_hdf5,
  175. engine="h5netcdf",
  176. phony_dims="access",
  177. combine_attrs=_read_globals_attrs,
  178. parallel=True, )
  179. datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)
  180. _assgin_scan_axis = partial(_assign_scan_axis_partial, datesetOfGlobal=datesetOfGlobal, fullFilePath=fullFilePath)
  181. if preprocess is None:
  182. kwargs.update({'preprocess':_assgin_scan_axis})
  183. else:
  184. kwargs.update({'preprocess':preprocess})
  185. ds = xr.open_mfdataset(fullFilePath, **kwargs)
  186. newDimKey = np.append(['x', 'y', 'z'], [ chr(i) for i in range(97, 97+23)])
  187. oldDimKey = np.sort(
  188. [
  189. key
  190. for key in ds.dims
  191. if not key in datesetOfGlobal.scanAxis
  192. ]
  193. )
  194. renameDict = {
  195. oldDimKey[j]: newDimKey[j]
  196. for j in range(len(oldDimKey))
  197. }
  198. ds = ds.rename_dims(renameDict)
  199. ds.attrs = copy.deepcopy(datesetOfGlobal.attrs)
  200. return ds
  201. def _assign_scan_axis_partial_and_remove_everything(x, datesetOfGlobal, fullFilePath):
  202. """Find ONLY and add ONLY the scan axes to the data read from HDF5 file.
  203. :param x: The data of current shot
  204. :type x: xarray DataArray
  205. :param datesetOfGlobal: The xarray DataSet stored the information of global parameters
  206. :type datesetOfGlobal: xarray DataSet
  207. :param fullFilePath: The full and absolute file path of current shot
  208. :type fullFilePath: str
  209. :return: The data of current shot with scan axes
  210. :rtype: xarray DataArray
  211. """
  212. scanAxis = datesetOfGlobal.scanAxis
  213. filePath = x.encoding["source"].replace("\\", "/")
  214. shotNum = np.where(fullFilePath==filePath)
  215. shotNum = np.squeeze(shotNum)
  216. runTime = _read_run_time_from_hdf5(x)
  217. x = xr.Dataset(data_vars={'runTime':runTime})
  218. x = x.expand_dims(list(scanAxis))
  219. return x.assign_coords(
  220. {
  221. key: np.atleast_1d(np.atleast_1d(datesetOfGlobal.attrs[key])[int(shotNum)])
  222. for key in scanAxis
  223. }
  224. )
  225. def _read_run_time_from_hdf5(x):
  226. """Find the run time of give data read from HDF5 file.
  227. :param x: The data of current shot
  228. :type x: xarray DataArray
  229. :return: The data of current shot with last modification time
  230. :rtype: xarray DataArray
  231. """
  232. runTime = datetime.strptime(x.attrs['run time'], '%Y%m%dT%H%M%S')
  233. return runTime
  234. def read_hdf5_run_time(filePath, group=None, datesetOfGlobal=None, preprocess=None, join="outer", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
  235. """Read the run time from HDF5 files in given path.
  236. :param filePath: The path of HDF5 files, which python glob.glob() can read. It has to end with '.h5'.
  237. :type filePath: str
  238. :param group: The path of the group in HDF5 file where run time is, defaults to None. Please use '/', instead of '\\'
  239. :type group: str, optional
  240. :param datesetOfGlobal: A xarry dataSet stored the global parameters of the data, defaults to None
  241. :type datesetOfGlobal: xarry DataSet, optional
  242. :param preprocess: The function you want to run for each file after read before combination, defaults to None
  243. :type preprocess: a handle to function, optional
  244. :param join: over write of the same argument in xarray.open_mfdataset, defaults to "outer"
  245. :type join: str, optional
  246. :param parallel: over write of the same argument in xarray.open_mfdataset, defaults to True
  247. :type parallel: bool, optional
  248. :param engine: The engine to read HDF5 file, defaults to "h5netcdf"
  249. :type engine: str, optional
  250. :param phony_dims: Please read the introduction of h5netcdf package, defaults to "access"
  251. :type phony_dims: str, optional
  252. :param excludeAxis: The name of axes, whose value changes together with scan axes, defaults to []
  253. :type excludeAxis: list, optional
  254. :param maxFileNum: The maximal number of files to read, defaults to None
  255. :type maxFileNum: int, optional
  256. :return: A xarray dataSet contain the data read from specified HDF5 file.
  257. :rtype: xarray DataSet
  258. """
  259. filePath = np.sort(np.atleast_1d(filePath))
  260. filePathAbs = []
  261. for i in range(len(filePath)):
  262. filePathAbs.append(os.path.abspath(filePath[i]).replace("\\", "/"))
  263. fullFilePath = []
  264. for i in range(len(filePathAbs)):
  265. fullFilePath.append(list(np.sort(glob.glob(filePathAbs[i]))))
  266. fullFilePath = np.array(fullFilePath).flatten()
  267. for i in range(len(fullFilePath)):
  268. fullFilePath[i] = fullFilePath[i].replace("\\", "/")
  269. if not maxFileNum is None:
  270. fullFilePath = fullFilePath[0:int(maxFileNum)]
  271. kwargs.update(
  272. {
  273. 'join': join,
  274. 'parallel': parallel,
  275. 'engine': engine,
  276. 'phony_dims': phony_dims,
  277. 'group': group
  278. }
  279. )
  280. if datesetOfGlobal is None:
  281. datesetOfGlobal = xr.open_mfdataset(
  282. fullFilePath,
  283. group="globals",
  284. concat_dim="fileNum",
  285. combine="nested",
  286. preprocess=_read_shot_number_from_hdf5,
  287. engine="h5netcdf",
  288. phony_dims="access",
  289. combine_attrs=_read_globals_attrs,
  290. parallel=True, )
  291. datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)
  292. _assgin_scan_axis = partial(_assign_scan_axis_partial_and_remove_everything, datesetOfGlobal=datesetOfGlobal, fullFilePath=fullFilePath)
  293. if preprocess is None:
  294. kwargs.update({'preprocess':_assgin_scan_axis})
  295. else:
  296. kwargs.update({'preprocess':preprocess})
  297. ds = xr.open_mfdataset(fullFilePath, **kwargs)
  298. newDimKey = np.append(['x', 'y', 'z'], [ chr(i) for i in range(97, 97+23)])
  299. oldDimKey = np.sort(
  300. [
  301. key
  302. for key in ds.dims
  303. if not key in datesetOfGlobal.scanAxis
  304. ]
  305. )
  306. renameDict = {
  307. oldDimKey[j]: newDimKey[j]
  308. for j in range(len(oldDimKey))
  309. }
  310. ds = ds.rename_dims(renameDict)
  311. ds.attrs = copy.deepcopy(datesetOfGlobal.attrs)
  312. return ds
  313. def read_hdf5_global(filePath, preprocess=None, join="outer", combine="nested", parallel=True, engine="h5netcdf", phony_dims="access", excludeAxis=[], maxFileNum=None, **kwargs):
  314. """Read the global parameters and find scan axes, from HDF5 files in given path.
  315. :param filePath: The path of HDF5 files, which python glob.glob() can read. It has to end with '.h5'.
  316. :type filePath: str
  317. :param preprocess: The function you want to run for each file after read before combination, defaults to None
  318. :type preprocess: a handle to function, optional
  319. :param join: over write of the same argument in xarray.open_mfdataset, defaults to "outer"
  320. :type join: str, optional
  321. :param combine: over write of the same argument in xarray.open_mfdataset, defaults to "nested"
  322. :type combine: str, optional
  323. :param parallel: over write of the same argument in xarray.open_mfdataset, defaults to True
  324. :type parallel: bool, optional
  325. :param engine: The engine to read HDF5 file, defaults to "h5netcdf"
  326. :type engine: str, optional
  327. :param phony_dims: Please read the introduction of h5netcdf package, defaults to "access"
  328. :type phony_dims: str, optional
  329. :param excludeAxis: The name of axes, whose value changes together with scan axes, defaults to []
  330. :type excludeAxis: list, optional
  331. :param maxFileNum: The maximal number of files to read, defaults to None
  332. :type maxFileNum: int, optional
  333. :return: A xarray dataSet contain the data read from specified HDF5 file.
  334. :rtype: xarray DataSet
  335. """
  336. filePath = np.sort(np.atleast_1d(filePath))
  337. filePathAbs = []
  338. for i in range(len(filePath)):
  339. filePathAbs.append(os.path.abspath(filePath[i]).replace("\\", "/"))
  340. fullFilePath = []
  341. for i in range(len(filePathAbs)):
  342. fullFilePath.append(list(np.sort(glob.glob(filePathAbs[i]))))
  343. fullFilePath = np.array(fullFilePath).flatten()
  344. for i in range(len(fullFilePath)):
  345. fullFilePath[i] = fullFilePath[i].replace("\\", "/")
  346. if not maxFileNum is None:
  347. fullFilePath = fullFilePath[0:int(maxFileNum)]
  348. kwargs.update(
  349. {
  350. 'join': join,
  351. 'parallel': parallel,
  352. 'engine': engine,
  353. 'phony_dims': phony_dims,
  354. 'group': "globals",
  355. 'preprocess': _read_shot_number_from_hdf5,
  356. 'combine_attrs': _read_globals_attrs,
  357. 'combine':combine,
  358. 'concat_dim': "fileNum",
  359. }
  360. )
  361. datesetOfGlobal = xr.open_mfdataset(fullFilePath, **kwargs)
  362. datesetOfGlobal.attrs['scanAxis'] = np.setdiff1d(datesetOfGlobal.attrs['scanAxis'], excludeAxis)
  363. return datesetOfGlobal
  364. def _read_csv_file_pandas(filePath, **kwargs):
  365. """Read csv file using pandas package function read_csv()
  366. :param filePath:The path of csv files.
  367. :type filePath: str
  368. :return: A xarray DataSet stored the data
  369. :rtype: xarray DataSet
  370. """
  371. res = pd.read_csv(filePath, **kwargs)
  372. res = xr.Dataset.from_dataframe(res).to_array().to_numpy()
  373. return res
  374. def _read_csv_file_dask(filePath, **kwargs):
  375. """Read csv file using dask package function read_csv()
  376. :param filePath:The path of csv files.
  377. :type filePath: str
  378. :return: A xarray DataSet stored the data
  379. :rtype: xarray DataSet
  380. """
  381. res = df.read_csv(filePath, **kwargs)
  382. res = xr.Dataset.from_dataframe(res).to_array().to_numpy()
  383. return res
  384. def read_csv_file(filePath, maxFileNum=None, dask='parallelized', vectorize=True, csvEngine='pandas', daskKwargs={}, csvKwargs={}, **kwargs):
  385. """Read the data from csv files in given path.
  386. :param filePath: The path of csv files, which python glob.glob() can read. It has to end with '.csv'.
  387. :type filePath: str
  388. :param maxFileNum: The maximal number of files to read, defaults to None
  389. :type maxFileNum: int, optional
  390. :param dask: over write of the same argument in xarray.apply_ufunc, defaults to 'parallelized'
  391. :type dask: str, optional
  392. :param vectorize: over write of the same argument in xarray.apply_ufunc, defaults to True
  393. :type vectorize: bool, optional
  394. :param csvEngine: The engine to read csv file, defaults to 'pandas'
  395. :type csvEngine: str, optional
  396. :param daskKwargs: over write of the same argument in xarray.apply_ufunc, defaults to {}
  397. :type daskKwargs: dict, optional
  398. :param csvKwargs: The kwargs send to csvEngine, defaults to {}
  399. :type csvKwargs: dict, optional
  400. :return: A xarray DataSet stored the data
  401. :rtype: xarray DataSet
  402. """
  403. filePath = np.sort(np.atleast_1d(filePath))
  404. filePathAbs = []
  405. for i in range(len(filePath)):
  406. filePathAbs.append(os.path.abspath(filePath[i]).replace("\\", "/"))
  407. fullFilePath = []
  408. for i in range(len(filePathAbs)):
  409. fullFilePath.append(list(np.sort(glob.glob(filePathAbs[i]))))
  410. fullFilePath = np.array(fullFilePath).flatten()
  411. for i in range(len(fullFilePath)):
  412. fullFilePath[i] = fullFilePath[i].replace("\\", "/")
  413. if not maxFileNum is None:
  414. fullFilePath = fullFilePath[0:int(maxFileNum)]
  415. if csvEngine=='pandas':
  416. res_first = pd.read_csv(fullFilePath[0], **csvKwargs)
  417. elif csvEngine=='dask':
  418. res_first = df.read_csv(fullFilePath[0], **csvKwargs)
  419. res_first = xr.Dataset.from_dataframe(res_first)
  420. data_vars = list(res_first.keys())
  421. # print(data_vars)
  422. # print(np.shape(data_vars)[1])
  423. if len(np.shape(data_vars)) > 1:
  424. data_vars = np.array(
  425. [
  426. ''.join(data_vars[i])
  427. for i in range(np.shape(data_vars)[0])
  428. ]
  429. )
  430. fullFilePath = xr.DataArray(
  431. data=fullFilePath,
  432. dims=['fileIndex']
  433. )
  434. newDimKey = np.append(['data_vars'], list(res_first.dims.keys()))
  435. newDimKey = np.append(newDimKey, ['x', 'y', 'z'])
  436. newDimKey = np.append(newDimKey, [ chr(i) for i in range(97, 97+23)])
  437. kwargs.update(
  438. {
  439. 'dask': dask,
  440. 'vectorize': vectorize,
  441. 'output_core_dims': [newDimKey[0:len(res_first.dims) + 1]],
  442. "dask_gufunc_kwargs": daskKwargs,
  443. }
  444. )
  445. if csvEngine=='pandas':
  446. res = xr.apply_ufunc(_read_csv_file_pandas, fullFilePath, kwargs=csvKwargs, **kwargs)
  447. elif csvEngine=='dask':
  448. res = xr.apply_ufunc(_read_csv_file_dask, fullFilePath, kwargs=csvKwargs, **kwargs)
  449. res = res.assign_coords({'data_vars': data_vars})
  450. res = res.to_dataset(dim='data_vars')
  451. for key in list(res_first.coords.keys()):
  452. res = res.assign_coords({key: res_first[key]})
  453. return res