mirror of
https://github.com/jopohl/urh.git
synced 2026-03-03 06:54:00 +01:00
257 lines
8.1 KiB
Python
257 lines
8.1 KiB
Python
import unittest
|
|
import numpy as np
|
|
from tests.utils_testing import get_path_for_data_file
|
|
from urh.signalprocessing.Filter import Filter
|
|
from urh.signalprocessing.Modulator import Modulator
|
|
from urh.signalprocessing.Signal import Signal
|
|
import array
|
|
|
|
from matplotlib import pyplot as plt
|
|
from urh.cythonext import signal_functions
|
|
from urh.signalprocessing.Spectrogram import Spectrogram
|
|
|
|
|
|
class SpectrogramTest(unittest.TestCase):
|
|
"""short time fourier transform of audio signal"""
|
|
|
|
def stft(
|
|
self, samples, window_size, overlap_factor=0.5, window_function=np.hanning
|
|
):
|
|
"""
|
|
Perform Short-time Fourier transform to get the spectrogram for the given samples
|
|
|
|
:param samples: Complex samples
|
|
:param window_size: Size of DFT window
|
|
:param overlap_factor: Value between 0 (= No Overlapping) and 1 (= Full overlapping) of windows
|
|
:param window_function: Function for DFT window
|
|
:return: short-time Fourier transform of the given signal
|
|
"""
|
|
window = window_function(window_size)
|
|
|
|
# hop size determines by how many samples the window is advanced
|
|
hop_size = window_size - int(overlap_factor * window_size)
|
|
|
|
# pad with zeros to ensure last window fits signal
|
|
padded_samples = np.append(
|
|
samples, np.zeros((len(samples) - window_size) % hop_size)
|
|
)
|
|
num_frames = ((len(padded_samples) - window_size) // hop_size) + 1
|
|
frames = [
|
|
padded_samples[i * hop_size : i * hop_size + window_size] * window
|
|
for i in range(num_frames)
|
|
]
|
|
return np.fft.fft(frames)
|
|
|
|
def setUp(self):
|
|
self.signal = Signal(
|
|
get_path_for_data_file("two_participants.complex16s"), "test"
|
|
)
|
|
|
|
def test_numpy_impl(self):
|
|
sample_rate = 1e6
|
|
spectrogram = (
|
|
np.fft.fftshift(
|
|
self.stft(self.signal.iq_array.data, 2**10, overlap_factor=0.5)
|
|
)
|
|
/ 1024
|
|
)
|
|
|
|
ims = 10 * np.log10(
|
|
spectrogram.real**2 + spectrogram.imag**2
|
|
) # convert amplitudes to decibel
|
|
num_time_bins, num_freq_bins = np.shape(ims)
|
|
|
|
plt.imshow(np.transpose(ims), aspect="auto", cmap="magma")
|
|
plt.colorbar()
|
|
|
|
plt.xlabel("time in seconds")
|
|
plt.ylabel("frequency in Hz")
|
|
plt.ylim(ymin=0, ymax=num_freq_bins)
|
|
|
|
x_tick_pos = np.linspace(0, num_time_bins - 1, 5, dtype=np.float32)
|
|
plt.xticks(
|
|
x_tick_pos,
|
|
[
|
|
"%.02f" % l
|
|
for l in (x_tick_pos * len(self.signal.iq_array.data) / num_time_bins)
|
|
/ sample_rate
|
|
],
|
|
)
|
|
y_tick_pos = np.linspace(0, num_freq_bins - 1, 10, dtype=np.int16)
|
|
frequencies = np.fft.fftshift(np.fft.fftfreq(num_freq_bins, 1 / sample_rate))
|
|
plt.yticks(y_tick_pos, ["%.02f" % frequencies[i] for i in y_tick_pos])
|
|
|
|
plt.show()
|
|
|
|
def narrowband_iir(self, fc, bw, fs):
|
|
fc /= fs
|
|
bw /= fs
|
|
|
|
R = 1 - 3 * bw
|
|
K = (1 - 2 * R * np.cos(2 * np.pi * fc) + R**2) / (
|
|
2 - 2 * np.cos(2 * np.pi * fc)
|
|
)
|
|
|
|
a = np.array([K, -2 * K * np.cos(2 * np.pi * fc), K], dtype=np.float64)
|
|
b = np.array([2 * R * np.cos(2 * np.pi * fc), -(R**2)], dtype=np.float64)
|
|
|
|
return a, b
|
|
|
|
def test_bandpass(self):
|
|
# Generate a noisy signal
|
|
fs = 2000
|
|
T = 0.1
|
|
nsamples = T * fs
|
|
t = np.linspace(0, T, nsamples, endpoint=False)
|
|
a = 0.02
|
|
f0 = 600
|
|
x = 0.25 * np.sin(2 * np.pi * 0.25 * f0 * t)
|
|
x += 0.25 * np.sin(2 * np.pi * f0 * t)
|
|
x += 0.25 * np.sin(2 * np.pi * 2 * f0 * t)
|
|
x += 0.25 * np.sin(2 * np.pi * 3 * f0 * t)
|
|
|
|
import time
|
|
|
|
lowcut = f0 - 200
|
|
highcut = f0 + 200
|
|
|
|
# Define the parameters
|
|
fc = f0 / fs
|
|
b = 0.05
|
|
data = x
|
|
|
|
y = Filter.apply_bandpass_filter(data, lowcut / fs, highcut / fs, filter_bw=b)
|
|
|
|
plt.plot(y, label="Filtered signal (%g Hz)" % f0)
|
|
plt.plot(data, label="Noisy signal")
|
|
plt.legend(loc="upper left")
|
|
plt.show()
|
|
|
|
def test_iir_bandpass(self):
|
|
# Generate a noisy signal
|
|
fs = 2400
|
|
T = 6
|
|
nsamples = T * fs
|
|
t = np.linspace(0, T, nsamples, endpoint=False)
|
|
a = 0.02
|
|
f0 = 300
|
|
x = 0.5 * np.sin(2 * np.pi * f0 * t)
|
|
x += 0.25 * np.sin(2 * np.pi * 2 * f0 * t)
|
|
x += 0.25 * np.sin(2 * np.pi * 3 * f0 * t)
|
|
|
|
# data = x.astype(np.complex64)
|
|
data = np.sin(2 * np.pi * f0 * t).astype(np.complex64)
|
|
|
|
print("Len data", len(data))
|
|
a, b = self.narrowband_iir(f0, 100, fs)
|
|
s = a.sum() + b.sum()
|
|
# a /= s
|
|
# b /= s
|
|
print(a, b)
|
|
|
|
filtered_data = signal_functions.iir_filter(a, b, data)
|
|
|
|
# plt.plot(data, label='Noisy signal')
|
|
plt.plot(np.fft.fft(filtered_data), label="Filtered signal (%g Hz)" % f0)
|
|
|
|
plt.legend(loc="upper left")
|
|
plt.show()
|
|
|
|
def test_channels(self):
|
|
sample_rate = 10**6
|
|
|
|
channel1_freq = 40 * 10**3
|
|
channel2_freq = 240 * 10**3
|
|
|
|
channel1_data = array.array("B", [1, 0, 1, 0, 1, 0, 0, 1])
|
|
channel2_data = array.array("B", [1, 1, 0, 0, 1, 1, 0, 1])
|
|
channel3_data = array.array("B", [1, 0, 0, 1, 0, 1, 1, 1])
|
|
|
|
filter_bw = 0.1
|
|
|
|
filter_freq1_high = 1.5 * channel1_freq
|
|
filter_freq1_low = 0.5 * channel1_freq
|
|
filter_freq2_high = 1.5 * channel2_freq
|
|
filter_freq2_low = 0.5 * channel2_freq
|
|
|
|
modulator1, modulator2, modulator3 = (
|
|
Modulator("test"),
|
|
Modulator("test2"),
|
|
Modulator("test3"),
|
|
)
|
|
modulator1.carrier_freq_hz = channel1_freq
|
|
modulator2.carrier_freq_hz = channel2_freq
|
|
modulator3.carrier_freq_hz = -channel2_freq
|
|
modulator1.sample_rate = (
|
|
modulator2.sample_rate
|
|
) = modulator3.sample_rate = sample_rate
|
|
data1 = modulator1.modulate(channel1_data)
|
|
data2 = modulator2.modulate(channel2_data)
|
|
data3 = modulator3.modulate(channel3_data)
|
|
|
|
mixed_signal = data1 + data2 + data3
|
|
|
|
mixed_signal.tofile("/tmp/three_channels.complex")
|
|
|
|
plt.subplot("221")
|
|
plt.title("Signal")
|
|
plt.plot(mixed_signal)
|
|
|
|
spectrogram = Spectrogram(mixed_signal)
|
|
plt.subplot("222")
|
|
plt.title("Spectrogram")
|
|
plt.imshow(np.transpose(spectrogram.data), aspect="auto", cmap="magma")
|
|
plt.ylim(0, spectrogram.freq_bins)
|
|
|
|
chann1_filtered = Filter.apply_bandpass_filter(
|
|
mixed_signal,
|
|
filter_freq1_low / sample_rate,
|
|
filter_freq1_high / sample_rate,
|
|
filter_bw,
|
|
)
|
|
plt.subplot("223")
|
|
plt.title("Channel 1 Filtered ({})".format("".join(map(str, channel1_data))))
|
|
plt.plot(chann1_filtered)
|
|
|
|
chann2_filtered = Filter.apply_bandpass_filter(
|
|
mixed_signal,
|
|
filter_freq2_low / sample_rate,
|
|
filter_freq2_high / sample_rate,
|
|
filter_bw,
|
|
)
|
|
plt.subplot("224")
|
|
plt.title("Channel 2 Filtered ({})".format("".join(map(str, channel2_data))))
|
|
plt.plot(chann2_filtered)
|
|
|
|
plt.show()
|
|
|
|
def test_bandpass_h(self):
|
|
f_low = -0.4
|
|
f_high = -0.3
|
|
bw = 0.01
|
|
|
|
f_shift = (f_low + f_high) / 2
|
|
f_c = (f_high - f_low) / 2
|
|
|
|
N = Filter.get_filter_length_from_bandwidth(bw)
|
|
|
|
h = Filter.design_windowed_sinc_lpf(f_c, bw=bw) * np.exp(
|
|
np.complex(0, 1) * np.pi * 2 * f_shift * np.arange(0, N, dtype=complex)
|
|
)
|
|
|
|
# h = Filter.design_windowed_sinc_bandpass(f_low=f_low, f_high=f_high, bw=bw)
|
|
# h = Filter.design_windowed_sinc_lpf(0.42, bw=0.08)
|
|
|
|
impulse = np.exp(1j * np.linspace(0, 1, 50))
|
|
|
|
plt.subplot("221")
|
|
plt.title("f_low={} f_high={} bw={}".format(f_low, f_high, bw))
|
|
plt.plot(np.fft.fftfreq(1024), np.fft.fft(h, 1024))
|
|
|
|
plt.subplot("222")
|
|
plt.plot(h)
|
|
|
|
plt.show()
|
|
|
|
# h = cls.design_windowed_sinc_bandpass(f_low, f_high, filter_bw)
|