You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
318 lines
14 KiB
318 lines
14 KiB
import logging
import os

# Third-party imports are kept in their original relative order because the
# star import below may otherwise shadow different names.
import numpy
import imageio
import matplotlib.pyplot as plt
from qtutils import *
from labscript_utils.connections import ConnectionTable
from labscript_utils import device_registry
from labscript_c_extensions.runviewer.resample import resample as _resample
import h5py
|
|
|
|
_logger = logging.getLogger(__name__)


class Shot(object):
    """Load the output traces of a shot HDF5 file and plot/animate them.

    The shot file is read with ``h5py``; the device hierarchy comes from
    ``ConnectionTable(path)`` and the per-device traces are produced by the
    runviewer parsers registered in ``labscript_utils.device_registry``.
    Traces, channels, markers and shutter times are loaded lazily the first
    time one of the corresponding properties is accessed.
    """

    # Axis labels (matplotlib mathtext) for the channels this script knows.
    # Raw strings keep the backslashes literal without invalid escapes.
    _YLABELS = {
        'AO_MOT_Grad_Coil_current': r'$ \nabla B_{AHH}$',
        'AO_MOT_CompZ_Coil_current': r'$B_{HH}$',
        'AO_MOT_3D_freq': r'$\Delta \nu$',
        'AO_MOT_3D_amp': r'$P_{3D}$',
        'AO_Red_Push_amp': r'$P_{Push}$',
        'MOT_2D_Shutter': r'$P_{2D}$',
        'AO_ODT1_Pow': r'$P_{cODT1}$',
        'Imaging_RF_Switch': r'$P_{img}$',
        'MOT_3D_Camera_Trigger': r'$Cam$',
    }

    def __init__(self, path):
        """Open the shot file at ``path`` and read its static metadata.

        Reads the master pseudoclock name, the stop time, the device names
        and (when present) the shutter calibrations. Traces are not loaded
        here; see ``_load``.
        """
        self.path = path

        # Lazily populated by _load(); None means "not loaded yet".
        self._traces = None
        self._channels = None
        self._markers = None
        self._shutter_times = None

        # shutter name -> [open_delay, close_delay]
        self._shutter_calibrations = {}

        self.connection_table = ConnectionTable(path)

        with h5py.File(path, 'r') as h5file:
            master_name = h5file['connection table'].attrs['master_pseudoclock']
            if isinstance(master_name, bytes):
                master_name = master_name.decode('utf8')
            else:
                master_name = str(master_name)
            self.master_pseudoclock_name = master_name

            self.stop_time = h5file['devices'][master_name].attrs['stop_time']
            self.device_names = list(h5file['devices'].keys())

            if 'calibrations' in h5file and 'Shutter' in h5file['calibrations']:
                rows = numpy.array(h5file['calibrations']['Shutter'])
                for name, open_delay, close_delay in rows:
                    name = name.decode('utf8') if isinstance(name, bytes) else str(name)
                    self._shutter_calibrations[name] = [open_delay, close_delay]

    def _load(self):
        """Populate markers, channels, traces and shutter times.

        Containers that are still ``None`` are initialised to empty dicts,
        then the markers are read and the connection table is walked
        starting from the master pseudoclock.
        """
        if self._channels is None:
            self._channels = {}
        if self._traces is None:
            self._traces = {}
        if self._markers is None:
            self._markers = {}
        if self._shutter_times is None:
            self._shutter_times = {}

        self._load_markers()

        master_pseudoclock_device = self.connection_table.find_by_name(self.master_pseudoclock_name)
        self._load_device(master_pseudoclock_device)

    def _load_markers(self):
        """Read time markers from the shot file into ``self._markers``.

        Uses the ``time_markers`` dataset when present, otherwise the
        attributes of ``runviewer/markers``. 'Start' (t=0) and 'End'
        (t=stop_time) markers are added when no marker exists at those times.
        """
        with h5py.File(self.path, 'r') as h5file:
            if "time_markers" in h5file:
                for row in h5file["time_markers"]:
                    self._markers[row['time']] = {'color': row['color'].tolist()[0], 'label': row['label']}
            elif "runviewer" in h5file:
                for marker_time, val in h5file["runviewer"]["markers"].attrs.items():
                    # The attribute value is a dict-like string; split off the
                    # last comma-separated field (label) from the colour part.
                    props = val.strip('{}}').rsplit(",", 1)
                    color = list(map(int, props[0].split(":")[1].strip(" ()").split(",")))
                    label = props[1].split(":")[1]
                    self._markers[float(marker_time)] = {'color': color, 'label': label}

        if 0 not in self._markers:
            self._markers[0] = {'color': [0,0,0], 'label': 'Start'}
        if self.stop_time not in self._markers:
            self._markers[self.stop_time] = {'color': [0,0,0], 'label' : 'End'}

    def add_trace(self, name, trace, parent_device_name, connection):
        """Register a trace (callback handed to the device parsers).

        Stores ``trace`` under ``str(name)`` together with its parent device
        and port. If the connection-table entry is a ``Shutter`` with an
        ``open_state`` property, its shutter times are computed as well.
        """
        name = str(name)
        self._channels[name] = {'device_name': parent_device_name, 'port': connection}
        self._traces[name] = trace

        con = self.connection_table.find_by_name(name)
        if con.device_class == "Shutter" and 'open_state' in con.properties:
            self.add_shutter_times([(name, con.properties['open_state'])])

    # Temporary solution to physical shutter times
    def add_shutter_times(self, shutters):
        """Compute delayed open/close times for the given shutters.

        ``shutters`` is an iterable of ``(name, open_state)`` pairs. For each
        shutter with a non-empty trace, ``self._shutter_times[name]`` becomes
        a dict mapping ``time + delay`` to 1 (value equals ``open_state``) or
        0 (otherwise), for the first sample and every sample where the value
        changes. The delay is the calibrated open delay for opening edges and
        the close delay for closing edges; a shutter without a calibration
        entry is treated as having zero delays instead of raising KeyError.
        """
        for name, open_state in shutters:
            x_values, y_values = self._traces[name]
            if len(x_values) == 0:
                continue
            x_values = numpy.asarray(x_values)
            y_values = numpy.asarray(y_values)

            open_delay, close_delay = self._shutter_calibrations.get(name, (0, 0))

            # Index of each value that is changed *to*, plus the first sample.
            change_indices = numpy.flatnonzero(y_values[:-1] != y_values[1:]) + 1
            change_values = [(x_values[0], y_values[0])]
            change_values.extend(zip(x_values[change_indices], y_values[change_indices]))

            shutter_times = {}
            for x_value, y_value in change_values:
                if y_value == open_state:
                    shutter_times[x_value + open_delay] = 1
                else:
                    shutter_times[x_value + close_delay] = 0
            self._shutter_times[name] = shutter_times

    def _load_device(self, device, clock=None):
        """Load the traces of ``device`` and, recursively, of its descendants.

        The runviewer parser registered for ``device.device_class`` is asked
        for its traces (it reports them through ``add_trace``); the clocklines
        and triggers it returns are used as clocks for the grandchild devices.
        Loading is best effort: a failure is logged and does not propagate.
        """
        try:
            device_class = device_registry.get_runviewer_parser(device.device_class)
            device_instance = device_class(self.path, device)

            clocklines_and_triggers = device_instance.get_traces(self.add_trace, clock)

            for name, trace in clocklines_and_triggers.items():
                child_device = self.connection_table.find_by_name(name)
                for grandchild_device in child_device.child_list.values():
                    self._load_device(grandchild_device, trace)
        except Exception:
            # Keep going with the remaining devices, but leave a trace of
            # what went wrong instead of silently discarding the error.
            _logger.exception("Failed to load device %s", getattr(device, 'name', '<unknown>'))

    def resample(self, data_x, data_y, xmin, xmax, stop_time, num_pixels):
        """This is a function for downsampling the data before plotting
        it. Unlike using nearest neighbour interpolation, this method
        preserves the features of the plot. It chooses what value to
        use based on what values within a region are most different
        from the values it's already chosen. This way, spikes of a short
        duration won't just be skipped over as they would with any sort
        of interpolation.

        Returns ``(x_out, y_out)`` where ``x_out`` holds ``3 * 2000 + 2``
        evenly spaced points from ``xmin`` to ``xmax`` and ``y_out`` has one
        element fewer. ``num_pixels`` is accepted for interface compatibility
        but is currently not used.
        """
        # TODO: Only finely sample the currently visible region. Coarsely sample the rest
        # TODO: investigate only resampling when necessary. Per the original
        # notes, pyqtgraph sometimes has trouble rendering un-resampled data in
        # stepMode, and resampling also handles the case where none of the
        # data is visible (by setting NaNs).
        x_out = numpy.float64(numpy.linspace(xmin, xmax, 3 * 2000 + 2))
        y_out = numpy.empty(len(x_out) - 1, dtype=numpy.float64)
        data_x = numpy.float64(data_x)
        data_y = numpy.float64(data_y)

        # The C extension fills y_out in place.
        _resample(data_x, data_y, x_out, y_out, numpy.float64(stop_time))
        return x_out, y_out

    def find_nearest(self, array, value):
        """Return ``(index, element)`` of the entry of ``array`` closest to ``value``."""
        array = numpy.asarray(array)
        idx = (numpy.abs(array - value)).argmin()
        return idx, array[idx]

    def generate_ylabel(self, channel_name):
        """Return the y-axis label for ``channel_name``.

        Known channels map to a mathtext label; any other channel falls back
        to its own name instead of raising UnboundLocalError.
        """
        return self._YLABELS.get(channel_name, channel_name)

    def _resampled_values(self, trace_name):
        """Return the y-values of a trace resampled over 0..stop_time."""
        trace_x, trace_y = self.traces[trace_name]
        # The last argument (num_pixels) is ignored by resample().
        return self.resample(numpy.asarray(trace_x), numpy.asarray(trace_y),
                             0, self.stop_time, self.stop_time, None)[1]

    def plotSequence(self, Channels, Switches, PlotRange, animate = False, idx = 0):
        """Plot each channel, gated by its switch, on stacked shared-x axes.

        Parameters:
            Channels: names of the traces to plot, one subplot each.
            Switches: names of the traces each channel is multiplied by;
                ``Switches[i]`` gates ``Channels[i]``.
            PlotRange: ``[t_start, t_end]``; the nearest samples bound the
                plotted slice.
            animate: when False, plot the ``PlotRange`` slice, save
                ``seq.png`` and show the figure. When True, plot everything
                from the first sample up to ``idx`` samples past the start of
                ``PlotRange``, save ``seq-{idx}.png`` and close the figure.
            idx: frame offset in samples (only used when ``animate``).

        Raises:
            ValueError: if there are fewer switches than channels.
        """
        if len(Switches) < len(Channels):
            raise ValueError("Each channel needs a matching entry in Switches")

        # squeeze=False keeps an array of axes even for a single channel.
        fig, axes_grid = plt.subplots(len(Channels), figsize = (10, 6), constrained_layout=True,
                                      sharex=True, squeeze=False)
        axs = axes_grid[:, 0]
        last_row = len(Channels) - 1

        for i, channel_name in enumerate(Channels):
            resampled_channel_trace = self._resampled_values(channel_name)
            resampled_switch_trace = self._resampled_values(Switches[i])
            trace = numpy.multiply(resampled_channel_trace, resampled_switch_trace)

            # One time stamp per resampled value; linspace guarantees the
            # length, which arange with a float step does not.
            time = numpy.linspace(0, self.stop_time, len(resampled_channel_trace), endpoint=False)

            start = self.find_nearest(time, PlotRange[0])[0]
            stop = self.find_nearest(time, PlotRange[1])[0]

            if animate:
                plot_time, plot_trace = time[0:start + idx], trace[0:start + idx]
            else:
                plot_time, plot_trace = time[start:stop], trace[start:stop]

            ax = axs[i]
            ax.plot(plot_time, plot_trace, '-b') #'-ob'
            ax.fill_between(plot_time, plot_trace, alpha=0.4)

            # Vertical guide lines; 4 and 4.315 are hard-coded times
            # (presumably stage boundaries of this particular sequence).
            for guide_x in (0, 4, 4.315, self.stop_time):
                ax.axvline(x=guide_x, color = 'b', linestyle = '--')
            ax.set_xlim(0, self.stop_time)
            # nanmax: ignore NaN samples when choosing the upper limit.
            ax.set_ylim(0, numpy.nanmax(resampled_channel_trace) + 0.2)
            if i == last_row:
                ax.set_xlabel('Time (s)', fontsize = 16)
            ax.set_ylabel(self.generate_ylabel(channel_name), fontsize = 16)

        if animate:
            plt.savefig(f'seq-{idx}.png')
            plt.close(fig)
        else:
            plt.savefig('seq.png', format='png', bbox_inches = "tight")
            plt.show()

    def animateSequence(self, Channels, Switches, PlotRange):
        """Render the sequence frame by frame into ``seq_animated.gif``.

        Frames are written as ``seq-{i}.png`` in the working directory by
        ``plotSequence(animate=True)``, assembled into the GIF, and always
        removed afterwards, even when plotting or writing fails.
        """
        SIZE = 6000
        STEP = 58
        frame_indices = range(2, SIZE, STEP)
        frame_files = [f'seq-{i}.png' for i in frame_indices]

        try:
            for i in frame_indices:
                self.plotSequence(Channels, Switches, PlotRange, animate = True, idx = i)

            with imageio.get_writer('seq_animated.gif', mode='i', fps = 24, loop = 1) as writer:
                for frame_file in frame_files:
                    writer.append_data(imageio.imread(frame_file))
        finally:
            for frame_file in frame_files:
                if os.path.exists(frame_file):
                    os.remove(frame_file)

    @property
    def channels(self):
        """Names of the loaded channels (loads the shot on first access)."""
        if self._channels is None:
            self._load()

        return self._channels.keys()

    @property
    def markers(self):
        """Dict of time -> ``{'color', 'label'}`` (loads the shot on first access)."""
        if self._markers is None:
            self._load()
        return self._markers

    @property
    def traces(self):
        """Dict of channel name -> trace (loads the shot on first access)."""
        if self._traces is None:
            self._load()
        return self._traces

    @property
    def shutter_times(self):
        """Dict of shutter name -> delayed open/close times (loads on first access)."""
        if self._shutter_times is None:
            self._load()
        return self._shutter_times
