Implement loading data from MongoDB.

This commit is contained in:
Jianshun Gao 2023-10-30 17:53:03 +01:00
parent fbe70b4fca
commit f1e547b1be
2 changed files with 2457 additions and 110 deletions

View File

@ -2,12 +2,16 @@ from collections import OrderedDict
import numpy as np import numpy as np
import pymongo import pymongo
from pymongo import MongoClient
import xarray_mongodb import xarray_mongodb
import bson import bson
import builtins import builtins
import xarray as xr import xarray as xr
# import sys
# #sys.path.insert(0, 'C:/Users/Fabrizio Klassen/PycharmProjects/DyLabDataViewer/src/bin/Analyser/AnalyserScript')
# import sys
# sys.path.append('../')
from ToolFunction.ToolFunction import get_date from ToolFunction.ToolFunction import get_date
@ -18,7 +22,7 @@ npArrayType = type(np.array([0]))
class MongoDB: class MongoDB:
"""A class for communicate with our MongoDB. """A class for communicate with our MongoDB.
""" """
def __init__(self, mongoClient, mongoDB, date=None) -> None: def __init__(self, mongoClient, mongoDB, date=None) -> None:
"""Initialize the class with given handle to our MongoDB client and database. """Initialize the class with given handle to our MongoDB client and database.
@ -32,11 +36,11 @@ class MongoDB:
self.mongoClient = mongoClient self.mongoClient = mongoClient
self.mongoDB = mongoDB self.mongoDB = mongoDB
self.xdb = xarray_mongodb.XarrayMongoDB(mongoDB) self.xdb = xarray_mongodb.XarrayMongoDB(mongoDB)
if date is None: if date is None:
date= get_date() date= get_date()
self.set_date(date) self.set_date(date)
def _convert_numpy_type(self, data): def _convert_numpy_type(self, data):
"""Convert from numpy type to normal python type. """Convert from numpy type to normal python type.
@ -57,7 +61,7 @@ class MongoDB:
except: except:
pass pass
return data return data
def _break_dataSet(self, dataSet, scanAxis=None): def _break_dataSet(self, dataSet, scanAxis=None):
"""Stack the scan axes of data """Stack the scan axes of data
@ -68,16 +72,16 @@ class MongoDB:
:return: The stacked xarray DataSet or DataArray stored the data :return: The stacked xarray DataSet or DataArray stored the data
:rtype: xarray DataSet or DataArray :rtype: xarray DataSet or DataArray
""" """
if scanAxis is None: if scanAxis is None:
scanAxis = dataSet.attrs['scanAxis'] scanAxis = dataSet.attrs['scanAxis']
dataArray = dataSet.shotNum dataArray = dataSet.shotNum
stackedDataArray = dataArray.stack(_scanAxis=tuple(scanAxis)) stackedDataArray = dataArray.stack(_scanAxis=tuple(scanAxis))
return stackedDataArray return stackedDataArray
def set_date(self, date): def set_date(self, date):
"""Set the date of data """Set the date of data
@ -88,7 +92,7 @@ class MongoDB:
self.year = int(date[0]) self.year = int(date[0])
self.month = int(date[1]) self.month = int(date[1])
self.day = int(date[2]) self.day = int(date[2])
def create_global(self, shotNum, dataSet=None, date=None): def create_global(self, shotNum, dataSet=None, date=None):
"""Creat a the global document in MongoDB """Creat a the global document in MongoDB
@ -101,7 +105,7 @@ class MongoDB:
""" """
if not date is None: if not date is None:
self.set_date(date) self.set_date(date)
data = { data = {
'year': self.year, 'year': self.year,
'month': self.month, 'month': self.month,
@ -110,7 +114,7 @@ class MongoDB:
} }
self.mongoDB['global'].delete_many(data) self.mongoDB['global'].delete_many(data)
data = { data = {
'year': self.year, 'year': self.year,
'month': self.month, 'month': self.month,
@ -119,19 +123,19 @@ class MongoDB:
'runNum': 0, 'runNum': 0,
'global_parameters' : {}, 'global_parameters' : {},
} }
global_parameters = self._convert_numpy_type(dataSet.attrs) global_parameters = self._convert_numpy_type(dataSet.attrs)
if not dataSet is None: if not dataSet is None:
data['global_parameters'].update(global_parameters) data['global_parameters'].update(global_parameters)
data = self._convert_numpy_type(data) data = self._convert_numpy_type(data)
if 'scanAxis' in dataSet.attrs: if 'scanAxis' in dataSet.attrs:
del data['global_parameters']['scanAxis'] del data['global_parameters']['scanAxis']
del data['global_parameters']['scanAxisLength'] del data['global_parameters']['scanAxisLength']
scanAxis = dataSet.attrs['scanAxis'] scanAxis = dataSet.attrs['scanAxis']
data['global_parameters'].update( data['global_parameters'].update(
{ {
@ -139,40 +143,40 @@ class MongoDB:
for key in scanAxis for key in scanAxis
} }
) )
stackedDataArray = self._break_dataSet(dataSet) stackedDataArray = self._break_dataSet(dataSet)
try: try:
stackedDataArray.load() stackedDataArray.load()
except: except:
pass pass
stackedDataArray = stackedDataArray.groupby('_scanAxis') stackedDataArray = stackedDataArray.groupby('_scanAxis')
for i in stackedDataArray: for i in stackedDataArray:
stackedDataArray_single = i[1] stackedDataArray_single = i[1]
data.update( data.update(
{ {
'runNum': int(stackedDataArray_single.item()) 'runNum': int(stackedDataArray_single.item())
} }
) )
data['global_parameters'].update( data['global_parameters'].update(
{ {
key: stackedDataArray_single[key].item() key: stackedDataArray_single[key].item()
for key in scanAxis for key in scanAxis
} }
) )
if '_id' in data: if '_id' in data:
del data['_id'] del data['_id']
self.mongoDB['global'].insert_one(data) self.mongoDB['global'].insert_one(data)
else: else:
self.mongoDB['global'].insert_one(data) self.mongoDB['global'].insert_one(data)
def _add_data_normal(self, shotNum, runNum, data): def _add_data_normal(self, shotNum, runNum, data):
"""Write the data directly to the global document """Write the data directly to the global document
@ -183,20 +187,20 @@ class MongoDB:
:param data: The data to be written :param data: The data to be written
:type data: normal python data type :type data: normal python data type
""" """
if runNum is None: if runNum is None:
runNum = 0 runNum = 0
filter = { filter = {
'year': self.year, 'year': self.year,
'month': self.month, 'month': self.month,
'day': self.day, 'day': self.day,
'shotNum': shotNum, 'shotNum': shotNum,
'runNum': runNum, 'runNum': runNum,
} }
self.mongoDB['global'].update_one(filter, {"$set": data}, upsert=False) self.mongoDB['global'].update_one(filter, {"$set": data}, upsert=False)
def _add_data_xarray_dataArray(self, shotNum, dataArray, name=None, scanAxis=None): def _add_data_xarray_dataArray(self, shotNum, dataArray, name=None, scanAxis=None):
"""Write the data in a type of xarray DataArray to the MongoDb. """Write the data in a type of xarray DataArray to the MongoDb.
@ -209,37 +213,37 @@ class MongoDB:
:param scanAxis: The scan axes of the data, defaults to None :param scanAxis: The scan axes of the data, defaults to None
:type scanAxis: array like, optional :type scanAxis: array like, optional
""" """
if scanAxis is None: if scanAxis is None:
scanAxis = list(dataArray.coords) scanAxis = list(dataArray.coords)
dataArray.attrs = self._convert_numpy_type(dataArray.attrs) dataArray.attrs = self._convert_numpy_type(dataArray.attrs)
stackedDataArray = dataArray.stack(_scanAxis=tuple(scanAxis)) stackedDataArray = dataArray.stack(_scanAxis=tuple(scanAxis))
stackedDataArray = stackedDataArray.groupby('_scanAxis') stackedDataArray = stackedDataArray.groupby('_scanAxis')
filter = { filter = {
'year': self.year, 'year': self.year,
'month': self.month, 'month': self.month,
'day': self.day, 'day': self.day,
'shotNum': shotNum, 'shotNum': shotNum,
} }
for i in stackedDataArray: for i in stackedDataArray:
stackedDataArray_single = i[1].drop('_scanAxis') stackedDataArray_single = i[1].drop('_scanAxis')
global_parameters = { global_parameters = {
'global_parameters.' + key: stackedDataArray_single[key].item() 'global_parameters.' + key: stackedDataArray_single[key].item()
for key in scanAxis for key in scanAxis
} }
filter.update(global_parameters) filter.update(global_parameters)
mongoID, _ = self.xdb.put(stackedDataArray_single) mongoID, _ = self.xdb.put(stackedDataArray_single)
data_label = { data_label = {
dataArray.name: dataArray.name:
{ {
'name': dataArray.name, 'name': dataArray.name,
'mongoID': mongoID, 'mongoID': mongoID,
@ -247,9 +251,9 @@ class MongoDB:
'dtype': 'dataArray', 'dtype': 'dataArray',
} }
} }
self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False) self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False)
def _add_data_xarray_dataSet(self, shotNum, dataSet, name, scanAxis=None): def _add_data_xarray_dataSet(self, shotNum, dataSet, name, scanAxis=None):
"""Write the data in a type of xarray DataSet to the MongoDb. """Write the data in a type of xarray DataSet to the MongoDb.
@ -262,40 +266,40 @@ class MongoDB:
:param scanAxis: The scan axes of the data, defaults to None :param scanAxis: The scan axes of the data, defaults to None
:type scanAxis: array like, optional :type scanAxis: array like, optional
""" """
if scanAxis is None: if scanAxis is None:
scanAxis = list(dataSet.coords) scanAxis = list(dataSet.coords)
dataSet.attrs = self._convert_numpy_type(dataSet.attrs) dataSet.attrs = self._convert_numpy_type(dataSet.attrs)
for key in list(dataSet.data_vars): for key in list(dataSet.data_vars):
dataSet[key].attrs = self._convert_numpy_type(dataSet[key].attrs) dataSet[key].attrs = self._convert_numpy_type(dataSet[key].attrs)
stackedDataSet = dataSet.stack(_scanAxis=tuple(scanAxis)) stackedDataSet = dataSet.stack(_scanAxis=tuple(scanAxis))
stackedDataSet = stackedDataSet.groupby('_scanAxis') stackedDataSet = stackedDataSet.groupby('_scanAxis')
filter = { filter = {
'year': self.year, 'year': self.year,
'month': self.month, 'month': self.month,
'day': self.day, 'day': self.day,
'shotNum': shotNum, 'shotNum': shotNum,
} }
for i in stackedDataSet: for i in stackedDataSet:
stackedDataSet_single = i[1].drop('_scanAxis') stackedDataSet_single = i[1].drop('_scanAxis')
global_parameters = { global_parameters = {
'global_parameters.' + key: stackedDataSet_single[key].item() 'global_parameters.' + key: stackedDataSet_single[key].item()
for key in scanAxis for key in scanAxis
} }
filter.update(global_parameters) filter.update(global_parameters)
mongoID, _ = self.xdb.put(dataSet) mongoID, _ = self.xdb.put(stackedDataSet_single)
data_label = { data_label = {
name: name:
{ {
'name': name, 'name': name,
'mongoID': mongoID, 'mongoID': mongoID,
@ -303,9 +307,9 @@ class MongoDB:
'dtype': 'dataSet', 'dtype': 'dataSet',
} }
} }
self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False) self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False)
def _add_data_additional(self, shotNum, runNum, data, name): def _add_data_additional(self, shotNum, runNum, data, name):
"""Write the data in an additional document """Write the data in an additional document
@ -318,22 +322,22 @@ class MongoDB:
:param name: The name of the data :param name: The name of the data
:type name: str :type name: str
""" """
if runNum is None: if runNum is None:
runNum = 0 runNum = 0
filter = { filter = {
'year': self.year, 'year': self.year,
'month': self.month, 'month': self.month,
'day': self.day, 'day': self.day,
'shotNum': shotNum, 'shotNum': shotNum,
'runNum': runNum, 'runNum': runNum,
} }
mongoID = self.mongoDB.additional.insert_one(data).inserted_id mongoID = self.mongoDB.additional.insert_one(data).inserted_id
data_label = { data_label = {
name: name:
{ {
'name': name, 'name': name,
'mongoID': mongoID, 'mongoID': mongoID,
@ -341,9 +345,9 @@ class MongoDB:
'dtype': 'dict', 'dtype': 'dict',
} }
} }
self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False) self.mongoDB['global'].update_one(filter, {"$set": data_label}, upsert=False)
def add_data(self, shotNum, data, runNum=None, date=None, name=None, engine='normal'): def add_data(self, shotNum, data, runNum=None, date=None, name=None, engine='normal'):
"""Write a new data to MongoDB """Write a new data to MongoDB
@ -362,7 +366,7 @@ class MongoDB:
""" """
if not date is None: if not date is None:
self.set_date(date) self.set_date(date)
if engine == 'normal': if engine == 'normal':
self._add_data_normal(shotNum=shotNum, runNum=runNum, data=data) self._add_data_normal(shotNum=shotNum, runNum=runNum, data=data)
elif engine == 'xarray': elif engine == 'xarray':
@ -372,7 +376,7 @@ class MongoDB:
self._add_data_xarray_dataArray(shotNum=shotNum, dataArray=data, name=name) self._add_data_xarray_dataArray(shotNum=shotNum, dataArray=data, name=name)
elif engine == 'additional': elif engine == 'additional':
self._add_data_additional(shotNum=shotNum, runNum=runNum, data=data, name=name) self._add_data_additional(shotNum=shotNum, runNum=runNum, data=data, name=name)
def read_global_single(self, shotNum, runNum, date=None): def read_global_single(self, shotNum, runNum, date=None):
"""Read the global document of specified shot and run from MongoDB. """Read the global document of specified shot and run from MongoDB.
@ -385,20 +389,20 @@ class MongoDB:
:return: The global document :return: The global document
:rtype: dict :rtype: dict
""" """
if not date is None: if not date is None:
self.set_date(date) self.set_date(date)
filter = { filter = {
'year': self.year, 'year': self.year,
'month': self.month, 'month': self.month,
'day': self.day, 'day': self.day,
'shotNum': shotNum, 'shotNum': shotNum,
'runNum': runNum, 'runNum': runNum,
} }
return self.mongoDB['global'].find_one(filter) return self.mongoDB['global'].find_one(filter)
def read_global_all(self, shotNum, date=None): def read_global_all(self, shotNum, date=None):
"""Read the global document of all runs in the specified shot from MongoDB, and extract the scan axes. """Read the global document of all runs in the specified shot from MongoDB, and extract the scan axes.
@ -409,29 +413,28 @@ class MongoDB:
:return: The global document :return: The global document
:rtype: dict :rtype: dict
""" """
from xarray.core.utils import equivalent from xarray.core.utils import equivalent
if not date is None: if not date is None:
self.set_date(date) self.set_date(date)
filter = { filter = {
'year': self.year, 'year': self.year,
'month': self.month, 'month': self.month,
'day': self.day, 'day': self.day,
'shotNum': shotNum, 'shotNum': shotNum,
} }
result = {} result = {}
dropped_attrs = OrderedDict() dropped_attrs = OrderedDict()
docs = self.mongoDB['global'].find(filter).sort('runNum') docs = self.mongoDB['global'].find(filter).sort('runNum')
docs = [doc['global_parameters'] for doc in docs] docs = [doc['global_parameters'] for doc in docs]
for doc in docs: for doc in docs:
global_parameters = doc global_parameters = doc
result.update( result.update(
{ {
key: value key: value
@ -439,34 +442,34 @@ class MongoDB:
if key not in result and key not in dropped_attrs.keys() if key not in result and key not in dropped_attrs.keys()
} }
) )
result = { result = {
key: value key: value
for key, value in result.items() for key, value in result.items()
if key not in global_parameters or equivalent(global_parameters[key], value) if key not in global_parameters or equivalent(global_parameters[key], value)
} }
dropped_attrs.update( dropped_attrs.update(
{ {
key: [] key: []
for key in global_parameters if key not in result for key in global_parameters if key not in result
} }
) )
for doc in docs: for doc in docs:
global_parameters = doc global_parameters = doc
dropped_attrs.update( dropped_attrs.update(
{ {
key: np.append(dropped_attrs[key], global_parameters[key]) key: np.append(dropped_attrs[key], global_parameters[key])
for key in dropped_attrs.keys() for key in dropped_attrs.keys()
} }
) )
scan_attrs = OrderedDict() scan_attrs = OrderedDict()
scan_length = [] scan_length = []
for attrs_key in dropped_attrs.keys(): for attrs_key in dropped_attrs.keys():
flag = True flag = True
for key in scan_attrs.keys(): for key in scan_attrs.keys():
@ -495,7 +498,7 @@ class MongoDB:
) )
return result return result
def _load_data_single(self, mongoID, engine): def _load_data_single(self, mongoID, engine):
"""load the document according to given _ID """load the document according to given _ID
@ -506,12 +509,12 @@ class MongoDB:
:return: The engine for different types of the data :return: The engine for different types of the data
:rtype: str :rtype: str
""" """
if engine == 'xarray': if engine == 'xarray':
return self.xdb.get(mongoID) return self.xdb.get(mongoID)
if engine == 'additional': if engine == 'additional':
return self.mongoDB.additional.find_one({'_id': mongoID}) return self.mongoDB.additional.find_one({'_id': mongoID})
def load_data_single(self, shotNum=None, runNum=None, globalDict=None, date=None, field=None): def load_data_single(self, shotNum=None, runNum=None, globalDict=None, date=None, field=None):
"""go through the given global document and find all the MongoDB object, then replace them with the document they linked to. """go through the given global document and find all the MongoDB object, then replace them with the document they linked to.
@ -528,31 +531,137 @@ class MongoDB:
:return: The document with loaded data :return: The document with loaded data
:rtype: dict :rtype: dict
""" """
if not date is None: if not date is None:
self.set_date(date) self.set_date(date)
if globalDict is None: if globalDict is None:
globalDict = self.read_global_single(shotNum=shotNum, runNum=runNum) globalDict = self.read_global_single(shotNum=shotNum, runNum=runNum)
if field is None: if field is None:
field = globalDict field = globalDict
res = {} res = {}
for key in field: for key in field:
if isinstance(globalDict[key], dict) and ('mongoID' in globalDict[key]): if isinstance(globalDict[key], dict) and ('mongoID' in globalDict[key]):
mongoID = globalDict[key]['mongoID'] mongoID = globalDict[key]['mongoID']
engine = globalDict[key]['engine'] engine = globalDict[key]['engine']
res.update( res.update(
{ {
key: self._load_data_single(mongoID=mongoID, engine=engine) key: self._load_data_single(mongoID=mongoID, engine=engine)
} }
) )
return res return res
def load_data(self, shotNum, data_key=None, globalDict=None, date=None):
    """Load the selected observables of all runs of a given shot.

    The documents of a shot can carry a variety of observables (e.g. optical
    density, N_count, centerx, ...). To avoid pulling everything into RAM,
    the caller can restrict the load to the observables named in ``data_key``.

    :param shotNum: The shot number
    :type shotNum: str
    :param data_key: Name(s) of the observables to load. A single string is
        treated as one name. If None, every field of the first run's global
        document that is not bookkeeping metadata is loaded, defaults to None
    :type data_key: str or list of str, optional
    :param globalDict: The global parameters including 'scanAxis' and
        'scanAxisLength'. If None, it is obtained from ``read_global_all``,
        defaults to None
    :type globalDict: dict, optional
    :param date: The date of the data ('YYYY/MM/DD'), defaults to None
    :type date: str, optional
    :raises IndexError: if ``data_key`` is None and no global document
        exists for the given shot and date
    :return: A dict with 'year', 'month', 'day', 'global_parameters' and one
        entry per loaded observable
    :rtype: dict
    """
    if date is not None:
        self.set_date(date)

    # Collect global parameters and scan axes.
    if globalDict is None:
        globalDict = self.read_global_all(shotNum=shotNum, date=date)

    scan_keys = ('scanAxis', 'scanAxisLength')
    res = {
        'year': self.year,
        'month': self.month,
        'day': self.day,
        # The scan axes are appended last (see below), after the plain
        # global parameters.
        'global_parameters': {
            key: value
            for key, value in globalDict.items()
            if key not in scan_keys
        },
    }

    if data_key is None:
        # Only hit the database when the observable names must be discovered.
        query = {
            'year': self.year,
            'month': self.month,
            'day': self.day,
            'shotNum': shotNum,
        }
        first_doc = self.mongoDB['global'].find_one(query, sort=[('runNum', 1)])
        if first_doc is None:
            raise IndexError(
                f"no global document found for shot {shotNum!r} on "
                f"{self.year}/{self.month}/{self.day}"
            )
        metadata_keys = {
            'year', 'month', 'day', 'shotNum', 'runNum',
            'global_parameters', '_id',
        }
        data_key = [key for key in first_doc if key not in metadata_keys]
    elif isinstance(data_key, str):
        # A bare string would otherwise be iterated character by character.
        data_key = [data_key]

    for key in data_key:
        res[key] = self._load_data(shotNum=shotNum, data_key=key, globalDict=globalDict)

    res['global_parameters'].update(
        {
            'scanAxis': globalDict['scanAxis'],
            'scanAxisLength': globalDict['scanAxisLength'],
        }
    )

    return res
