You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

308 lines
9.5 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. import pymongo
  2. import xarray_mongodb
  3. import bson
  4. import builtins
  5. import xarray as xr
  6. from ToolFunction.ToolFunction import get_date
  7. npTypeDict = {v: getattr(builtins, k) for k, v in np.sctypeDict.items() if k in vars(builtins)}
  8. npArrayType = type(np.array([0]))
  9. class MongoDB:
  10. def __init__(self, mongoClient, mongoDB, date=None) -> None:
  11. self.mongoClient = mongoClient
  12. self.mongoDB = mongoDB
  13. self.xdb = xarray_mongodb.XarrayMongoDB(mongoDB)
  14. if date is None:
  15. date= get_date()
  16. self.set_date(date)
  17. def _convert_numpy_type(self, data):
  18. for key in data:
  19. typeKey = type(data[key])
  20. if typeKey in npTypeDict:
  21. data[key] = data[key].item()
  22. elif typeKey == npArrayType:
  23. data[key] = data[key].tolist()
  24. else:
  25. try:
  26. data[key] = data[key].item()
  27. except:
  28. pass
  29. return data
  30. def _break_dataSet(self, dataSet, scanAxis=None):
  31. if scanAxis is None:
  32. scanAxis = dataSet.attrs['scanAxis']
  33. dataArray = dataSet.shotNum
  34. stackedDataArray = dataArray.stack(_scanAxis=tuple(scanAxis))
  35. return stackedDataArray
  36. def set_date(self, date):
  37. date = date.split("/")
  38. self.year = int(date[0])
  39. self.month = int(date[1])
  40. self.day = int(date[2])
  41. def create_global(self, shotNum, dataSet=None, date=None):
  42. if not date is None:
  43. self.set_date(date)
  44. data = {
  45. 'year': self.year,
  46. 'month': self.month,
  47. 'day': self.day,
  48. 'shotNum': shotNum,
  49. 'runNum': 0,
  50. 'global_parameters' : {},
  51. }
  52. global_parameters = self._convert_numpy_type(dataSet.attrs)
  53. if not dataSet is None:
  54. data['global_parameters'].update(global_parameters)
  55. data = self._convert_numpy_type(data)
  56. if 'scanAxis' in dataSet.attrs:
  57. del data['global_parameters']['scanAxis']
  58. del data['global_parameters']['scanAxisLength']
  59. scanAxis = dataSet.attrs['scanAxis']
  60. data['global_parameters'].update(
  61. {
  62. key:0
  63. for key in scanAxis
  64. }
  65. )
  66. stackedDataArray = self._break_dataSet(dataSet)
  67. stackedDataArray = stackedDataArray.groupby('_scanAxis')
  68. for i in stackedDataArray:
  69. stackedDataArray_single = i[1]
  70. data.update(
  71. {
  72. 'runNum': int(stackedDataArray_single.item())
  73. }
  74. )
  75. data['global_parameters'].update(
  76. {
  77. key: stackedDataArray_single[key].item()
  78. for key in scanAxis
  79. }
  80. )
  81. if '_id' in data:
  82. del data['_id']
  83. self.mongoDB['global'].insert_one(data)
  84. else:
  85. self.mongoDB['global'].insert_one(data)
  86. def _add_data_normal(self, shotNum, runNum, data):
  87. if runNum is None:
  88. runNum = 0
  89. filter = {
  90. 'year': self.year,
  91. 'month': self.month,
  92. 'day': self.day,
  93. 'shotNum': shotNum,
  94. 'runNum': runNum,
  95. }
  96. self.mongoDB['global'].update_one(filter, {"$set": data}, upsert=False)
  97. def _add_data_xarray_dataArray(self, shotNum, dataArray, scanAxis=None):
  98. if scanAxis is None:
  99. scanAxis = list(dataArray.coords)
  100. dataArray.attrs = self._convert_numpy_type(dataArray.attrs)
  101. stackedDataArray = dataArray.stack(_scanAxis=tuple(scanAxis))
  102. stackedDataArray = stackedDataArray.groupby('_scanAxis')
  103. filter = {
  104. 'year': self.year,
  105. 'month': self.month,
  106. 'day': self.day,
  107. 'shotNum': shotNum,
  108. }
  109. for i in stackedDataArray:
  110. stackedDataArray_single = i[1].drop('_scanAxis')
  111. global_parameters = {
  112. 'global_parameters.' + key: stackedDataArray_single[key].item()
  113. for key in scanAxis
  114. }
  115. filter.update(global_parameters)
  116. mongoID, _ = self.xdb.put(stackedDataArray_single)
  117. data_label = {
  118. dataArray.name:
  119. {
  120. 'name': dataArray.name,
  121. 'mongoID': mongoID,
  122. 'engine': 'xarray',
  123. 'dtype': 'dataArray',
  124. }
  125. }
  126. self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False)
  127. def _add_data_xarray_dataSet(self, shotNum, dataSet, name, scanAxis=None):
  128. if scanAxis is None:
  129. scanAxis = list(dataSet.coords)
  130. dataSet.attrs = self._convert_numpy_type(dataSet.attrs)
  131. for key in list(dataSet.data_vars):
  132. dataSet[key].attrs = self._convert_numpy_type(dataSet[key].attrs)
  133. stackedDataSet = dataSet.stack(_scanAxis=tuple(scanAxis))
  134. stackedDataSet = stackedDataSet.groupby('_scanAxis')
  135. filter = {
  136. 'year': self.year,
  137. 'month': self.month,
  138. 'day': self.day,
  139. 'shotNum': shotNum,
  140. }
  141. for i in stackedDataSet:
  142. stackedDataSet_single = i[1].drop('_scanAxis')
  143. global_parameters = {
  144. 'global_parameters.' + key: stackedDataSet_single[key].item()
  145. for key in scanAxis
  146. }
  147. filter.update(global_parameters)
  148. mongoID, _ = self.xdb.put(dataSet)
  149. data_label = {
  150. name:
  151. {
  152. 'name': name,
  153. 'mongoID': mongoID,
  154. 'engine': 'xarray',
  155. 'dtype': 'dataSet',
  156. }
  157. }
  158. self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False)
  159. def _add_data_additional(self, shotNum, runNum, data, name):
  160. if runNum is None:
  161. runNum = 0
  162. filter = {
  163. 'year': self.year,
  164. 'month': self.month,
  165. 'day': self.day,
  166. 'shotNum': shotNum,
  167. 'runNum': runNum,
  168. }
  169. mongoID = self.mongoDB.additional.insert_one(data).inserted_id
  170. data_label = {
  171. name:
  172. {
  173. 'name': name,
  174. 'mongoID': mongoID,
  175. 'engine': 'additional',
  176. 'dtype': 'dict',
  177. }
  178. }
  179. self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False)
  180. def add_data(self, shotNum, data, runNum=None, date=None, name=None, engine='normal'):
  181. if not date is None:
  182. self.set_date(date)
  183. if engine == 'normal':
  184. self._add_data_normal(shotNum=shotNum, runNum=runNum, data=data)
  185. elif engine == 'xarray':
  186. if isinstance(data, type(xr.Dataset())):
  187. self._add_data_xarray_dataSet(shotNum=shotNum, data=data, name=name)
  188. else:
  189. self._add_data_xarray_dataArray(shotNum=shotNum, data=data, name=name)
  190. elif engine == 'additional':
  191. self._add_data_additional(shotNum=shotNum, runNum=runNum, data=data, name=name)
  192. def read_global_single(self, shotNum, runNum, date=None):
  193. if not date is None:
  194. self.set_date(date)
  195. filter = {
  196. 'year': self.year,
  197. 'month': self.month,
  198. 'day': self.day,
  199. 'shotNum': shotNum,
  200. 'runNum': runNum,
  201. }
  202. return self.mongoDB['global'].find_one(filter)
  203. def _load_data_single(self, mongoID, engine):
  204. if engine == 'xarray':
  205. return self.xdb.get(mongoID)
  206. if engine == 'additional':
  207. return self.mongoDB.additional.find_one({'_id': mongoID})
  208. def load_data_single(self, shotNum=None, runNum=None, globalDict=None, date=None, field=None):
  209. if not date is None:
  210. self.set_date(date)
  211. if globalDict is None:
  212. globalDict = self.read_global_single(shotNum=shotNum, runNum=runNum)
  213. if field is None:
  214. field = globalDict
  215. res = field
  216. for key in field:
  217. if isinstance(globalDict[key], dict) and ('mongoID' in globalDict[key]):
  218. mongoID = globalDict[key]['mongoID']
  219. engine = globalDict[key]['engine']
  220. res.update(
  221. {
  222. key: self._load_data_single(mongoID=mongoID, engine=engine)
  223. }
  224. )
  225. return res
  226. def load_data(self, shotNum=None, globalDict=None, date=None, field=None):
  227. pass