analyseScript/DataContainer/MongoDB.py

400 lines
12 KiB
Python
Raw Normal View History

2023-06-14 09:57:06 +02:00
from collections import OrderedDict
import numpy as np
2023-06-09 18:59:56 +02:00
import pymongo
import xarray_mongodb
import bson
import builtins
2023-06-12 12:52:07 +02:00
import xarray as xr
2023-06-09 18:59:56 +02:00
from ToolFunction.ToolFunction import get_date
# Lookup table from NumPy scalar classes to the Python builtin type whose name
# is used as an alias for them in ``np.sctypeDict`` (e.g. "float", "bool").
npTypeDict = {
    numpy_type: getattr(builtins, alias)
    for alias, numpy_type in np.sctypeDict.items()
    if alias in vars(builtins)
}

# Class of a plain NumPy array; used for exact type comparisons.
npArrayType = np.ndarray
class MongoDB:
def __init__(self, mongoClient, mongoDB, date=None) -> None:
    """Bind the wrapper to a client/database pair and pick the working date.

    Args:
        mongoClient: Client object; it is only stored on the instance here.
        mongoDB: Database handle; stored on the instance and also passed to
            ``xarray_mongodb.XarrayMongoDB`` to build ``self.xdb``.
        date: Date string forwarded to ``set_date`` ("year/month/day").
            When omitted, the value returned by ``get_date()`` is used.
    """
    self.mongoClient, self.mongoDB = mongoClient, mongoDB
    self.xdb = xarray_mongodb.XarrayMongoDB(mongoDB)
    self.set_date(get_date() if date is None else date)
def _convert_numpy_type(self, data):
for key in data:
typeKey = type(data[key])
if typeKey in npTypeDict:
data[key] = data[key].item()
elif typeKey == npArrayType:
data[key] = data[key].tolist()
else:
try:
data[key] = data[key].item()
except:
pass
return data
2023-06-13 18:16:24 +02:00
def _break_dataSet(self, dataSet, scanAxis=None):
if scanAxis is None:
scanAxis = dataSet.attrs['scanAxis']
dataArray = dataSet.shotNum
stackedDataArray = dataArray.stack(_scanAxis=tuple(scanAxis))
return stackedDataArray
2023-06-09 18:59:56 +02:00
def set_date(self, date):
    """Store the working date taken from a "year/month/day" string.

    The string is split on "/" and its first three pieces are converted with
    ``int`` and saved as ``self.year``, ``self.month`` and ``self.day``.

    Args:
        date: Date string such as ``"2023/06/14"``.
    """
    pieces = date.split("/")
    for position, attribute in enumerate(("year", "month", "day")):
        setattr(self, attribute, int(pieces[position]))
def create_global(self, shotNum, dataSet=None, date=None):
    """Insert the document(s) describing one shot into the 'global' collection.

    Every document carries ``year``, ``month``, ``day``, ``shotNum``,
    ``runNum`` and a ``global_parameters`` sub-document.

    * Without ``dataSet`` a single document with ``runNum`` 0 and empty
      ``global_parameters`` is inserted.
    * With a ``dataSet`` whose attributes have no ``'scanAxis'`` entry, a
      single document holding the converted attributes is inserted.
    * With a ``'scanAxis'`` attribute, one document is inserted per element
      of the stacked ``shotNum`` array: ``runNum`` is that element's value
      and each scan-axis name in ``global_parameters`` holds the matching
      coordinate value. The ``scanAxis`` / ``scanAxisLength`` attributes
      themselves are not stored.

    Args:
        shotNum: Shot number written to every document.
        dataSet: Optional object with ``attrs`` (and ``shotNum`` when it is
            a scan). Its ``attrs`` are converted in place by
            ``_convert_numpy_type``.
        date: Optional "year/month/day" string; when given it replaces the
            working date before anything is written.
    """
    if date is not None:
        self.set_date(date)

    data = {
        'year': self.year,
        'month': self.month,
        'day': self.day,
        'shotNum': shotNum,
        'runNum': 0,
        'global_parameters': {},
    }
    collection = self.mongoDB['global']

    # ``dataSet`` is optional: previously its attributes were read before the
    # ``None`` check, so the documented default always raised.
    if dataSet is None:
        collection.insert_one(self._convert_numpy_type(data))
        return

    data['global_parameters'].update(self._convert_numpy_type(dataSet.attrs))
    data = self._convert_numpy_type(data)

    if 'scanAxis' not in dataSet.attrs:
        collection.insert_one(data)
        return

    scanAxis = dataSet.attrs['scanAxis']
    # The scan description is replaced by one concrete value per axis below;
    # ``scanAxisLength`` may be absent, so it is removed tolerantly.
    data['global_parameters'].pop('scanAxis', None)
    data['global_parameters'].pop('scanAxisLength', None)

    for _, single in self._break_dataSet(dataSet).groupby('_scanAxis'):
        data['runNum'] = int(single.item())
        data['global_parameters'].update(
            {key: single[key].item() for key in scanAxis}
        )
        # ``insert_one`` adds an '_id' to the dict it is given; drop it so the
        # reused dict is stored as a new document each time.
        data.pop('_id', None)
        collection.insert_one(data)
