You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

557 lines
18 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. from collections import OrderedDict
  2. import numpy as np
  3. import pymongo
  4. import xarray_mongodb
  5. import bson
  6. import builtins
  7. import xarray as xr
  8. from ToolFunction.ToolFunction import get_date
  9. npTypeDict = {v: getattr(builtins, k) for k, v in np.sctypeDict.items() if k in vars(builtins)}
  10. npArrayType = type(np.array([0]))
  11. class MongoDB:
  12. """A class for communicate with our MongoDB.
  13. """
  14. def __init__(self, mongoClient, mongoDB, date=None) -> None:
  15. """Initialize the class with given handle to our MongoDB client and database.
  16. :param mongoClient: The handle to MongoDB client
  17. :type mongoClient: pymongo MongoDB client or other MongoDB client
  18. :param mongoDB: The handle to MongoDB database
  19. :type mongoDB: pymongo MongoDB database or other MongoDB database
  20. :param date: The date of data ('YYYY/MM/DD'), can be setted later, defaults to None
  21. :type date: str, optional
  22. """
  23. self.mongoClient = mongoClient
  24. self.mongoDB = mongoDB
  25. self.xdb = xarray_mongodb.XarrayMongoDB(mongoDB)
  26. if date is None:
  27. date= get_date()
  28. self.set_date(date)
  29. def _convert_numpy_type(self, data):
  30. """Convert from numpy type to normal python type.
  31. :param data: The data need to be converted
  32. :type data: numpy data type
  33. :return: The converted data
  34. :rtype: normal python data type
  35. """
  36. for key in data:
  37. typeKey = type(data[key])
  38. if typeKey in npTypeDict:
  39. data[key] = data[key].item()
  40. elif typeKey == npArrayType:
  41. data[key] = data[key].tolist()
  42. else:
  43. try:
  44. data[key] = data[key].item()
  45. except:
  46. pass
  47. return data
  48. def _break_dataSet(self, dataSet, scanAxis=None):
  49. """Stack the scan axes of data
  50. :param dataSet: The xarray DataSet or DataArray stored the data
  51. :type dataSet: xarray DataSet or DataArray
  52. :param scanAxis: a list of the name of scan axes, defaults to None
  53. :type scanAxis: list or array like, optional
  54. :return: The stacked xarray DataSet or DataArray stored the data
  55. :rtype: xarray DataSet or DataArray
  56. """
  57. if scanAxis is None:
  58. scanAxis = dataSet.attrs['scanAxis']
  59. dataArray = dataSet.shotNum
  60. stackedDataArray = dataArray.stack(_scanAxis=tuple(scanAxis))
  61. return stackedDataArray
  62. def set_date(self, date):
  63. """Set the date of data
  64. :param date: The date of data ('YYYY/MM/DD')
  65. :type date: str
  66. """
  67. date = date.split("/")
  68. self.year = int(date[0])
  69. self.month = int(date[1])
  70. self.day = int(date[2])
  71. def create_global(self, shotNum, dataSet=None, date=None):
  72. """Creat a the global document in MongoDB
  73. :param shotNum: The shot number
  74. :type shotNum: str
  75. :param dataSet: The xarray DataSet stored the global parameters, defaults to None
  76. :type dataSet: xarray DataSet, optional
  77. :param date: the date of the data, defaults to None
  78. :type date: str, optional
  79. """
  80. if not date is None:
  81. self.set_date(date)
  82. data = {
  83. 'year': self.year,
  84. 'month': self.month,
  85. 'day': self.day,
  86. 'shotNum': shotNum,
  87. }
  88. self.mongoDB['global'].delete_many(data)
  89. data = {
  90. 'year': self.year,
  91. 'month': self.month,
  92. 'day': self.day,
  93. 'shotNum': shotNum,
  94. 'runNum': 0,
  95. 'global_parameters' : {},
  96. }
  97. global_parameters = self._convert_numpy_type(dataSet.attrs)
  98. if not dataSet is None:
  99. data['global_parameters'].update(global_parameters)
  100. data = self._convert_numpy_type(data)
  101. if 'scanAxis' in dataSet.attrs:
  102. del data['global_parameters']['scanAxis']
  103. del data['global_parameters']['scanAxisLength']
  104. scanAxis = dataSet.attrs['scanAxis']
  105. data['global_parameters'].update(
  106. {
  107. key:0
  108. for key in scanAxis
  109. }
  110. )
  111. stackedDataArray = self._break_dataSet(dataSet)
  112. try:
  113. stackedDataArray.load()
  114. except:
  115. pass
  116. stackedDataArray = stackedDataArray.groupby('_scanAxis')
  117. for i in stackedDataArray:
  118. stackedDataArray_single = i[1]
  119. data.update(
  120. {
  121. 'runNum': int(stackedDataArray_single.item())
  122. }
  123. )
  124. data['global_parameters'].update(
  125. {
  126. key: stackedDataArray_single[key].item()
  127. for key in scanAxis
  128. }
  129. )
  130. if '_id' in data:
  131. del data['_id']
  132. self.mongoDB['global'].insert_one(data)
  133. else:
  134. self.mongoDB['global'].insert_one(data)
  135. def _add_data_normal(self, shotNum, runNum, data):
  136. """Write the data directly to the global document
  137. :param shotNum: The shot number
  138. :type shotNum: str
  139. :param runNum: The run number
  140. :type runNum: int
  141. :param data: The data to be written
  142. :type data: normal python data type
  143. """
  144. if runNum is None:
  145. runNum = 0
  146. filter = {
  147. 'year': self.year,
  148. 'month': self.month,
  149. 'day': self.day,
  150. 'shotNum': shotNum,
  151. 'runNum': runNum,
  152. }
  153. self.mongoDB['global'].update_one(filter, {"$set": data}, upsert=False)
  154. def _add_data_xarray_dataArray(self, shotNum, dataArray, name=None, scanAxis=None):
  155. """Write the data in a type of xarray DataArray to the MongoDb.
  156. :param shotNum: The shot number
  157. :type shotNum: str
  158. :param dataArray: The xarray DataArray to be written
  159. :type dataArray: xarray DataArray
  160. :param name: The name of the DataArray, defaults to None
  161. :type name: str, optional
  162. :param scanAxis: The scan axes of the data, defaults to None
  163. :type scanAxis: array like, optional
  164. """
  165. if scanAxis is None:
  166. scanAxis = list(dataArray.coords)
  167. dataArray.attrs = self._convert_numpy_type(dataArray.attrs)
  168. stackedDataArray = dataArray.stack(_scanAxis=tuple(scanAxis))
  169. stackedDataArray = stackedDataArray.groupby('_scanAxis')
  170. filter = {
  171. 'year': self.year,
  172. 'month': self.month,
  173. 'day': self.day,
  174. 'shotNum': shotNum,
  175. }
  176. for i in stackedDataArray:
  177. stackedDataArray_single = i[1].drop('_scanAxis')
  178. global_parameters = {
  179. 'global_parameters.' + key: stackedDataArray_single[key].item()
  180. for key in scanAxis
  181. }
  182. filter.update(global_parameters)
  183. mongoID, _ = self.xdb.put(stackedDataArray_single)
  184. data_label = {
  185. dataArray.name:
  186. {
  187. 'name': dataArray.name,
  188. 'mongoID': mongoID,
  189. 'engine': 'xarray',
  190. 'dtype': 'dataArray',
  191. }
  192. }
  193. self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False)
  194. def _add_data_xarray_dataSet(self, shotNum, dataSet, name, scanAxis=None):
  195. """Write the data in a type of xarray DataSet to the MongoDb.
  196. :param shotNum: The shot number
  197. :type shotNum: str
  198. :param dataSet: The xarray DataSet to be written
  199. :type dataSet: xarray DataSet
  200. :param name: The name of the DataArray, defaults to None
  201. :type name: str, optional
  202. :param scanAxis: The scan axes of the data, defaults to None
  203. :type scanAxis: array like, optional
  204. """
  205. if scanAxis is None:
  206. scanAxis = list(dataSet.coords)
  207. dataSet.attrs = self._convert_numpy_type(dataSet.attrs)
  208. for key in list(dataSet.data_vars):
  209. dataSet[key].attrs = self._convert_numpy_type(dataSet[key].attrs)
  210. stackedDataSet = dataSet.stack(_scanAxis=tuple(scanAxis))
  211. stackedDataSet = stackedDataSet.groupby('_scanAxis')
  212. filter = {
  213. 'year': self.year,
  214. 'month': self.month,
  215. 'day': self.day,
  216. 'shotNum': shotNum,
  217. }
  218. for i in stackedDataSet:
  219. stackedDataSet_single = i[1].drop('_scanAxis')
  220. global_parameters = {
  221. 'global_parameters.' + key: stackedDataSet_single[key].item()
  222. for key in scanAxis
  223. }
  224. filter.update(global_parameters)
  225. mongoID, _ = self.xdb.put(dataSet)
  226. data_label = {
  227. name:
  228. {
  229. 'name': name,
  230. 'mongoID': mongoID,
  231. 'engine': 'xarray',
  232. 'dtype': 'dataSet',
  233. }
  234. }
  235. self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False)
  236. def _add_data_additional(self, shotNum, runNum, data, name):
  237. """Write the data in an additional document
  238. :param shotNum: The shot number
  239. :type shotNum: str
  240. :param runNum: The run number
  241. :type runNum: int
  242. :param data: The data to be written
  243. :type data: normal python data type
  244. :param name: The name of the data
  245. :type name: str
  246. """
  247. if runNum is None:
  248. runNum = 0
  249. filter = {
  250. 'year': self.year,
  251. 'month': self.month,
  252. 'day': self.day,
  253. 'shotNum': shotNum,
  254. 'runNum': runNum,
  255. }
  256. mongoID = self.mongoDB.additional.insert_one(data).inserted_id
  257. data_label = {
  258. name:
  259. {
  260. 'name': name,
  261. 'mongoID': mongoID,
  262. 'engine': 'additional',
  263. 'dtype': 'dict',
  264. }
  265. }
  266. self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False)
  267. def add_data(self, shotNum, data, runNum=None, date=None, name=None, engine='normal'):
  268. """Write a new data to MongoDB
  269. :param shotNum: The shot number
  270. :type shotNum: str
  271. :param data: The data needs to be written
  272. :type data: standard python data type, numpy type, xarray DataArray or xarray DataSet
  273. :param runNum: The run number, defaults to None
  274. :type runNum: int, optional
  275. :param date: The date of the data ('YYYY/MM/DD'), defaults to None
  276. :type date: str, optional
  277. :param name: The name of the data, defaults to None
  278. :type name: str, optional
  279. :param engine: The engine for different types of the data, defaults to 'normal'
  280. :type engine: str, optional
  281. """
  282. if not date is None:
  283. self.set_date(date)
  284. if engine == 'normal':
  285. self._add_data_normal(shotNum=shotNum, runNum=runNum, data=data)
  286. elif engine == 'xarray':
  287. if isinstance(data, type(xr.Dataset())):
  288. self._add_data_xarray_dataSet(shotNum=shotNum, dataSet=data, name=name)
  289. else:
  290. self._add_data_xarray_dataArray(shotNum=shotNum, dataArray=data, name=name)
  291. elif engine == 'additional':
  292. self._add_data_additional(shotNum=shotNum, runNum=runNum, data=data, name=name)
  293. def read_global_single(self, shotNum, runNum, date=None):
  294. """Read the global document of specified shot and run from MongoDB.
  295. :param shotNum: The shot number
  296. :type shotNum: str
  297. :param runNum: The run number
  298. :type runNum: int
  299. :param date: The date of the data ('YYYY/MM/DD'), defaults to None
  300. :type date: str, optional
  301. :return: The global document
  302. :rtype: dict
  303. """
  304. if not date is None:
  305. self.set_date(date)
  306. filter = {
  307. 'year': self.year,
  308. 'month': self.month,
  309. 'day': self.day,
  310. 'shotNum': shotNum,
  311. 'runNum': runNum,
  312. }
  313. return self.mongoDB['global'].find_one(filter)
  314. def read_global_all(self, shotNum, date=None):
  315. """Read the global document of all runs in the specified shot from MongoDB, and extract the scan axes.
  316. :param shotNum: The shot number
  317. :type shotNum: str
  318. :param date: The date of the data ('YYYY/MM/DD'), defaults to None
  319. :type date: str, optional
  320. :return: The global document
  321. :rtype: dict
  322. """
  323. from xarray.core.utils import equivalent
  324. if not date is None:
  325. self.set_date(date)
  326. filter = {
  327. 'year': self.year,
  328. 'month': self.month,
  329. 'day': self.day,
  330. 'shotNum': shotNum,
  331. }
  332. result = {}
  333. dropped_attrs = OrderedDict()
  334. docs = self.mongoDB['global'].find(filter).sort('runNum')
  335. docs = [doc['global_parameters'] for doc in docs]
  336. for doc in docs:
  337. global_parameters = doc
  338. result.update(
  339. {
  340. key: value
  341. for key, value in global_parameters.items()
  342. if key not in result and key not in dropped_attrs.keys()
  343. }
  344. )
  345. result = {
  346. key: value
  347. for key, value in result.items()
  348. if key not in global_parameters or equivalent(global_parameters[key], value)
  349. }
  350. dropped_attrs.update(
  351. {
  352. key: []
  353. for key in global_parameters if key not in result
  354. }
  355. )
  356. for doc in docs:
  357. global_parameters = doc
  358. dropped_attrs.update(
  359. {
  360. key: np.append(dropped_attrs[key], global_parameters[key])
  361. for key in dropped_attrs.keys()
  362. }
  363. )
  364. scan_attrs = OrderedDict()
  365. scan_length = []
  366. for attrs_key in dropped_attrs.keys():
  367. flag = True
  368. for key in scan_attrs.keys():
  369. if equivalent(scan_attrs[key], dropped_attrs[attrs_key]):
  370. flag = False
  371. result.update({attrs_key: key})
  372. break
  373. if flag:
  374. scan_attrs.update({
  375. attrs_key: dropped_attrs[attrs_key]
  376. })
  377. scan_length = np.append(scan_length, len(dropped_attrs[attrs_key]))
  378. result.update(
  379. {
  380. key: value
  381. for key, value in scan_attrs.items()
  382. }
  383. )
  384. result.update(
  385. {
  386. "scanAxis": list(scan_attrs.keys()),
  387. "scanAxisLength": scan_length,
  388. }
  389. )
  390. return result
  391. def _load_data_single(self, mongoID, engine):
  392. """load the document according to given _ID
  393. :param mongoID: the ID of document in MongoDB
  394. :type mongoID: MongoDB ID object
  395. :param engine: _description_
  396. :type engine: _type_
  397. :return: The engine for different types of the data
  398. :rtype: str
  399. """
  400. if engine == 'xarray':
  401. return self.xdb.get(mongoID)
  402. if engine == 'additional':
  403. return self.mongoDB.additional.find_one({'_id': mongoID})
  404. def load_data_single(self, shotNum=None, runNum=None, globalDict=None, date=None, field=None):
  405. """go through the given global document and find all the MongoDB object, then replace them with the document they linked to.
  406. :param shotNum: The shot number, defaults to None
  407. :type shotNum: str, optional
  408. :param runNum: The run number, defaults to None
  409. :type runNum: int, optional
  410. :param globalDict: The given global document, defaults to None
  411. :type globalDict: dict, optional
  412. :param date: The date of the data ('YYYY/MM/DD'), defaults to None
  413. :type date: str, optional
  414. :param field: If not None, the function will only go through these keys, defaults to None
  415. :type field: array like, optional
  416. :return: The document with loaded data
  417. :rtype: dict
  418. """
  419. if not date is None:
  420. self.set_date(date)
  421. if globalDict is None:
  422. globalDict = self.read_global_single(shotNum=shotNum, runNum=runNum)
  423. if field is None:
  424. field = globalDict
  425. res = {}
  426. for key in field:
  427. if isinstance(globalDict[key], dict) and ('mongoID' in globalDict[key]):
  428. mongoID = globalDict[key]['mongoID']
  429. engine = globalDict[key]['engine']
  430. res.update(
  431. {
  432. key: self._load_data_single(mongoID=mongoID, engine=engine)
  433. }
  434. )
  435. return res
  436. def load_data(self, shotNum=None, globalDict=None, date=None, field=None):
  437. # load all the data of specified shot
  438. pass