def _load_data(self, shotNum, data_key, globalDict):
"""load all the data of specified shot"""
"""go through the given global document and find all the MongoDB object, then replace them with the document they linked to.
:param shotNum: The shot number, defaults to None
:type shotNum: str, optional
:param globalDict: All global parameters plus scan axes and scan axes length, defaults to None
:type globalDict: dict, optional
:param date: The date of the data ('YYYY/MM/DD'), defaults to None
:type date: str, optional
:return: Data from all runs of given shot including global parameters and date
:rtype: dict
"""
# collect data from all docs of given shot
filter = {
'year': self.year,
'month': self.month,
'day': self.day,
'shotNum': shotNum,
}
# find matching docs
docs = self.mongoDB['global'].find(filter).sort('runNum')
data = []
for doc in docs:
key=data_key
if isinstance(doc[key], dict) and ('mongoID' in doc[key]):
mongoID = doc[key]['mongoID']
engine = doc[key]['engine']
single_data = self._load_data_single(mongoID=mongoID, engine=engine)
print(single_data)
for axis in globalDict['scanAxis']:
if not axis in single_data.dims:
single_data = single_data.expand_dims(axis)
else:
engine = None
single_data = doc[key]
data.append(single_data)
# combine data along coordinate axes
try:
if engine =='xarray':
data = xr.combine_by_coords(data)
except:
pass
return data

2238
testLoadMongoDB.ipynb Normal file

File diff suppressed because one or more lines are too long