2023-06-09 18:59:56 +02:00
2023-06-13 18:16:24 +02:00
def _add_data_normal(self, shotNum, runNum, data):
if runNum is None:
runNum = 0
2023-06-09 18:59:56 +02:00
filter = {
'year': self.year,
'month': self.month,
'day': self.day,
'shotNum': shotNum,
2023-06-13 18:16:24 +02:00
'runNum': runNum,
2023-06-09 18:59:56 +02:00
}
self.mongoDB['global'].update_one(filter, {"$set": data}, upsert=False)
2023-06-13 18:16:24 +02:00
def _add_data_xarray_dataArray(self, shotNum, dataArray, scanAxis=None):
2023-06-09 18:59:56 +02:00
2023-06-13 18:16:24 +02:00
if scanAxis is None:
scanAxis = list(dataArray.coords)
2023-06-09 18:59:56 +02:00
dataArray.attrs = self._convert_numpy_type(dataArray.attrs)
2023-06-13 18:16:24 +02:00
stackedDataArray = dataArray.stack(_scanAxis=tuple(scanAxis))
stackedDataArray = stackedDataArray.groupby('_scanAxis')
2023-06-09 18:59:56 +02:00
2023-06-13 18:16:24 +02:00
filter = {
'year': self.year,
'month': self.month,
'day': self.day,
'shotNum': shotNum,
}
2023-06-09 18:59:56 +02:00
2023-06-13 18:16:24 +02:00
for i in stackedDataArray:
stackedDataArray_single = i[1].drop('_scanAxis')
global_parameters = {
'global_parameters.' + key: stackedDataArray_single[key].item()
for key in scanAxis
}
filter.update(global_parameters)
mongoID, _ = self.xdb.put(stackedDataArray_single)
data_label = {
dataArray.name:
{
'name': dataArray.name,
'mongoID': mongoID,
'engine': 'xarray',
'dtype': 'dataArray',
}
}
self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False)
2023-06-09 18:59:56 +02:00
2023-06-13 18:16:24 +02:00
def _add_data_xarray_dataSet(self, shotNum, dataSet, name, scanAxis=None):
if scanAxis is None:
scanAxis = list(dataSet.coords)
dataSet.attrs = self._convert_numpy_type(dataSet.attrs)
for key in list(dataSet.data_vars):
dataSet[key].attrs = self._convert_numpy_type(dataSet[key].attrs)
stackedDataSet = dataSet.stack(_scanAxis=tuple(scanAxis))
stackedDataSet = stackedDataSet.groupby('_scanAxis')
2023-06-09 18:59:56 +02:00
filter = {
'year': self.year,
'month': self.month,
'day': self.day,
'shotNum': shotNum,
}
2023-06-13 18:16:24 +02:00
for i in stackedDataSet:
stackedDataSet_single = i[1].drop('_scanAxis')
global_parameters = {
'global_parameters.' + key: stackedDataSet_single[key].item()
for key in scanAxis
}
filter.update(global_parameters)
2023-06-09 18:59:56 +02:00
2023-06-13 18:16:24 +02:00
mongoID, _ = self.xdb.put(dataSet)
2023-06-09 18:59:56 +02:00
2023-06-13 18:16:24 +02:00
data_label = {
name:
{
'name': name,
'mongoID': mongoID,
'engine': 'xarray',
'dtype': 'dataSet',
}
}
2023-06-09 18:59:56 +02:00
2023-06-13 18:16:24 +02:00
self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False)
2023-06-12 12:52:07 +02:00
2023-06-13 18:16:24 +02:00
def _add_data_additional(self, shotNum, runNum, data, name):
if runNum is None:
runNum = 0
2023-06-12 12:52:07 +02:00
filter = {
'year': self.year,
'month': self.month,
'day': self.day,
'shotNum': shotNum,
2023-06-13 18:16:24 +02:00
'runNum': runNum,
2023-06-12 12:52:07 +02:00
}
mongoID = self.mongoDB.additional.insert_one(data).inserted_id
data_label = {
name:
{
'name': name,
'mongoID': mongoID,
'engine': 'additional',
'dtype': 'dict',
}
}
self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False)
2023-06-09 18:59:56 +02:00
2023-06-13 18:16:24 +02:00
def add_data(self, shotNum, data, runNum=None, date=None, name=None, engine='normal'):
    """Attach ``data`` to the 'global' document(s) of a shot.

    Args:
        shotNum: Shot number of the target document(s).
        data: Payload; its expected kind depends on ``engine``.
        runNum: Run number, used by the 'normal' and 'additional' engines.
        date: Optional "year/month/day" string; when given it replaces the
            working date first.
        name: Field name for the stored reference ('xarray' and 'additional'
            engines). For a DataArray a given ``name`` renames the array
            before it is stored; otherwise the array's own name is used.
        engine: Storage strategy:
            ``'normal'`` sets the entries of ``data`` directly,
            ``'xarray'`` stores a Dataset / DataArray via ``self.xdb``,
            ``'additional'`` stores ``data`` in the 'additional' collection.
            Any other value is ignored.
    """
    if date is not None:
        self.set_date(date)

    if engine == 'normal':
        self._add_data_normal(shotNum=shotNum, runNum=runNum, data=data)
    elif engine == 'xarray':
        # The helpers name their payload ``dataSet`` / ``dataArray``; passing
        # it as ``data=`` raised TypeError for every xarray call.
        if isinstance(data, xr.Dataset):
            self._add_data_xarray_dataSet(shotNum=shotNum, dataSet=data, name=name)
        else:
            # The DataArray helper takes the field name from the array.
            if name is not None:
                data = data.rename(name)
            self._add_data_xarray_dataArray(shotNum=shotNum, dataArray=data)
    elif engine == 'additional':
        self._add_data_additional(shotNum=shotNum, runNum=runNum, data=data, name=name)
