from collections import OrderedDict
import builtins

import bson
import numpy as np
import pymongo
import xarray as xr
import xarray_mongodb

from ToolFunction.ToolFunction import get_date

# Maps every numpy scalar type whose ``np.sctypeDict`` alias is also the name
# of a Python builtin (e.g. 'int', 'float', 'bool') to that builtin type.
npTypeDict = {
    v: getattr(builtins, k)
    for k, v in np.sctypeDict.items()
    if k in vars(builtins)
}

# The concrete type of a plain numpy array, used for exact type comparison.
npArrayType = type(np.array([0]))


class MongoDB:
    """A class for communicating with our MongoDB.

    All documents are addressed by date (year, month, day), shot number and
    run number. The date is kept as instance state and can be changed with
    :meth:`set_date` or through the ``date`` argument of most methods.
    """

    def __init__(self, mongoClient, mongoDB, date=None) -> None:
        """Initialize the class with given handles to our MongoDB client and database.

        :param mongoClient: The handle to MongoDB client
        :type mongoClient: pymongo MongoDB client or other MongoDB client
        :param mongoDB: The handle to MongoDB database
        :type mongoDB: pymongo MongoDB database or other MongoDB database
        :param date: The date of data ('YYYY/MM/DD'), can be set later,
            defaults to None, in which case the value of ``get_date()`` is used
        :type date: str, optional
        """
        self.mongoClient = mongoClient
        self.mongoDB = mongoDB
        self.xdb = xarray_mongodb.XarrayMongoDB(mongoDB)

        if date is None:
            date = get_date()
        self.set_date(date)

    def _convert_numpy_type(self, data):
        """Convert numpy values of a mapping to normal python types, in place.

        numpy scalars are replaced by ``.item()``, numpy arrays by
        ``.tolist()``. Any other value is converted with ``.item()`` when
        that works and is left untouched otherwise.

        :param data: The mapping whose values need to be converted
        :type data: dict
        :return: The same mapping object, with converted values
        :rtype: dict
        """
        for key in data:
            typeKey = type(data[key])
            if typeKey in npTypeDict:
                data[key] = data[key].item()
            elif typeKey is npArrayType:
                data[key] = data[key].tolist()
            else:
                # Best effort: most values (str, int, dict, ...) have no
                # usable ``.item()`` and are deliberately kept as they are.
                try:
                    data[key] = data[key].item()
                except Exception:
                    pass
        return data

    def _break_dataSet(self, dataSet, scanAxis=None):
        """Stack the scan axes of the ``shotNum`` variable of the data.

        :param dataSet: The xarray DataSet or DataArray that stores the data
        :type dataSet: xarray DataSet or DataArray
        :param scanAxis: a list of the names of the scan axes, defaults to
            None, in which case ``dataSet.attrs['scanAxis']`` is used
        :type scanAxis: list or array like, optional
        :return: ``dataSet.shotNum`` with the scan axes stacked into the
            single dimension ``_scanAxis``
        :rtype: xarray DataArray
        """
        if scanAxis is None:
            scanAxis = dataSet.attrs['scanAxis']

        dataArray = dataSet.shotNum
        return dataArray.stack(_scanAxis=tuple(scanAxis))

    def set_date(self, date):
        """Set the date of data.

        :param date: The date of data ('YYYY/MM/DD')
        :type date: str
        """
        parts = date.split("/")
        self.year = int(parts[0])
        self.month = int(parts[1])
        self.day = int(parts[2])

    def create_global(self, shotNum, dataSet=None, date=None):
        """Create the global document(s) of a shot in MongoDB.

        Existing global documents of the same date and shot are deleted
        first. Without scan axes a single document with ``runNum`` 0 is
        written; with scan axes one document per scan point is written.

        :param shotNum: The shot number
        :type shotNum: str
        :param dataSet: The xarray DataSet that stores the global parameters
            in its ``attrs``, defaults to None (no global parameters)
        :type dataSet: xarray DataSet, optional
        :param date: the date of the data ('YYYY/MM/DD'), defaults to None
        :type date: str, optional
        """
        if date is not None:
            self.set_date(date)

        shot_query = {
            'year': self.year,
            'month': self.month,
            'day': self.day,
            'shotNum': shotNum,
        }
        self.mongoDB['global'].delete_many(shot_query)

        data = {
            'year': self.year,
            'month': self.month,
            'day': self.day,
            'shotNum': shotNum,
            'runNum': 0,
            'global_parameters': {},
        }

        # ``dataSet`` is optional: only touch its attrs when it was given.
        if dataSet is None:
            attrs = {}
        else:
            attrs = self._convert_numpy_type(dataSet.attrs)
        data['global_parameters'].update(attrs)
        data = self._convert_numpy_type(data)

        if 'scanAxis' not in attrs:
            self.mongoDB['global'].insert_one(data)
            return

        # The scan bookkeeping is not stored; every document instead gets
        # the value of each scan axis at its own scan point.
        data['global_parameters'].pop('scanAxis', None)
        data['global_parameters'].pop('scanAxisLength', None)

        scanAxis = attrs['scanAxis']
        data['global_parameters'].update({key: 0 for key in scanAxis})

        stackedDataArray = self._break_dataSet(dataSet)
        # Best effort: loading into memory is optional.
        try:
            stackedDataArray.load()
        except Exception:
            pass

        for _, single in stackedDataArray.groupby('_scanAxis'):
            data['runNum'] = int(single.item())
            data['global_parameters'].update(
                {key: single[key].item() for key in scanAxis}
            )
            # insert_one() adds '_id' to the dict; remove it so the next
            # insert creates a new document.
            data.pop('_id', None)
            self.mongoDB['global'].insert_one(data)

    def _add_data_normal(self, shotNum, runNum, data):
        """Write the data directly to the global document.

        :param shotNum: The shot number
        :type shotNum: str
        :param runNum: The run number, None is treated as 0
        :type runNum: int
        :param data: The fields to be set in the global document
        :type data: normal python data type
        """
        if runNum is None:
            runNum = 0

        query = {
            'year': self.year,
            'month': self.month,
            'day': self.day,
            'shotNum': shotNum,
            'runNum': runNum,
        }
        self.mongoDB['global'].update_one(query, {"$set": data}, upsert=False)

    def _add_data_xarray_dataArray(self, shotNum, dataArray, name=None, scanAxis=None):
        """Write data of type xarray DataArray to the MongoDB.

        The array is split along the scan axes; each scan point is stored
        through ``self.xdb`` and linked from the global document whose
        global parameters match that scan point.

        :param shotNum: The shot number
        :type shotNum: str
        :param dataArray: The xarray DataArray to be written
        :type dataArray: xarray DataArray
        :param name: The name under which the data is linked, defaults to
            None, in which case ``dataArray.name`` is used
        :type name: str, optional
        :param scanAxis: The scan axes of the data, defaults to None, in
            which case all coordinates of the array are used
        :type scanAxis: array like, optional
        """
        if scanAxis is None:
            scanAxis = list(dataArray.coords)
        if name is None:
            name = dataArray.name

        dataArray.attrs = self._convert_numpy_type(dataArray.attrs)

        stacked = dataArray.stack(_scanAxis=tuple(scanAxis))

        query = {
            'year': self.year,
            'month': self.month,
            'day': self.day,
            'shotNum': shotNum,
        }

        for _, single in stacked.groupby('_scanAxis'):
            single = single.drop('_scanAxis')

            # Select the global document that belongs to this scan point.
            query.update(
                {
                    'global_parameters.' + key: single[key].item()
                    for key in scanAxis
                }
            )

            mongoID, _ = self.xdb.put(single)

            data_label = {
                name: {
                    'name': name,
                    'mongoID': mongoID,
                    'engine': 'xarray',
                    'dtype': 'dataArray',
                }
            }
            self.mongoDB['global'].update_one(query, {"$set": data_label}, upsert=False)

    def _add_data_xarray_dataSet(self, shotNum, dataSet, name, scanAxis=None):
        """Write data of type xarray DataSet to the MongoDB.

        The dataset is split along the scan axes; each scan point is stored
        through ``self.xdb`` and linked from the global document whose
        global parameters match that scan point.

        :param shotNum: The shot number
        :type shotNum: str
        :param dataSet: The xarray DataSet to be written
        :type dataSet: xarray DataSet
        :param name: The name under which the data is linked
        :type name: str
        :param scanAxis: The scan axes of the data, defaults to None, in
            which case all coordinates of the dataset are used
        :type scanAxis: array like, optional
        """
        if scanAxis is None:
            scanAxis = list(dataSet.coords)

        dataSet.attrs = self._convert_numpy_type(dataSet.attrs)
        for key in list(dataSet.data_vars):
            dataSet[key].attrs = self._convert_numpy_type(dataSet[key].attrs)

        stacked = dataSet.stack(_scanAxis=tuple(scanAxis))

        query = {
            'year': self.year,
            'month': self.month,
            'day': self.day,
            'shotNum': shotNum,
        }

        for _, single in stacked.groupby('_scanAxis'):
            single = single.drop('_scanAxis')

            # Select the global document that belongs to this scan point.
            query.update(
                {
                    'global_parameters.' + key: single[key].item()
                    for key in scanAxis
                }
            )

            # Store only the slice of this scan point (not the whole
            # dataset), consistent with the DataArray variant.
            mongoID, _ = self.xdb.put(single)

            data_label = {
                name: {
                    'name': name,
                    'mongoID': mongoID,
                    'engine': 'xarray',
                    'dtype': 'dataSet',
                }
            }
            self.mongoDB['global'].update_one(query, {"$set": data_label}, upsert=False)

    def _add_data_additional(self, shotNum, runNum, data, name):
        """Write the data in an additional document and link it.

        The data is inserted into the ``additional`` collection and its ID
        is stored under ``name`` in the matching global document.

        :param shotNum: The shot number
        :type shotNum: str
        :param runNum: The run number, None is treated as 0
        :type runNum: int
        :param data: The data to be written
        :type data: normal python data type
        :param name: The name of the data
        :type name: str
        """
        if runNum is None:
            runNum = 0

        query = {
            'year': self.year,
            'month': self.month,
            'day': self.day,
            'shotNum': shotNum,
            'runNum': runNum,
        }

        mongoID = self.mongoDB.additional.insert_one(data).inserted_id

        data_label = {
            name: {
                'name': name,
                'mongoID': mongoID,
                'engine': 'additional',
                'dtype': 'dict',
            }
        }
        self.mongoDB['global'].update_one(query, {"$set": data_label}, upsert=False)

    def add_data(self, shotNum, data, runNum=None, date=None, name=None, engine='normal'):
        """Write new data to MongoDB.

        :param shotNum: The shot number
        :type shotNum: str
        :param data: The data that needs to be written
        :type data: standard python data type, numpy type, xarray DataArray or xarray DataSet
        :param runNum: The run number, defaults to None
        :type runNum: int, optional
        :param date: The date of the data ('YYYY/MM/DD'), defaults to None
        :type date: str, optional
        :param name: The name of the data, defaults to None
        :type name: str, optional
        :param engine: The engine for different types of the data, one of
            'normal', 'xarray' or 'additional', defaults to 'normal'.
            Any other value writes nothing.
        :type engine: str, optional
        """
        if date is not None:
            self.set_date(date)

        if engine == 'normal':
            self._add_data_normal(shotNum=shotNum, runNum=runNum, data=data)
        elif engine == 'xarray':
            if isinstance(data, xr.Dataset):
                self._add_data_xarray_dataSet(shotNum=shotNum, dataSet=data, name=name)
            else:
                self._add_data_xarray_dataArray(shotNum=shotNum, dataArray=data, name=name)
        elif engine == 'additional':
            self._add_data_additional(shotNum=shotNum, runNum=runNum, data=data, name=name)

    def read_global_single(self, shotNum, runNum, date=None):
        """Read the global document of the specified shot and run from MongoDB.

        :param shotNum: The shot number
        :type shotNum: str
        :param runNum: The run number
        :type runNum: int
        :param date: The date of the data ('YYYY/MM/DD'), defaults to None
        :type date: str, optional
        :return: The global document as returned by ``find_one``
        :rtype: dict
        """
        if date is not None:
            self.set_date(date)

        query = {
            'year': self.year,
            'month': self.month,
            'day': self.day,
            'shotNum': shotNum,
            'runNum': runNum,
        }
        return self.mongoDB['global'].find_one(query)

    def read_global_all(self, shotNum, date=None):
        """Read the global parameters of all runs in the specified shot from
        MongoDB, and extract the scan axes.

        Parameters that are equivalent in all runs are returned unchanged.
        Parameters that differ between runs are returned as arrays holding
        one value per run (ordered by ``runNum``). A varying parameter whose
        values are equivalent to those of an earlier varying parameter is
        not treated as a scan axis; its entry holds the name of that
        earlier parameter instead.

        :param shotNum: The shot number
        :type shotNum: str
        :param date: The date of the data ('YYYY/MM/DD'), defaults to None
        :type date: str, optional
        :return: The merged global parameters, plus ``scanAxis`` (names of
            the scan axes) and ``scanAxisLength`` (number of collected
            values per scan axis)
        :rtype: dict
        """
        from xarray.core.utils import equivalent

        if date is not None:
            self.set_date(date)

        query = {
            'year': self.year,
            'month': self.month,
            'day': self.day,
            'shotNum': shotNum,
        }

        result = {}
        dropped_attrs = OrderedDict()

        docs = self.mongoDB['global'].find(query).sort('runNum')
        docs = [doc['global_parameters'] for doc in docs]

        # First pass: keep parameters that agree in every run in `result`
        # and remember the names of the conflicting ones in `dropped_attrs`.
        for global_parameters in docs:
            result.update(
                {
                    key: value
                    for key, value in global_parameters.items()
                    if key not in result and key not in dropped_attrs
                }
            )
            result = {
                key: value
                for key, value in result.items()
                if key not in global_parameters
                or equivalent(global_parameters[key], value)
            }
            dropped_attrs.update(
                {key: [] for key in global_parameters if key not in result}
            )

        # Second pass: collect the per-run values of the conflicting
        # parameters.
        # NOTE(review): a run that lacks one of these keys raises KeyError.
        for global_parameters in docs:
            dropped_attrs.update(
                {
                    key: np.append(dropped_attrs[key], global_parameters[key])
                    for key in dropped_attrs.keys()
                }
            )

        scan_attrs = OrderedDict()
        scan_length = []
        for attrs_key in dropped_attrs.keys():
            is_new_axis = True
            for key in scan_attrs.keys():
                if equivalent(scan_attrs[key], dropped_attrs[attrs_key]):
                    # Same values as an already known scan axis: store the
                    # name of that axis instead of a new axis.
                    is_new_axis = False
                    result.update({attrs_key: key})
                    break
            if is_new_axis:
                scan_attrs.update({attrs_key: dropped_attrs[attrs_key]})
                scan_length = np.append(scan_length, len(dropped_attrs[attrs_key]))

        result.update({key: value for key, value in scan_attrs.items()})
        result.update(
            {
                "scanAxis": list(scan_attrs.keys()),
                "scanAxisLength": scan_length,
            }
        )
        return result

    def _load_data_single(self, mongoID, engine):
        """Load the document linked by the given ID.

        :param mongoID: the ID of the document in MongoDB
        :type mongoID: MongoDB ID object
        :param engine: The engine the data was written with, 'xarray' or
            'additional'
        :type engine: str
        :return: The object returned by ``self.xdb.get`` for 'xarray', the
            document of the ``additional`` collection for 'additional',
            None for any other engine
        """
        if engine == 'xarray':
            return self.xdb.get(mongoID)
        if engine == 'additional':
            return self.mongoDB.additional.find_one({'_id': mongoID})
        return None

    def load_data_single(self, shotNum=None, runNum=None, globalDict=None, date=None, field=None):
        """Go through the given global document, find all links to MongoDB
        objects and load the documents they point to.

        :param shotNum: The shot number, defaults to None
        :type shotNum: str, optional
        :param runNum: The run number, defaults to None
        :type runNum: int, optional
        :param globalDict: The given global document, defaults to None, in
            which case it is read with ``read_global_single``
        :type globalDict: dict, optional
        :param date: The date of the data ('YYYY/MM/DD'), defaults to None
        :type date: str, optional
        :param field: If not None, the function will only go through these
            keys, defaults to None
        :type field: array like, optional
        :return: A dict that maps every linked key to its loaded data
        :rtype: dict
        """
        if date is not None:
            self.set_date(date)

        if globalDict is None:
            globalDict = self.read_global_single(shotNum=shotNum, runNum=runNum)

        if field is None:
            field = globalDict

        res = {}
        for key in field:
            entry = globalDict[key]
            # Linked data is stored as a dict that carries a 'mongoID'.
            if isinstance(entry, dict) and ('mongoID' in entry):
                res[key] = self._load_data_single(
                    mongoID=entry['mongoID'],
                    engine=entry['engine'],
                )
        return res

    def load_data(self, shotNum=None, globalDict=None, date=None, field=None):
        """Load all the data of the specified shot (not implemented yet)."""
        # load all the data of specified shot
        pass