|
|
|
|
if __name__ == "__main__":

    # Shot file to visualise.
    filepath = 'C:/Users/Karthik/Desktop/PreTalk/Plot Sequence/2023-01-25_0069_ODT_Imaging_9.h5'

    shotObj = Shot(filepath)
    shotObj._load()

    # All channel names found in the shot (overridden by the selection below).
    Channels = list(shotObj.channels)

    # Channel names listed for reference:
    # 'prawn_clock_line_0', 'prawn_clock_line_1', 'Dummy_1', 'Imaging_RF_Switch', 'Imaging_Shutter', 'MOT_2D_Shutter', 'MOT_3D_RF_Switch', 'MOT_3D_Shutter', 'Push_Beam_Blue_Shutter',
    # 'Push_Beam_Blue_Switch', 'Push_Beam_Red_Shutter', 'Push_Beam_Red_Switch', 'CDT1_Switch', 'CDT2_Switch', 'MOT_3D_Camera_Trigger', 'MOT_3D_Camera_trigger', 'MOT_CompX_Coil_Switch',
    # 'MOT_CompY_Coil_Switch', 'MOT_CompZ_Coil_Switch', 'MOT_Grad_Coil_Switch', 'ODT_Axis_Camera_Trigger', 'ODT_Axis_Camera_trigger', 'AO_Blue_Push_amp', 'AO_Blue_Push_freq', 'AO_Imaging_amp',
    # 'AO_Imaging_freq', 'AO_MOT_3D_amp', 'AO_MOT_3D_freq', 'AO_MOT_CompX_Coil_current', 'AO_MOT_CompX_Coil_voltage', 'AO_MOT_CompY_Coil_current', 'AO_MOT_CompY_Coil_voltage', 'AO_MOT_CompZ_Coil_current',
    # 'AO_MOT_CompZ_Coil_voltage', 'AO_MOT_Grad_Coil_current', 'AO_MOT_Grad_Coil_voltage', 'AO_Red_Push_amp', 'AO_Red_Push_freq', 'AO_Dummy', 'AO_ODT1_Mod', 'AO_ODT1_Pow', 'AO_ODT2_Pow'

    # --- Without Imaging ---
    # Switches[i] is the trace that gates Channels[i].
    Channels = ['MOT_2D_Shutter', 'AO_Red_Push_amp', 'AO_MOT_3D_amp', 'AO_MOT_3D_freq', 'AO_MOT_Grad_Coil_current', 'AO_MOT_CompZ_Coil_current', 'AO_ODT1_Pow']
    Switches = ['MOT_2D_Shutter', 'Push_Beam_Red_Switch', 'MOT_3D_RF_Switch', 'MOT_3D_RF_Switch', 'MOT_Grad_Coil_Switch', 'MOT_CompZ_Coil_Switch', 'CDT1_Switch']

    # --- With Imaging ---
    # Channels = ['MOT_2D_Shutter', 'AO_Red_Push_amp', 'AO_MOT_3D_amp', 'AO_MOT_3D_freq', 'AO_MOT_Grad_Coil_current', 'AO_MOT_CompZ_Coil_current', 'AO_ODT1_Pow', 'Imaging_RF_Switch', 'MOT_3D_Camera_Trigger']
    # Switches = ['MOT_2D_Shutter', 'Push_Beam_Red_Switch', 'MOT_3D_RF_Switch', 'MOT_3D_RF_Switch', 'MOT_Grad_Coil_Switch', 'MOT_CompZ_Coil_Switch', 'CDT1_Switch', 'Imaging_RF_Switch', 'MOT_3D_Camera_Trigger']

    # --- Plot full sequence ---
    # TimeRange = [0.0, shotObj.stop_time]

    # --- Plot till loading of MOT ---
    # TimeRange = [0.0, 4.0]

    # --- Plot from loading of MOT till end of sequence ---
    TimeRange = [4.0, shotObj.stop_time]

    # --- Plot sequence ---
    # shotObj.plotSequence(Channels, Switches, PlotRange = TimeRange)

    # --- Animate sequence ---
    shotObj.animateSequence(Channels, Switches, PlotRange = TimeRange)
|
|
|
|
|