2023-06-12 12:52:07 +02:00
2023-06-13 18:16:24 +02:00
def read_global_single(self, shotNum, runNum, date=None):
    """Fetch the 'global' document of one run.

    Args:
        shotNum: Shot number to look up.
        runNum: Run number to look up.
        date: Optional "year/month/day" string; when given it replaces the
            working date first.

    Returns:
        Whatever ``find_one`` on the 'global' collection returns for the
        working date, ``shotNum`` and ``runNum``.
    """
    if date is not None:
        self.set_date(date)
    query = dict(
        year=self.year,
        month=self.month,
        day=self.day,
        shotNum=shotNum,
        runNum=runNum,
    )
    return self.mongoDB['global'].find_one(query)
2023-06-14 09:57:06 +02:00
def read_global_all(self, shotNum, date=None):
    """Merge the ``global_parameters`` of every document of one shot.

    All 'global' documents matching the working date and ``shotNum`` are
    read and their ``global_parameters`` are combined.

    Args:
        shotNum: Shot number whose documents are merged.
        date: Optional "year/month/day" string; when given it replaces the
            working date first.

    Returns:
        dict: containing

        * every parameter whose value is equivalent in all documents that
          carry it, mapped to that value;
        * every parameter whose value differs between documents, mapped to
          the array of its values collected from all documents in query
          order. If an earlier varying parameter collected an equivalent
          array, the value is that earlier parameter's name instead;
        * ``"scanAxis"``: names of the varying parameters kept as arrays;
        * ``"scanAxisLength"``: NumPy array with the number of collected
          values of each of those parameters.

    Raises:
        KeyError: if a document lacks one of the varying parameters.
    """
    from xarray.core.utils import equivalent

    if date is not None:
        self.set_date(date)

    query = {
        'year': self.year,
        'month': self.month,
        'day': self.day,
        'shotNum': shotNum,
    }

    result = {}
    dropped_attrs = OrderedDict()

    # The documents are walked twice below. A PyMongo cursor can only be
    # consumed once, so iterating it again yielded nothing and the varying
    # parameters were never collected. Materialise the documents first.
    docs = list(self.mongoDB['global'].find(query))

    # Pass 1: separate constant parameters (kept in ``result``) from
    # parameters whose value changes between documents (``dropped_attrs``).
    for doc in docs:
        global_parameters = doc['global_parameters']

        result.update(
            {
                key: value
                for key, value in global_parameters.items()
                if key not in result and key not in dropped_attrs.keys()
            }
        )

        result = {
            key: value
            for key, value in result.items()
            if key not in global_parameters or equivalent(global_parameters[key], value)
        }

        dropped_attrs.update(
            {
                key: []
                for key in global_parameters if key not in result
            }
        )

    # Pass 2: collect, per varying parameter, its value in every document.
    for doc in docs:
        global_parameters = doc['global_parameters']

        dropped_attrs.update(
            {
                key: np.append(dropped_attrs[key], global_parameters[key])
                for key in dropped_attrs.keys()
            }
        )

    scan_attrs = OrderedDict()
    scan_length = []
    for attrs_key in dropped_attrs.keys():
        flag = True
        for key in scan_attrs.keys():
            if equivalent(scan_attrs[key], dropped_attrs[attrs_key]):
                # Same values as an already registered scan parameter:
                # store that parameter's name instead of a second copy.
                flag = False
                result.update({attrs_key: key})
                break
        if flag:
            scan_attrs.update({
                attrs_key: dropped_attrs[attrs_key]
            })
            scan_length = np.append(scan_length, len(dropped_attrs[attrs_key]))

    result.update(
        {
            key: value
            for key, value in scan_attrs.items()
        }
    )

    result.update(
        {
            "scanAxis": list(scan_attrs.keys()),
            "scanAxisLength": scan_length,
        }
    )

    return result
