NNDy/scripts/DA/TestDA.py
castaneda 3c2255a589 HOWTO define the DA script for NNDy
Complete commenting added, with instructions for writing an analysis script that is compatible with the NNDy optimizer and returns the cost associated with the run just performed.
2025-03-21 16:19:10 +01:00


from lyse import Run as lyse_run
import h5py
import sys
import pandas as pd
import numpy as np
import xarray as xr
import lmfit
from matplotlib import pyplot as plt
# here the script looks for the DA code; if it is not in the same folder as NNDy, you can specify its folder in new_path
new_path = r'C:\Users\DyLab\PrepLab\Data_Analysis'
if new_path not in sys.path:
    sys.path.append(new_path)
from Analyser.FitAnalyser import Gaussian2dModel
from Analyser.FitAnalyser import FitAnalyser
from ToolFunction.ToolFunction import get_scanAxis
#reference values measured without AOM for 5 minutes
free_std = 1.815672e-6 #THz
free_freq = 438.585992 #THz
free_intensity_max = 186
# relative weights of the observables in the cost scaling
cost_scaling_factor = {
    'red_chi2': 0.01,
    'std_freq': 1,
    'peak_intensity': 10
}
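# note: each observable is normalised against its reference value inside cost() below, so these
# factors only set the relative importance: peak intensity weighs most, frequency stability next,
# and the fit quality (reduced chi squared) acts as a small correction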
# THE COST FUNCTION
#
# every DA script for NNDy needs a cost function defined as below
def cost(hdf_outfile):
    # takes as argument:
    #   the hdf5 file of the shot, from which it can read the shot data and the global variables
    # returns:
    #   a dictionary with a 'cost' item - and an uncertainty, if available - or 'bad': True for a bad run
    # it is better to return all three items every time, but the code can handle it if you don't
    # the role of the cost function is then to start the analysis and obtain the values of the
    # observables of interest from which the cost is calculated
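    # a minimal sketch of the two accepted return shapes (the numbers are purely illustrative;
    # the exact key name for the optional uncertainty is not fixed here, so it is omitted):
    #   good run: {'cost': 1.23, 'bad': False}
    #   bad run:  {'cost': np.inf, 'bad': True}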
    output = analysis(hdf_outfile)
    print(f'output:\n{output}')
    # recommended way to handle bad runs
    if output['bad']:
        return {
            'cost': np.inf,
            'bad': True
        }
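    # each term below is ~1 at the reference values measured without AOM
    # (red_chi2 = 1, std_freq = free_std, peak_intensity = free_intensity_max),
    # then weighted by cost_scaling_factor and averaged over the three observables;
    # lower std_freq and higher peak_intensity both lower the cost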
    cost_value = (
        (1 + abs(output['red_chi2'] - 1))
        * cost_scaling_factor['red_chi2']
        + output['std_freq'] / free_std
        * cost_scaling_factor['std_freq']
        + free_intensity_max / output['peak_intensity']
        * cost_scaling_factor['peak_intensity']
    ) / 3
    #print('doing return now')
    return {
        'cost': cost_value,
        'bad': False
    }
# this function is called within cost() and is the one which handles the analysis;
# it should be replaced by the imports that will execute the analysis, or by a custom function like this one
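# e.g. (hypothetical module and function names, only to illustrate the idea):
#   from MyExperimentAnalysis import run_shot_analysis as analysis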
def analysis(hdf_outfile):
    bad = False
    group_name = 'DA'
    lyse_run_obj = lyse_run(hdf_outfile)
    lyse_run_obj.set_group(group_name)
    #print(type(hdf_outfile))
    with h5py.File(hdf_outfile, 'r') as f:
        data = f['results/wlm']['wlm_df'][:]  # load as NumPy array
    df = pd.DataFrame(data,
                      columns=['timestamp [s]', 'wavelength [nm]', 'frequency [THz]'])
    # be conservative and declare a failed measurement if the device is over- or underexposed at any point
    if (df < 0).any().any():
        bad = True
        return {'bad': bad}
    #print(f'df imported, all right {df}')
    mean_wl = df['wavelength [nm]'].mean()
    mean_freq = df['frequency [THz]'].mean()
    std_wl = df['wavelength [nm]'].std()
    std_freq = df['frequency [THz]'].std()
    img = lyse_run_obj.get_image('Camera', 'camera_snap', 'image')
    peak_intensity, wx, wy, red_chi2, fitted_img = img_Analyzer(img)
    # save the fitted image back into the shot file
    frametype = 'fitted_beam_shape'
    #print(f"Saving frame(s) {group_name}/{frametype}.")
    with h5py.File(hdf_outfile, 'r+') as f:
        group = f.require_group(group_name)
        dset = group.create_dataset(
            frametype, data=fitted_img, dtype='uint16', compression='gzip'
        )
        # specify that this dataset should be viewed as an image
        dset.attrs['CLASS'] = np.bytes_('IMAGE')
        dset.attrs['IMAGE_VERSION'] = np.bytes_('1.2')
        dset.attrs['IMAGE_SUBCLASS'] = np.bytes_('IMAGE_GRAYSCALE')
        dset.attrs['IMAGE_WHITE_IS_ZERO'] = np.uint8(0)
    output = {'mean_wl': mean_wl, 'std_wl': std_wl,
              'mean_freq': mean_freq, 'std_freq': std_freq,
              'peak_intensity': peak_intensity,
              'wx': wx, 'wy': wy,
              'red_chi2': red_chi2,
              'bad': bad}
    lyse_run_obj.save_result('output', output)
    #print(output)
    return output
# helper function called within the analysis routine
def img_Analyzer(image):
    dimensions = ['x', 'y']
    # convert the image from a NumPy array to an xarray DataArray
    image = xr.DataArray(image, dims=dimensions)
    fitModel = Gaussian2dModel()
    fitAnalyser = FitAnalyser(fitModel, fitDim=2)
    params = fitAnalyser.guess(image)
    #print(params.item())
    fitResult = fitAnalyser.fit(image, params).load()
    #lmfit.report_fit(fitResult.item())
    fitted_image = fitResult.item().eval(x=image.x, y=image.y)
    fitted_image = xr.DataArray(fitted_image, dims=dimensions)
    val = fitAnalyser.get_fit_value(fitResult)
    #print(val)
    #print(image.max())
    return (np.float64(image.max()),
            np.float64(val['sigmax']),
            np.float64(val['sigmay']),
            np.float64(fitResult.item().redchi),
            fitted_image)
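
# minimal sketch for testing this DA script standalone on a single shot, outside NNDy
# (the hdf5 path below is purely illustrative - replace it with the path to an actual shot file)
if __name__ == '__main__':
    test_shot = r'C:\path\to\a_shot.h5'
    result = cost(test_shot)
    print(result)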