Calculations/TimeSeriesAnalyzer/TimeSeriesAnalyzer.py

334 lines
17 KiB
Python

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from scipy.signal import find_peaks, resample, detrend
import csv
"""
NOTES
When you compute the Fourier transform of a signal, you obtain a set of complex-valued coefficients corresponding to different frequency components present in the signal. These coefficients represent the amplitude and phase of sinusoidal components at specific frequencies.
The frequency range covered by the Fourier transform output is divided into discrete frequency bins, each representing a specific frequency component. The width of these bins depends on the sampling rate and the length of the input signal. In a typical implementation, the frequency bins are evenly spaced.
In practical terms, the power spectrum bins correspond to the frequency components at which the power spectral density (PSD) or magnitude squared of the Fourier coefficients are evaluated. These bins are used to represent the distribution of signal power across different frequency components, providing insights into the frequency content of the signal.
The bin width, also called the resolution bandwidth, is simply sampling rate / Total number of samples = (1/dt)/N
N can be re-written in terms of t, dt. Putting this in to the expression for Sxx = 2 * |fft|^2/(Resolution Bandwidth * Noise Power Bandwidth) We get the expression used here. Note that the fft computed above is scaled by N which results eventually in the factor dt^2/T.
The noise power bandwidth is typically 1 Hz if no windowing/tapering function is used.
Compute the broadband noise level in Vrms2/Hz by summing all the power spectrum bins, excluding any peaks and the DC component, and dividing the sum by the equivalent noise bandwidth of the window
The equivalent noise bandwidth (ENBW) of a window in the context of a power spectrum refers to a measure of the effective bandwidth of the window function applied to the signal before taking the Fourier transform.
When you compute the power spectrum of a signal using a windowing function (e.g., Hamming window, Hann window, etc.), the window modifies the original signal by tapering its edges. This tapering reduces the spectral leakage and improves frequency resolution but also introduces a smoothing effect, which can affect the estimation of the signal's power at different frequencies.
The equivalent noise bandwidth provides a way to quantify the effective bandwidth of the window function in terms of its impact on noise power. It represents the width of a rectangular filter that would have the same noise power as the windowed signal.
In practical terms, when calculating the power spectrum of a signal using a window, the ENBW is used to adjust the power spectrum to account for the smoothing effect of the window. Dividing the sum of the power spectrum bins by the ENBW yields an estimate of the noise power per unit frequency bandwidth.
ENBW is often used in the context of noise measurements or signal processing applications where accurate estimation of noise power is important. It helps ensure that the power spectrum accurately reflects the true power distribution of the signal, accounting for the effects of windowing.
The noise floor refers to the minimum level of signal that can be reliably distinguished from the background noise. It represents the lowest amplitude of a signal that can be detected or measured with reasonable accuracy.
The noise floor is often defined as the RMS (Root Mean Square) value of the background noise in a given frequency band.
To compute the noise floor value from the power spectral density (PSD) values, you typically need to analyze the portion of the PSD that corresponds to the background noise.
"""
def compute_autocorrelation(data):
print("Calculating autocorrelation...")
yunbiased = data-np.mean(data)
ynorm = np.sum(yunbiased**2)
acor = np.correlate(yunbiased, yunbiased, "same")/ynorm
return acor
def pre_process_data(data, new_sampling_rate):
# Resample the time series
# Define the new sampling rate and calculate the new time values
n_points = int((len(data[:, 0]) * (data[:, 0][1] - data[:, 0][0])) * new_sampling_rate)
t = np.linspace(data[:, 0][0], data[:, 0][-1], n_points)
x_array = resample(data[:, 1], n_points)
x = detrend(x_array - x_array.mean())
return np.column_stack((t, x))
def smoothen_data(data, window_size):
# Smooth data by doing a moving average
return np.convolve(data, np.ones(window_size, dtype=int)/window_size, mode='valid')
def compute_psd(data, new_sampling_rate, window_size = 21):
"""
A power spectral density (PSD) takes the amplitude of the FFT, multiplies it by its complex conjugate and normalizes it to the frequency bin width.
"""
processed_data = pre_process_data(data, new_sampling_rate)
t, x = processed_data[:, 0], processed_data[:, 1]
dt = t[1] - t[0] # Define the sampling interval
N = x.shape[0] # Define the total number of data points
T = N * dt # Define the total duration of the data
# Calculate fft
print("Calculating power spectrum...")
fft_ts = np.fft.fft(x) # Compute Fourier transform of x
Sxx = 2 * (dt ** 2 / T) * (fft_ts * fft_ts.conj()) # Compute spectrum
Sxx = Sxx[:int(len(x) / 2)] # Ignore negative frequencies, we have accounted for this by the scaling factor of 2 in the previous step
return processed_data, smoothen_data(Sxx.real, window_size)
def compute_RIN(time, voltages, Sxx_smooth):
dt = time[1] - time[0] # Define the sampling interval
N = voltages.shape[0] # Define the total number of data points
T = N * dt # Define the total duration of the data
df = 1 / T.max()
# Compute the average power
average_P = np.mean(np.squared(voltages))
# Calculate the RIN
RIN_Sxx_smooth = 10 * np.log10(Sxx_smooth / (average_P * df))
return RIN_Sxx_smooth
def find_noise_peaks(psd, faxis, freq_range, threshold):
"""
Compute the peak power in the specified frequency range.
Parameters:
psd_values: array-like
Power spectral density values.
faxis: array-like
Frequencies corresponding to the PSD values.
freq_range: tuple
Tuple containing the start and end frequencies of the range of interest.
threshold: scalar
Threshold for peak heights
Returns:
float: Peak power in the specified frequency range.
"""
start_freq, end_freq = freq_range
idx_start = np.argmax(faxis >= start_freq)
idx_end = np.argmax(faxis >= end_freq)
sliced_psd = psd[idx_start:idx_end]
sliced_faxis = faxis[idx_start:idx_end]
peak_indices, _ = find_peaks(sliced_psd, height=threshold)
peak_powers = 10 * np.log10(sliced_psd[peak_indices])
peak_frequencies = np.around(sliced_faxis[peak_indices], 2)
return peak_powers, peak_frequencies
def compute_noise_level(psd, resolution_bandwidth, exclude_peaks=False, faxis=None, freq_range=None, threshold=None):
"""
Compute the noise level from a power spectral density (PSD).
Parameters:
psd: array-like
One-sided power spectral density.
resolution_bandwidth: float
Bin width
Returns:
float: Noise level (Vrms^2).
"""
noise_level = None
# Exclude peaks from the sum
if exclude_peaks and threshold is not None:
threshold = 10**(threshold/10)
if freq_range is None:
peak_indices, _ = find_peaks(psd, height=threshold)
noise_level = resolution_bandwidth * np.sum([psd[i] for i in range(len(psd)) if i not in peak_indices])
else:
start_freq, end_freq = freq_range
idx_start = np.argmax(faxis >= start_freq)
idx_end = np.argmax(faxis >= end_freq)
sliced_psd = psd[idx_start:idx_end]
peak_indices, _ = find_peaks(sliced_psd, height=threshold)
noise_level = resolution_bandwidth * np.sum([sliced_psd[i] for i in range(len(sliced_psd)) if i not in peak_indices])
else:
if freq_range is None:
noise_level = resolution_bandwidth * np.sum([psd[i] for i in range(len(psd))])
else:
start_freq, end_freq = freq_range
idx_start = np.argmax(faxis >= start_freq)
idx_end = np.argmax(faxis >= end_freq)
sliced_psd = psd[idx_start:idx_end]
noise_level = resolution_bandwidth * np.sum([sliced_psd[i] for i in range(len(sliced_psd))])
return noise_level
def extract_data(filepath):
# Open the CSV file
with open(filepath, newline='') as csvfile:
# Skip the first line (header)
next(csvfile)
# Read the CSV file using csv.reader
reader = csv.reader(csvfile)
# Read the headers from the second line
next_header = next(reader)
string_number = next_header[-1]
try:
time_step = int(string_number)
except ValueError:
try:
time_step = float(string_number)
except ValueError:
print("The string does not represent a valid number.")
# Initialize lists to store the first and second values
first_column = []
second_column = []
# Iterate over each row in the CSV file
for row in reader:
# Extract the first and second values from the row and convert to float
first_value = float(row[0])
second_value = float(row[1])
# Append the values to their respective lists
first_column.append(first_value)
second_column.append(second_value)
# Convert the lists into numpy arrays
time_array = np.arange(0, len(first_column)*time_step, time_step)
voltage_array = np.array(second_column)
# Stack the arrays horizontally to form a single 2D array
data_array = np.column_stack((time_array, voltage_array))
return data_array
def plot_analysis(data, data_bkg, Sxx, Sxx_bkg, data_str, bkg_str, peak_find_threshold, window_size = 21, plot_only_psd = True):
time, voltages = data[:, 0], data[:, 1]
time_bkg, voltages_bkg = data_bkg[:, 0], data_bkg[:, 1]
dt = time[1] - time[0] # Define the sampling interval
N = voltages.shape[0] # Define the total number of data points
T = N * dt # Define the total duration of the data
df = 1 / T.max()
fNQ = 1 / dt / 2 # Determine Nyquist frequency
faxis = smoothen_data(np.linspace(0,fNQ,N//2), window_size) # Construct frequency axis
""" Noise levels in units of Vrms^2/Hz"""
# resolution_bandwidth = (1/dt)/N
# broadband_noise_level = compute_noise_level(Sxx, resolution_bandwidth) # Integrates across PSD from DC to Nyquist frequency, gives result in in units of Vrms^2/Hz
# noise_floor = np.mean(Sxx_bkg)
freq_range = (50, max(faxis))
threshold = 10**(peak_find_threshold/10)
peak_powers, peak_frequencies = find_noise_peaks(Sxx, faxis, freq_range, threshold)
if plot_only_psd:
plt.figure(figsize=(12, 8))
# Plot Power Spectrum in dB
plt.semilogx(faxis, 10 * np.log10(Sxx_bkg), color='orange', linewidth=0.5, label = bkg_str)
plt.semilogx(faxis, 10 * np.log10(Sxx), color='green', linewidth=2, label = data_str)
# plt.axhline(y=10 * np.log10(broadband_noise_level), color='red', linewidth=2, linestyle='--', label=f'Broadband cumulative noise level: {10 * np.log10(broadband_noise_level):.1f} dB')
# plt.axhline(y=10 * np.log10(noise_floor), color='blue', linewidth=2, linestyle='--', label=f'Broadband noise floor: {10 * np.log10(noise_floor):.1f} dB')
plt.plot(peak_frequencies, peak_powers, 'o', markerfacecolor='none', markeredgecolor='r', markersize=10) # Plot power against frequency as hollow circles
for freq, power in zip(peak_frequencies, peak_powers):
plt.text(freq, power, str(freq)+' Hz', verticalalignment='bottom', horizontalalignment='right') # Add text next to each circle indicating the frequency
plt.grid(True, which="both", linestyle='-', linewidth=0.5, color='gray') # Thin lines for non-decade grid
plt.grid(True, which="both", linestyle=':', linewidth=1, color='gray', axis='x') # Thick lines for decade grid
# Calculate the x-axis values for multiples of 10
x_multiples_of_10 = [10**i for i in range(int(np.log10(min(faxis[faxis > 0]))), int(np.log10(max(faxis[faxis > 0]))) + 1)]
# Add thick lines for multiples of 10
for val in x_multiples_of_10:
plt.axvline(x=val, color='black', linestyle='-', linewidth=2) # Thick lines for multiples of 10
f_sig_idx = np.argmax(Sxx)
# SNR_f = 10 * np.log10(Sxx[f_sig_idx] / np.sum(np.delete(Sxx, f_sig_idx)))
# SNR_f = 10 * np.log10(Sxx[f_sig_idx] / noise_floor)
plt.xlim([min(faxis), max(faxis)])
# plt.ylim([-100, 10])
plt.legend(loc = 3, fontsize=12)
plt.xlabel('Frequency [Hz]', fontsize=14)
plt.ylabel('Power Spectral Density [dB/Hz]', fontsize=14)
# plt.title('SNR= %.2f dB' % (SNR_f), fontsize=14)
# Adjust layout
plt.tight_layout()
# Show plot
plt.show()
else:
# Create subplots
plt.figure(figsize=(12, 8))
gs = gridspec.GridSpec(2, 3, width_ratios=[1, 1, 1], height_ratios=[1, 1])
# Plot 1: Time vs Voltage
axs1 = plt.subplot(gs[0, 0:])
axs1.plot(time_bkg, voltages_bkg, marker='o', color='orange', linewidth=0.5, ms=1, label = bkg_str)
axs1.plot(time, voltages, marker='o', color='green', linewidth=0.5, ms=1, label = data_str)
axs1.set_ylim([-0.5, 0.5])
axs1.set_xlabel('Time (s)', fontsize=14)
axs1.set_ylabel('Voltage (V)', fontsize=14)
axs1.legend(loc = 1, fontsize=12)
axs1.autoscale(tight=True)
axs1.grid(True)
# Plot 2: Power Spectrum in dB
axs2 = plt.subplot(gs[1, 0:])
axs2.semilogx(faxis, 10 * np.log10(Sxx_bkg), color='orange', linewidth=0.5, label = bkg_str)
axs2.semilogx(faxis, 10 * np.log10(Sxx), color='green', linewidth=2, label = data_str)
# axs2.axhline(y=10 * np.log10(broadband_noise_level), color='red', linewidth=2, linestyle='--', label=f'Broadband cumulative noise level: {10 * np.log10(broadband_noise_level):.1f} dB')
# axs2.axhline(y=10 * np.log10(noise_floor), color='blue', linewidth=2, linestyle='--', label=f'Broadband noise floor: {10 * np.log10(noise_floor):.1f} dB')
axs2.plot(peak_frequencies, peak_powers, 'o', markerfacecolor='none', markeredgecolor='r', markersize=10) # Plot power against frequency as hollow circles
for freq, power in zip(peak_frequencies, peak_powers):
axs2.text(freq, power, str(freq)+' Hz', verticalalignment='bottom', horizontalalignment='right') # Add text next to each circle indicating the frequency
axs2.grid(True, which="both", linestyle='-', linewidth=0.5, color='gray') # Thin lines for non-decade grid
axs2.grid(True, which="both", linestyle=':', linewidth=1, color='gray', axis='x') # Thick lines for decade grid
# Calculate the x-axis values for multiples of 10
x_multiples_of_10 = [10**i for i in range(int(np.log10(min(faxis[faxis > 0]))), int(np.log10(max(faxis[faxis > 0]))) + 1)]
# Add thick lines for multiples of 10
for val in x_multiples_of_10:
axs2.axvline(x=val, color='black', linestyle='-', linewidth=2) # Thick lines for multiples of 10
f_sig_idx = np.argmax(Sxx)
# SNR_f = 10 * np.log10(Sxx[f_sig_idx] / np.sum(np.delete(Sxx, f_sig_idx)))
# SNR_f = 10 * np.log10(Sxx[f_sig_idx] / noise_floor)
axs2.set_xlim([min(faxis), max(faxis)])
# axs2.set_ylim([-100, 10])
axs2.legend(loc = 3, fontsize=12)
axs2.set_xlabel('Frequency [Hz]', fontsize=14)
axs2.set_ylabel('Power Spectral Density [dB/Hz]', fontsize=14)
# axs2.set_title('SNR= %.2f dB' % (SNR_f), fontsize=14)
# Adjust layout
plt.tight_layout()
# Show plot
plt.show()