2023-06-12 12:52:07 +02:00
def _load_data_single(self, mongoID, engine):
if engine == 'xarray':
return self.xdb.get(mongoID)
if engine == 'additional':
return self.mongoDB.additional.find_one({'_id': mongoID})
2023-06-09 18:59:56 +02:00
2023-06-13 18:16:24 +02:00
def load_data_single(self, shotNum=None, runNum=None, globalDict=None, date=None, field=None):
    """Return fields of one 'global' document with stored references resolved.

    A field counts as a reference when its value is a dict containing
    ``'mongoID'``; it is replaced by the object loaded through
    ``_load_data_single`` using the reference's ``mongoID`` and ``engine``.

    Args:
        shotNum: Shot number, used to read the document when ``globalDict``
            is not given.
        runNum: Run number, used together with ``shotNum``.
        globalDict: Already fetched 'global' document. When omitted it is
            read with ``read_global_single``.
        date: Optional "year/month/day" string; when given it replaces the
            working date first.
        field: Which fields to return. ``None`` selects every field of the
            document, which is then updated in place and returned. A dict
            is likewise updated in place and returned. A single name or any
            other iterable of names yields a new dict with just those
            fields.

    Returns:
        dict: Selected field names mapped to their value, with references
        replaced by the loaded objects.
    """
    if date is not None:
        self.set_date(date)

    if globalDict is None:
        globalDict = self.read_global_single(shotNum=shotNum, runNum=runNum)

    if field is None:
        field = globalDict

    if isinstance(field, str):
        field = [field]

    if isinstance(field, dict):
        # Legacy behaviour: the given mapping itself is filled and returned.
        res = field
        keys = list(field)
    else:
        # A plain collection of names has no ``update``; build the result
        # from the document instead of failing on it.
        keys = list(field)
        res = {key: globalDict[key] for key in keys}

    for key in keys:
        entry = globalDict[key]
        if isinstance(entry, dict) and ('mongoID' in entry):
            res[key] = self._load_data_single(
                mongoID=entry['mongoID'], engine=entry['engine']
            )

    return res
def load_data(self, shotNum=None, globalDict=None, date=None, field=None):
    """Placeholder without an implementation yet.

    All arguments are accepted and ignored; the call has no effect.

    Returns:
        None
    """
    return None