import numpy, imageio, os from scipy import interpolate import matplotlib.pyplot as plt from qtutils import * from labscript_utils.connections import ConnectionTable from labscript_utils import device_registry from labscript_c_extensions.runviewer.resample import resample as _resample import h5py def generate_ylabel(channel_name): if channel_name == 'AO_MOT_Grad_Coil_current': label = '$ \\nabla B_{AHH}$' elif channel_name == 'AO_MOT_CompZ_Coil_current': label = '$B_{HH}$' elif channel_name == 'AO_MOT_3D_freq': label = '$\Delta \\nu$' elif channel_name == 'AO_MOT_3D_amp': label = '$P_{3D}$' elif channel_name == 'AO_Red_Push_amp': label = '$P_{Push}$' elif channel_name == 'MOT_2D_Shutter': label = '$P_{2D}$' elif channel_name == 'AO_ODT1_Pow': label = '$P_{cODT1}$' elif channel_name == 'Imaging_RF_Switch': label = '$P_{img}$' elif channel_name == 'MOT_3D_Camera_Trigger': label = '$Cam$' return label def plotSequence(Channels, Switches, animate = False, idx = 0): axs = plt.subplots(len(Channels), figsize = (10, 6), constrained_layout=True, sharex=True)[1] for i, channel_name in enumerate(Channels): #markers_unscaled = sorted(list(shotObj.markers.keys())) #scalehandler = ScaleHandler(markers_unscaled, markers_unscaled, shotObj.stop_time) #unscaled_time = numpy.asarray(Traces[channel_name])[0] #scaled_time = scalehandler.get_scaled_time(unscaled_time) channel_time = numpy.asarray(Traces[channel_name])[0] channel_trace = numpy.asarray(Traces[channel_name])[1] xmin = 0 xmax = shotObj.stop_time dx = 1e-9 resampled_channel_trace = shotObj.resample(channel_time, channel_trace, xmin, xmax, shotObj.stop_time, dx)[1] time = numpy.arange(xmin, xmax, (xmax-xmin)/len(resampled_channel_trace)) switch_time = numpy.asarray(Traces[Switches[i]])[0] switch_trace = numpy.asarray(Traces[Switches[i]])[1] xmin = 0 xmax = shotObj.stop_time dx = 1e-9 resampled_switch_trace = shotObj.resample(switch_time, switch_trace, xmin, xmax, shotObj.stop_time, dx)[1] trace = numpy.multiply(resampled_channel_trace, resampled_switch_trace) if not animate: axs[i].plot(time, trace, '-b') #'-ob' axs[i].fill_between(time, trace, alpha=0.4) else: axs[i].plot(time[0:idx], trace[0:idx], '-b') #'-ob' axs[i].fill_between(time[0:idx], trace[0:idx], alpha=0.4) axs[i].axvline(x=0, color = 'b', linestyle = '--') axs[i].axvline(x=4, color = 'b', linestyle = '--') axs[i].axvline(x=4.315, color = 'b', linestyle = '--') axs[i].axvline(x=shotObj.stop_time, color = 'b', linestyle = '--') axs[i].set_xlim(0, shotObj.stop_time) axs[i].set_ylim(0, max(resampled_channel_trace) + 0.2) if i == len(Channels)-1: axs[i].set_xlabel('Time (s)', fontsize = 16) axs[i].set_ylabel(generate_ylabel(channel_name), fontsize = 16) if not animate: plt.savefig(f'seq.png', format='png', bbox_inches = "tight") plt.show() else: plt.savefig(f'seq-{idx}.png') plt.close() def animateSequence(Channels, Switches): SIZE = 6000 STEP = 58 for i in range(2, SIZE, STEP): plotSequence(Channels, Switches, animate = True, idx = i) with imageio.get_writer('seq_animated.gif', mode='i', fps = 24, loop = 1) as writer: for i in range(2, SIZE, STEP): image = imageio.imread(f'seq-{i}.png') writer.append_data(image) os.remove(f'seq-{i}.png') class ScaleHandler(): def __init__(self, input_times, target_positions, stop_time): # input_times is a list (may be unsorted) of times which should be scaled evenly with target_length # an input list of [1,2,4,6] and target_length of 1.0 will result in: # get_scaled_time(1) -> 1 # get_scaled_time(1.5) -> 1.5 # get_scaled_time(3) -> 2.5 # get_scaled_time(4) -> 3 # get_scaled_time(5) -> 3.5 ... self.org_stop_time = float(stop_time) if not all((x >= 0) and (x <= self.org_stop_time) for x in input_times): raise Exception('shot contains at least one marker before t=0 and/or after the stop time. Non-linear time currently does not support this.') unscaled_times = sorted(input_times) scaled_times = sorted(target_positions) # append values for linear scaling before t=0 and after stop time unscaled_times = [min(unscaled_times)-1e-9] + unscaled_times + [max(unscaled_times) + 1e-9] scaled_times = [min(scaled_times)-1e-9] + scaled_times + [max(scaled_times) + 1e-9] self.get_scaled_time = interpolate.interp1d(unscaled_times, scaled_times, assume_sorted=True, bounds_error=False, fill_value='extrapolate') self.get_unscaled_time = interpolate.interp1d(scaled_times, unscaled_times, assume_sorted=True, bounds_error=False, fill_value='extrapolate') self.scaled_stop_time = self.get_scaled_time(self.org_stop_time) class Shot(object): def __init__(self, path): self.path = path # Store list of traces self._traces = None # store list of channels self._channels = None # store list of markers self._markers = None # store list of shutter changes and callibrations self._shutter_times = None self._shutter_calibrations = {} # Load connection table self.connection_table = ConnectionTable(path) # open h5 file with h5py.File(path, 'r') as file: # Get master pseudoclock self.master_pseudoclock_name = file['connection table'].attrs['master_pseudoclock'] if isinstance(self.master_pseudoclock_name, bytes): self.master_pseudoclock_name = self.master_pseudoclock_name.decode('utf8') else: self.master_pseudoclock_name = str(self.master_pseudoclock_name) # get stop time self.stop_time = file['devices'][self.master_pseudoclock_name].attrs['stop_time'] self.device_names = list(file['devices'].keys()) # Get Shutter Calibrations if 'calibrations' in file and 'Shutter' in file['calibrations']: for name, open_delay, close_delay in numpy.array(file['calibrations']['Shutter']): name = name.decode('utf8') if isinstance(name, bytes) else str(name) self._shutter_calibrations[name] = [open_delay, close_delay] def _load(self): if self._channels is None: self._channels = {} if self._traces is None: self._traces = {} if self._markers is None: self._markers = {} if self._shutter_times is None: self._shutter_times = {} self._load_markers() # Let's walk the connection table, starting with the master pseudoclock master_pseudoclock_device = self.connection_table.find_by_name(self.master_pseudoclock_name) self._load_device(master_pseudoclock_device) def _load_markers(self): with h5py.File(self.path, 'r') as file: if "time_markers" in file: for row in file["time_markers"]: self._markers[row['time']] = {'color': row['color'].tolist()[0], 'label': row['label']} elif "runviewer" in file: for time, val in file["runviewer"]["markers"].attrs.items(): props = val.strip('{}}').rsplit(",", 1) color = list(map(int, props[0].split(":")[1].strip(" ()").split(","))) label = props[1].split(":")[1] self._markers[float(time)] = {'color': color, 'label': label} if 0 not in self._markers: self._markers[0] = {'color': [0,0,0], 'label': 'Start'} if self.stop_time not in self._markers: self._markers[self.stop_time] = {'color': [0,0,0], 'label' : 'End'} def add_trace(self, name, trace, parent_device_name, connection): name = str(name) self._channels[name] = {'device_name': parent_device_name, 'port': connection} self._traces[name] = trace # add shutter times con = self.connection_table.find_by_name(name) if con.device_class == "Shutter" and 'open_state' in con.properties: self.add_shutter_times([(name, con.properties['open_state'])]) # Temporary solution to physical shutter times def add_shutter_times(self, shutters): for name, open_state in shutters: x_values, y_values = self._traces[name] if len(x_values) > 0: change_indices = numpy.where(y_values[:-1] != y_values[1:])[0] change_indices += 1 # use the index of the value that is changed to change_values = list(zip(x_values[change_indices], y_values[change_indices])) change_values.insert(0, (x_values[0], y_values[0])) # insert first value self._shutter_times[name] = {x_value + (self._shutter_calibrations[name][0] if y_value == open_state else self._shutter_calibrations[name][1]): 1 if y_value == open_state else 0 for x_value, y_value in change_values} def _load_device(self, device, clock=None): try: module = device.device_class device_class = device_registry.get_runviewer_parser(module) device_instance = device_class(self.path, device) clocklines_and_triggers = device_instance.get_traces(self.add_trace, clock) for name, trace in clocklines_and_triggers.items(): child_device = self.connection_table.find_by_name(name) for grandchild_device_name, grandchild_device in child_device.child_list.items(): self._load_device(grandchild_device, trace) except Exception: pass def resample(self, data_x, data_y, xmin, xmax, stop_time, num_pixels): """This is a function for downsampling the data before plotting it. Unlike using nearest neighbour interpolation, this method preserves the features of the plot. It chooses what value to use based on what values within a region are most different from the values it's already chosen. This way, spikes of a short duration won't just be skipped over as they would with any sort of interpolation.""" # TODO: Only finely sample the currently visible region. Coarsely sample the rest # x_out = numpy.float32(numpy.linspace(data_x[0], data_x[-1], 4000*(data_x[-1]-data_x[0])/(xmax-xmin))) x_out = numpy.float64(numpy.linspace(xmin, xmax, 3 * 2000 + 2)) y_out = numpy.empty(len(x_out) - 1, dtype=numpy.float64) data_x = numpy.float64(data_x) data_y = numpy.float64(data_y) # TODO: investigate only resampling when necessary. # Currently pyqtgraph sometimes has trouble rendering things # if you don't resample. If a point is far off the graph, # and this point is the first that should be drawn for stepMode, # because there is a long gap before the next point (which is # visible) then there is a problem. # Also need to explicitly handle cases where none of the data # is visible (which resampling does by setting NaNs) # # x_data_slice = data_x[(data_x>=xmin)&(data_x<=xmax)] # print len(data_x) # if len(x_data_slice) < 3*2000+2: # x_out = x_data_slice # y_out = data_y[(data_x>=xmin)&(data_x<=xmax)][:-1] # logger.info('skipping resampling') # else: resampling = True if resampling: _resample(data_x, data_y, x_out, y_out, numpy.float64(stop_time)) # self.__resample4(data_x, data_y, x_out, y_out, numpy.float32(stop_time)) else: x_out, y_out = data_x, data_y return x_out, y_out @property def channels(self): if self._channels is None: self._load() return self._channels.keys() @property def markers(self): if self._markers is None: self._load() return self._markers @property def traces(self): if self._traces is None: self._load() return self._traces @property def shutter_times(self): if self._shutter_times is None: self._load() return self._shutter_times if __name__ == "__main__": filepath = 'C:/Users/Karthik/Desktop/PreTalk/Plot Sequence/2023-01-25_0069_ODT_Imaging_9.h5' shotObj = Shot(filepath) shotObj._load() Channels = list(shotObj.channels) Traces = shotObj.traces """ 'prawn_clock_line_0', 'prawn_clock_line_1', 'Dummy_1', 'Imaging_RF_Switch', 'Imaging_Shutter', 'MOT_2D_Shutter', 'MOT_3D_RF_Switch', 'MOT_3D_Shutter', 'Push_Beam_Blue_Shutter', 'Push_Beam_Blue_Switch', 'Push_Beam_Red_Shutter', 'Push_Beam_Red_Switch', 'CDT1_Switch', 'CDT2_Switch', 'MOT_3D_Camera_Trigger', 'MOT_3D_Camera_trigger', 'MOT_CompX_Coil_Switch', 'MOT_CompY_Coil_Switch', 'MOT_CompZ_Coil_Switch', 'MOT_Grad_Coil_Switch', 'ODT_Axis_Camera_Trigger', 'ODT_Axis_Camera_trigger', 'AO_Blue_Push_amp', 'AO_Blue_Push_freq', 'AO_Imaging_amp', 'AO_Imaging_freq', 'AO_MOT_3D_amp', 'AO_MOT_3D_freq', 'AO_MOT_CompX_Coil_current', 'AO_MOT_CompX_Coil_voltage', 'AO_MOT_CompY_Coil_current', 'AO_MOT_CompY_Coil_voltage', 'AO_MOT_CompZ_Coil_current', 'AO_MOT_CompZ_Coil_voltage', 'AO_MOT_Grad_Coil_current', 'AO_MOT_Grad_Coil_voltage', 'AO_Red_Push_amp', 'AO_Red_Push_freq', 'AO_Dummy', 'AO_ODT1_Mod', 'AO_ODT1_Pow', 'AO_ODT2_Pow' """ """ Without Imaging """ Channels = ['MOT_2D_Shutter', 'AO_Red_Push_amp', 'AO_MOT_3D_amp', 'AO_MOT_3D_freq', 'AO_MOT_Grad_Coil_current', 'AO_MOT_CompZ_Coil_current', 'AO_ODT1_Pow'] Switches = ['MOT_2D_Shutter', 'Push_Beam_Red_Switch', 'MOT_3D_RF_Switch', 'MOT_3D_RF_Switch', 'MOT_Grad_Coil_Switch', 'MOT_CompZ_Coil_Switch', 'CDT1_Switch'] """ With Imaging """ # Channels = ['MOT_2D_Shutter', 'AO_Red_Push_amp', 'AO_MOT_3D_amp', 'AO_MOT_3D_freq', 'AO_MOT_Grad_Coil_current', 'AO_MOT_CompZ_Coil_current', 'AO_ODT1_Pow', 'Imaging_RF_Switch', 'MOT_3D_Camera_Trigger'] # Switches = ['MOT_2D_Shutter', 'Push_Beam_Red_Switch', 'MOT_3D_RF_Switch', 'MOT_3D_RF_Switch', 'MOT_Grad_Coil_Switch', 'MOT_CompZ_Coil_Switch', 'CDT1_Switch', 'Imaging_RF_Switch', 'MOT_3D_Camera_Trigger'] plotSequence(Channels, Switches) # animateSequence(Channels, Switches)