334 lines
17 KiB
Python
334 lines
17 KiB
Python
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.gridspec as gridspec
|
|
from scipy.signal import find_peaks, resample, detrend
|
|
import csv
|
|
|
|
"""
|
|
NOTES
|
|
|
|
When you compute the Fourier transform of a signal, you obtain a set of complex-valued coefficients corresponding to different frequency components present in the signal. These coefficients represent the amplitude and phase of sinusoidal components at specific frequencies.
|
|
|
|
The frequency range covered by the Fourier transform output is divided into discrete frequency bins, each representing a specific frequency component. The width of these bins depends on the sampling rate and the length of the input signal. In a typical implementation, the frequency bins are evenly spaced.
|
|
|
|
In practical terms, the power spectrum bins correspond to the frequency components at which the power spectral density (PSD) or magnitude squared of the Fourier coefficients are evaluated. These bins are used to represent the distribution of signal power across different frequency components, providing insights into the frequency content of the signal.
|
|
|
|
The bin width, also called the resolution bandwidth, is simply sampling rate / Total number of samples = (1/dt)/N
|
|
|
|
N can be re-written in terms of t, dt. Putting this in to the expression for Sxx = 2 * |fft|^2/(Resolution Bandwidth * Noise Power Bandwidth) We get the expression used here. Note that the fft computed above is scaled by N which results eventually in the factor dt^2/T.
|
|
|
|
The noise power bandwidth is typically 1 Hz if no windowing/tapering function is used.
|
|
|
|
|
|
Compute the broadband noise level in Vrms2/Hz by summing all the power spectrum bins, excluding any peaks and the DC component, and dividing the sum by the equivalent noise bandwidth of the window
|
|
|
|
The equivalent noise bandwidth (ENBW) of a window in the context of a power spectrum refers to a measure of the effective bandwidth of the window function applied to the signal before taking the Fourier transform.
|
|
|
|
When you compute the power spectrum of a signal using a windowing function (e.g., Hamming window, Hann window, etc.), the window modifies the original signal by tapering its edges. This tapering reduces the spectral leakage and improves frequency resolution but also introduces a smoothing effect, which can affect the estimation of the signal's power at different frequencies.
|
|
|
|
The equivalent noise bandwidth provides a way to quantify the effective bandwidth of the window function in terms of its impact on noise power. It represents the width of a rectangular filter that would have the same noise power as the windowed signal.
|
|
|
|
In practical terms, when calculating the power spectrum of a signal using a window, the ENBW is used to adjust the power spectrum to account for the smoothing effect of the window. Dividing the sum of the power spectrum bins by the ENBW yields an estimate of the noise power per unit frequency bandwidth.
|
|
|
|
ENBW is often used in the context of noise measurements or signal processing applications where accurate estimation of noise power is important. It helps ensure that the power spectrum accurately reflects the true power distribution of the signal, accounting for the effects of windowing.
|
|
|
|
|
|
The noise floor refers to the minimum level of signal that can be reliably distinguished from the background noise. It represents the lowest amplitude of a signal that can be detected or measured with reasonable accuracy.
|
|
|
|
The noise floor is often defined as the RMS (Root Mean Square) value of the background noise in a given frequency band.
|
|
|
|
To compute the noise floor value from the power spectral density (PSD) values, you typically need to analyze the portion of the PSD that corresponds to the background noise.
|
|
|
|
"""
|
|
|
|
def compute_autocorrelation(data):
|
|
|
|
print("Calculating autocorrelation...")
|
|
yunbiased = data-np.mean(data)
|
|
ynorm = np.sum(yunbiased**2)
|
|
acor = np.correlate(yunbiased, yunbiased, "same")/ynorm
|
|
return acor
|
|
|
|
def pre_process_data(data, new_sampling_rate):
|
|
# Resample the time series
|
|
# Define the new sampling rate and calculate the new time values
|
|
n_points = int((len(data[:, 0]) * (data[:, 0][1] - data[:, 0][0])) * new_sampling_rate)
|
|
t = np.linspace(data[:, 0][0], data[:, 0][-1], n_points)
|
|
x_array = resample(data[:, 1], n_points)
|
|
x = detrend(x_array - x_array.mean())
|
|
|
|
return np.column_stack((t, x))
|
|
|
|
def smoothen_data(data, window_size):
|
|
# Smooth data by doing a moving average
|
|
return np.convolve(data, np.ones(window_size, dtype=int)/window_size, mode='valid')
|
|
|
|
def compute_psd(data, new_sampling_rate, window_size = 21):
|
|
"""
|
|
A power spectral density (PSD) takes the amplitude of the FFT, multiplies it by its complex conjugate and normalizes it to the frequency bin width.
|
|
|
|
"""
|
|
|
|
processed_data = pre_process_data(data, new_sampling_rate)
|
|
t, x = processed_data[:, 0], processed_data[:, 1]
|
|
|
|
dt = t[1] - t[0] # Define the sampling interval
|
|
N = x.shape[0] # Define the total number of data points
|
|
T = N * dt # Define the total duration of the data
|
|
|
|
# Calculate fft
|
|
print("Calculating power spectrum...")
|
|
fft_ts = np.fft.fft(x) # Compute Fourier transform of x
|
|
Sxx = 2 * (dt ** 2 / T) * (fft_ts * fft_ts.conj()) # Compute spectrum
|
|
Sxx = Sxx[:int(len(x) / 2)] # Ignore negative frequencies, we have accounted for this by the scaling factor of 2 in the previous step
|
|
|
|
return processed_data, smoothen_data(Sxx.real, window_size)
|
|
|
|
def compute_RIN(time, voltages, Sxx_smooth):
|
|
|
|
dt = time[1] - time[0] # Define the sampling interval
|
|
N = voltages.shape[0] # Define the total number of data points
|
|
T = N * dt # Define the total duration of the data
|
|
df = 1 / T.max()
|
|
|
|
# Compute the average power
|
|
average_P = np.mean(np.squared(voltages))
|
|
|
|
# Calculate the RIN
|
|
RIN_Sxx_smooth = 10 * np.log10(Sxx_smooth / (average_P * df))
|
|
|
|
return RIN_Sxx_smooth
|
|
|
|
def find_noise_peaks(psd, faxis, freq_range, threshold):
|
|
"""
|
|
Compute the peak power in the specified frequency range.
|
|
|
|
Parameters:
|
|
psd_values: array-like
|
|
Power spectral density values.
|
|
faxis: array-like
|
|
Frequencies corresponding to the PSD values.
|
|
freq_range: tuple
|
|
Tuple containing the start and end frequencies of the range of interest.
|
|
threshold: scalar
|
|
Threshold for peak heights
|
|
|
|
Returns:
|
|
float: Peak power in the specified frequency range.
|
|
"""
|
|
start_freq, end_freq = freq_range
|
|
idx_start = np.argmax(faxis >= start_freq)
|
|
idx_end = np.argmax(faxis >= end_freq)
|
|
sliced_psd = psd[idx_start:idx_end]
|
|
sliced_faxis = faxis[idx_start:idx_end]
|
|
|
|
peak_indices, _ = find_peaks(sliced_psd, height=threshold)
|
|
peak_powers = 10 * np.log10(sliced_psd[peak_indices])
|
|
peak_frequencies = np.around(sliced_faxis[peak_indices], 2)
|
|
|
|
return peak_powers, peak_frequencies
|
|
|
|
def compute_noise_level(psd, resolution_bandwidth, exclude_peaks=False, faxis=None, freq_range=None, threshold=None):
|
|
"""
|
|
Compute the noise level from a power spectral density (PSD).
|
|
|
|
Parameters:
|
|
psd: array-like
|
|
One-sided power spectral density.
|
|
resolution_bandwidth: float
|
|
Bin width
|
|
Returns:
|
|
float: Noise level (Vrms^2).
|
|
"""
|
|
|
|
noise_level = None
|
|
# Exclude peaks from the sum
|
|
if exclude_peaks and threshold is not None:
|
|
|
|
threshold = 10**(threshold/10)
|
|
|
|
if freq_range is None:
|
|
peak_indices, _ = find_peaks(psd, height=threshold)
|
|
noise_level = resolution_bandwidth * np.sum([psd[i] for i in range(len(psd)) if i not in peak_indices])
|
|
else:
|
|
start_freq, end_freq = freq_range
|
|
idx_start = np.argmax(faxis >= start_freq)
|
|
idx_end = np.argmax(faxis >= end_freq)
|
|
sliced_psd = psd[idx_start:idx_end]
|
|
peak_indices, _ = find_peaks(sliced_psd, height=threshold)
|
|
noise_level = resolution_bandwidth * np.sum([sliced_psd[i] for i in range(len(sliced_psd)) if i not in peak_indices])
|
|
else:
|
|
|
|
if freq_range is None:
|
|
noise_level = resolution_bandwidth * np.sum([psd[i] for i in range(len(psd))])
|
|
else:
|
|
start_freq, end_freq = freq_range
|
|
idx_start = np.argmax(faxis >= start_freq)
|
|
idx_end = np.argmax(faxis >= end_freq)
|
|
sliced_psd = psd[idx_start:idx_end]
|
|
noise_level = resolution_bandwidth * np.sum([sliced_psd[i] for i in range(len(sliced_psd))])
|
|
|
|
return noise_level
|
|
|
|
def extract_data(filepath):
|
|
|
|
# Open the CSV file
|
|
with open(filepath, newline='') as csvfile:
|
|
# Skip the first line (header)
|
|
next(csvfile)
|
|
|
|
# Read the CSV file using csv.reader
|
|
reader = csv.reader(csvfile)
|
|
|
|
# Read the headers from the second line
|
|
next_header = next(reader)
|
|
string_number = next_header[-1]
|
|
|
|
try:
|
|
time_step = int(string_number)
|
|
except ValueError:
|
|
try:
|
|
time_step = float(string_number)
|
|
except ValueError:
|
|
print("The string does not represent a valid number.")
|
|
|
|
# Initialize lists to store the first and second values
|
|
first_column = []
|
|
second_column = []
|
|
|
|
# Iterate over each row in the CSV file
|
|
for row in reader:
|
|
# Extract the first and second values from the row and convert to float
|
|
first_value = float(row[0])
|
|
second_value = float(row[1])
|
|
|
|
# Append the values to their respective lists
|
|
first_column.append(first_value)
|
|
second_column.append(second_value)
|
|
|
|
# Convert the lists into numpy arrays
|
|
time_array = np.arange(0, len(first_column)*time_step, time_step)
|
|
voltage_array = np.array(second_column)
|
|
|
|
# Stack the arrays horizontally to form a single 2D array
|
|
data_array = np.column_stack((time_array, voltage_array))
|
|
|
|
return data_array
|
|
|
|
def plot_analysis(data, data_bkg, Sxx, Sxx_bkg, data_str, bkg_str, peak_find_threshold, window_size = 21, plot_only_psd = True):
|
|
|
|
|
|
time, voltages = data[:, 0], data[:, 1]
|
|
time_bkg, voltages_bkg = data_bkg[:, 0], data_bkg[:, 1]
|
|
|
|
dt = time[1] - time[0] # Define the sampling interval
|
|
N = voltages.shape[0] # Define the total number of data points
|
|
T = N * dt # Define the total duration of the data
|
|
df = 1 / T.max()
|
|
|
|
fNQ = 1 / dt / 2 # Determine Nyquist frequency
|
|
faxis = smoothen_data(np.linspace(0,fNQ,N//2), window_size) # Construct frequency axis
|
|
|
|
""" Noise levels in units of Vrms^2/Hz"""
|
|
# resolution_bandwidth = (1/dt)/N
|
|
# broadband_noise_level = compute_noise_level(Sxx, resolution_bandwidth) # Integrates across PSD from DC to Nyquist frequency, gives result in in units of Vrms^2/Hz
|
|
# noise_floor = np.mean(Sxx_bkg)
|
|
|
|
freq_range = (50, max(faxis))
|
|
threshold = 10**(peak_find_threshold/10)
|
|
peak_powers, peak_frequencies = find_noise_peaks(Sxx, faxis, freq_range, threshold)
|
|
|
|
if plot_only_psd:
|
|
|
|
plt.figure(figsize=(12, 8))
|
|
|
|
# Plot Power Spectrum in dB
|
|
plt.semilogx(faxis, 10 * np.log10(Sxx_bkg), color='orange', linewidth=0.5, label = bkg_str)
|
|
plt.semilogx(faxis, 10 * np.log10(Sxx), color='green', linewidth=2, label = data_str)
|
|
|
|
# plt.axhline(y=10 * np.log10(broadband_noise_level), color='red', linewidth=2, linestyle='--', label=f'Broadband cumulative noise level: {10 * np.log10(broadband_noise_level):.1f} dB')
|
|
# plt.axhline(y=10 * np.log10(noise_floor), color='blue', linewidth=2, linestyle='--', label=f'Broadband noise floor: {10 * np.log10(noise_floor):.1f} dB')
|
|
|
|
plt.plot(peak_frequencies, peak_powers, 'o', markerfacecolor='none', markeredgecolor='r', markersize=10) # Plot power against frequency as hollow circles
|
|
for freq, power in zip(peak_frequencies, peak_powers):
|
|
plt.text(freq, power, str(freq)+' Hz', verticalalignment='bottom', horizontalalignment='right') # Add text next to each circle indicating the frequency
|
|
|
|
plt.grid(True, which="both", linestyle='-', linewidth=0.5, color='gray') # Thin lines for non-decade grid
|
|
plt.grid(True, which="both", linestyle=':', linewidth=1, color='gray', axis='x') # Thick lines for decade grid
|
|
|
|
# Calculate the x-axis values for multiples of 10
|
|
x_multiples_of_10 = [10**i for i in range(int(np.log10(min(faxis[faxis > 0]))), int(np.log10(max(faxis[faxis > 0]))) + 1)]
|
|
# Add thick lines for multiples of 10
|
|
for val in x_multiples_of_10:
|
|
plt.axvline(x=val, color='black', linestyle='-', linewidth=2) # Thick lines for multiples of 10
|
|
|
|
f_sig_idx = np.argmax(Sxx)
|
|
# SNR_f = 10 * np.log10(Sxx[f_sig_idx] / np.sum(np.delete(Sxx, f_sig_idx)))
|
|
# SNR_f = 10 * np.log10(Sxx[f_sig_idx] / noise_floor)
|
|
|
|
plt.xlim([min(faxis), max(faxis)])
|
|
# plt.ylim([-100, 10])
|
|
plt.legend(loc = 3, fontsize=12)
|
|
plt.xlabel('Frequency [Hz]', fontsize=14)
|
|
plt.ylabel('Power Spectral Density [dBV/Hz]', fontsize=14)
|
|
# plt.title('SNR= %.2f dB' % (SNR_f), fontsize=14)
|
|
|
|
# Adjust layout
|
|
plt.tight_layout()
|
|
|
|
# Show plot
|
|
plt.show()
|
|
|
|
else:
|
|
# Create subplots
|
|
plt.figure(figsize=(12, 8))
|
|
gs = gridspec.GridSpec(2, 3, width_ratios=[1, 1, 1], height_ratios=[1, 1])
|
|
|
|
# Plot 1: Time vs Voltage
|
|
axs1 = plt.subplot(gs[0, 0:])
|
|
axs1.plot(time_bkg, voltages_bkg, marker='o', color='orange', linewidth=0.5, ms=1, label = bkg_str)
|
|
axs1.plot(time, voltages, marker='o', color='green', linewidth=0.5, ms=1, label = data_str)
|
|
axs1.set_ylim([-0.5, 0.5])
|
|
axs1.set_xlabel('Time (s)', fontsize=14)
|
|
axs1.set_ylabel('Voltage (V)', fontsize=14)
|
|
axs1.legend(loc = 1, fontsize=12)
|
|
axs1.autoscale(tight=True)
|
|
axs1.grid(True)
|
|
|
|
# Plot 2: Power Spectrum in dB
|
|
axs2 = plt.subplot(gs[1, 0:])
|
|
axs2.semilogx(faxis, 10 * np.log10(Sxx_bkg), color='orange', linewidth=0.5, label = bkg_str)
|
|
axs2.semilogx(faxis, 10 * np.log10(Sxx), color='green', linewidth=2, label = data_str)
|
|
|
|
# axs2.axhline(y=10 * np.log10(broadband_noise_level), color='red', linewidth=2, linestyle='--', label=f'Broadband cumulative noise level: {10 * np.log10(broadband_noise_level):.1f} dB')
|
|
# axs2.axhline(y=10 * np.log10(noise_floor), color='blue', linewidth=2, linestyle='--', label=f'Broadband noise floor: {10 * np.log10(noise_floor):.1f} dB')
|
|
|
|
axs2.plot(peak_frequencies, peak_powers, 'o', markerfacecolor='none', markeredgecolor='r', markersize=10) # Plot power against frequency as hollow circles
|
|
for freq, power in zip(peak_frequencies, peak_powers):
|
|
axs2.text(freq, power, str(freq)+' Hz', verticalalignment='bottom', horizontalalignment='right') # Add text next to each circle indicating the frequency
|
|
|
|
axs2.grid(True, which="both", linestyle='-', linewidth=0.5, color='gray') # Thin lines for non-decade grid
|
|
axs2.grid(True, which="both", linestyle=':', linewidth=1, color='gray', axis='x') # Thick lines for decade grid
|
|
# Calculate the x-axis values for multiples of 10
|
|
x_multiples_of_10 = [10**i for i in range(int(np.log10(min(faxis[faxis > 0]))), int(np.log10(max(faxis[faxis > 0]))) + 1)]
|
|
# Add thick lines for multiples of 10
|
|
for val in x_multiples_of_10:
|
|
axs2.axvline(x=val, color='black', linestyle='-', linewidth=2) # Thick lines for multiples of 10
|
|
|
|
f_sig_idx = np.argmax(Sxx)
|
|
# SNR_f = 10 * np.log10(Sxx[f_sig_idx] / np.sum(np.delete(Sxx, f_sig_idx)))
|
|
# SNR_f = 10 * np.log10(Sxx[f_sig_idx] / noise_floor)
|
|
|
|
axs2.set_xlim([min(faxis), max(faxis)])
|
|
# axs2.set_ylim([-100, 10])
|
|
axs2.legend(loc = 3, fontsize=12)
|
|
axs2.set_xlabel('Frequency [Hz]', fontsize=14)
|
|
axs2.set_ylabel('Power Spectral Density [dBV/Hz]', fontsize=14)
|
|
# axs2.set_title('SNR= %.2f dB' % (SNR_f), fontsize=14)
|
|
|
|
# Adjust layout
|
|
plt.tight_layout()
|
|
|
|
# Show plot
|
|
plt.show()
|