In [ ]:
%pylab inline
import sk_dsp_comm.sigsys as ss
import sk_dsp_comm.pyaudio_helper as pah
import sk_dsp_comm.fir_design_helper as fir_d
import scipy.signal as signal
import scipy.io as io
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
from IPython.display import Audio, display
from IPython.display import Image, SVG
In [ ]:
pylab.rcParams['savefig.dpi'] = 100 # default 72
%config InlineBackend.figure_formats=['svg'] # SVG inline viewing
In [ ]:
Image("audio_files/pyaudio_dsp_IO.png", width="90%")
If you add or delete devices by plugging or unplugging a USB audio interface, this list becomes invalid. Restart the kernel and run again to get the correct device index list. For two-channel apps both the input and output devices must support two channels. For the Sabrent USB audio device, which has one input and two outputs, Windows for example may improperly list the device as having two inputs.
pah.available_devices()
Index 0 device name = Built-in Microph, inputs = 2, outputs = 0
Index 1 device name = Built-in Output, inputs = 0, outputs = 2
Here we set up a simple callback
function that passes the input samples directly to the output. The module pyaudio_helper
provides a class for managing a pyaudio
stream object, capturing the samples processed by the callback
function, and collecting performance metrics. Once the callback
function is written/declared, a DSP_io_stream
object can be created and the stream(Tsec)
method executed to start the input/output processing, e.g.,
import sk_dsp_comm.pyaudio_helper as pah
DSP_IO = pah.DSP_io_stream(callback,in_idx, out_idx)
DSP_IO.interactive_stream(Tsec = 2, numChan = 1)
where in_idx
is the index of the chosen input device found using available_devices()
and similarly out_idx
is the index of the chosen output device. The callback
function must be written first, as the function name is used by the object to call the callback.
In [ ]:
# define a pass through, y = x, callback
def callback(in_data, frame_count, time_info, status):
    # convert byte data to ndarray
    in_data_nda = np.frombuffer(in_data, dtype=np.int16)
    #***********************************************
    # DSP operations here
    # Here we simply pass the input to the output, i.e.,
    # y[n] = x[n]
    x = in_data_nda.astype(float32)
    y = x
    # Typically more DSP code here
    #
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    # Convert ndarray back to bytes
    return y.tobytes(), pah.pyaudio.paContinue
This callback makes use of the instrumentation capabilities of the DSP_io_stream
and also has a simple lowpass filter waiting in the wings, enabled by commenting out one line of code and uncommenting the following line, e.g.,
#y = x
# Typically more DSP code here
y, zi = signal.lfilter(b,a,x,zi=zi) # for FIR or simple IIR
Notice that globals
are now used for the DSP_IO
object, the filter coefficient arrays a
and b
, and the filter states held in the array zi
. In its present form the filtering is commented out, but it can be uncommented to allow a simple 1st-order IIR lowpass filter to operate on one channel of audio streaming through the system.
In [ ]:
# Add a simple IIR LPF
fs = 48000 # Assumed sampling rate
f3 = 1000 # Hz
a = [1, -exp(-2*pi*f3/fs)]
b = [1 - exp(-2*pi*f3/fs)]
zi = signal.lfiltic(b,a,[0])
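As a quick sanity check on this design (a sketch reusing the b, a, f3, and fs values just defined), the gain at the corner frequency f3 should be close to -3 dB for a 1st-order lowpass of this form:
# Evaluate the frequency response at the design corner f3;
# a 1st-order lowpass of this form is about -3 dB there
w, H_f3 = signal.freqz(b, a, worN=[2*pi*f3/fs])
print('Gain at f3 = %2.2f dB' % (20*log10(abs(H_f3[0]))))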
In [ ]:
# define a pass through, y = x, callback
def callback(in_data, frame_length, time_info, status):
    global DSP_IO, b, a, zi
    DSP_IO.DSP_callback_tic()
    # convert byte data to ndarray
    in_data_nda = np.frombuffer(in_data, dtype=np.int16)
    #***********************************************
    # DSP operations here
    # Here we apply a linear filter to the input
    x = in_data_nda.astype(float32)
    y = x
    # Typically more DSP code here
    #y, zi = signal.lfilter(b,a,x,zi=zi) # for FIR or simple IIR
    #***********************************************
    # Save data for later analysis
    # accumulate a new frame of samples
    DSP_IO.DSP_capture_add_samples(y)
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    DSP_IO.DSP_callback_toc()
    # Convert ndarray back to bytes
    #return (in_data_nda.tobytes(), pyaudio.paContinue)
    return y.tobytes(), pah.pyaudio.paContinue
DSP_IO = pah.DSP_io_stream(callback,in_idx=0,out_idx=1,fs=48000,Tcapture=0)
Index 0 device name = Built-in Microph, inputs = 2, outputs = 0
Index 1 device name = Built-in Output, inputs = 0, outputs = 2
DSP_IO.interactive_stream(Tsec=0,numChan=1)
In [ ]:
Image("audio_files/start_stop_stream.png", width='55%')
With the iMic plugged in, the input/output device indices can be reconfigured to use the iMic index for both the input and output streams. The Analog Discovery (AD2) is then used to drive a white noise test signal into the ADC and capture the output from the DAC. This allows us to measure the ADC-DAC frequency response using the long-term time-average spectral estimation capabilities of the AD2. A second test is to use DSP_IO.DSP_capture_add_samples(y)
to capture the response of the ADC alone, and perform spectral analysis here in the Jupyter notebook. For this capture we set Tcapture=20
s two cells above and Tsec=20
one cell above. A comparison of the ADC-alone and ADC-DAC spectra, normalized to look like the frequency response, is done in the cell below.
In [ ]:
f_AD,Mag_AD = loadtxt('audio_files/Loop_through_noise_SA_iMic.csv',
delimiter=',',skiprows=6,unpack=True)
plot(f_AD,Mag_AD-Mag_AD[100])
ylim([-10,5])
xlim([0,20e3])
ylabel(r'ADC Gain Flatness (dB)')
xlabel(r'Frequency (Hz)')
legend((r'ADC-DAC from AD2 SA dB Avg',))
title(r'Loop Through Gain Flatness using iMic at $f_s = 48$ kHz')
grid();
Below we examine the callback stats when capturing data using DSP_IO.DSP_capture_add_samples(y)
and also plot the time-domain samples.
Nstop = 1000
plot(arange(0,len(DSP_IO.data_capture[:Nstop]))/48000,DSP_IO.data_capture[:Nstop])
DSP_IO.stream_stats()
Note: for the attributes used in the above examples, the frame_length
is always 1024 samples and the sampling rate is $f_s = 48$ ksps. The ideal callback period is thus
$$
T_{cb} = \frac{1024}{48000} = 21.33\ \text{(ms)}
$$
Next consider what the captured tic
and toc
data reveal about the processing. Calling the method cb_active_plot()
produces a plot similar to what an electrical engineer would see when using a logic analyzer to show the time spent in an interrupt service routine of an embedded system. The latency is also evident. We expect to see a minimum latency of two frame lengths (input buffer fill and output buffer fill), i.e., $2 \times 21.33 = 42.7$ ms.
The host processor is multitasking, so the latency can be even greater. A true real-time DSP system would give the signal processing high priority, and hence much lower latency is expected, particularly if the frame_length
can be made small.
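As a quick check of these numbers (a sketch using only the values stated above):
frame_length = 1024
fs = 48000
T_cb = frame_length/fs
print('Ideal callback period = %2.2f ms' % (1000*T_cb))
print('Minimum latency (two frames) = %2.2f ms' % (2000*T_cb))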
Here we set up a callback
function that filters the input samples and then sends them to the output.
import sk_dsp_comm.pyaudio_helper as pah
DSP_IO = pah.DSP_io_stream(callback,in_idx, out_idx)
DSP_IO.interactive_stream(2,1)
where in_idx
is the index of the chosen input device found using available_devices()
and similarly out_idx
is the index of the chosen output device. As before, the callback
function must be written first, as the function name is used by the object to call the callback.
In [ ]:
b = fir_d.fir_remez_bpf(2700,3200,4800,5300,.5,50,48000,18)
a = [1]
fir_d.freqz_resp_list([b],[1],'dB',48000)
ylim([-60,5])
grid();
zi = signal.lfiltic(b,a,[0])
In [ ]:
f_AD,Mag_AD = loadtxt('audio_files/FIR_BPF_2700_3200_4800_5300_p5dB_50dB_48k.csv',
delimiter=',',skiprows=6,unpack=True)
plot(f_AD,Mag_AD-max(Mag_AD)+1)
f = arange(0,20000,10)
w,H_BPF = signal.freqz(b,1,2*pi*f/48000)
plot(f,20*log10(abs(H_BPF)))
ylabel(r'Gain (dB)')
xlabel(r'Frequency (Hz)')
legend((r'AD2 Noise Measured',r'Design Theory'))
title(r'4 kHz 182-Tap FIR Bandpass Design at $f_s = 48$ kHz')
ylim([-60,5])
xlim([2000,8000])
grid();
In [ ]:
# Design an IIR Notch
b, a = ss.fir_iir_notch(2000,48000,r= 0.9)
fir_d.freqz_resp_list([b],[a],'dB',48000,4096)
ylim([-60,5])
grid();
zi = signal.lfiltic(b,a,[0])
Create some global variables for the filter coefficients and the filter state array (recall that a filter has memory).
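To see why the state array matters, here is a minimal sketch (using the notch coefficients b, a designed above and a made-up noise test signal) showing that frame-by-frame filtering with zi carried forward matches filtering the whole signal at once:
# Frame-based filtering with state vs. one-shot filtering
x_test = randn(4*1024)
y_once = signal.lfilter(b,a,x_test)
zi_test = signal.lfiltic(b,a,[0])
y_frames = zeros_like(x_test)
for k in range(4):
    sl = slice(k*1024,(k+1)*1024)
    y_frames[sl], zi_test = signal.lfilter(b,a,x_test[sl],zi=zi_test)
print('Max |difference| = %g' % max(abs(y_once - y_frames)))  # ~1e-16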
In [ ]:
# define callback (#2)
def callback2(in_data, frame_count, time_info, status):
    global DSP_IO, b, a, zi
    DSP_IO.DSP_callback_tic()
    # convert byte data to ndarray
    in_data_nda = np.frombuffer(in_data, dtype=np.int16)
    #***********************************************
    # DSP operations here
    # Here we apply a linear filter to the input
    x = 5*in_data_nda.astype(float32)
    #y = x
    # The filter state/(memory), zi, must be maintained from frame-to-frame
    # for FIR or simple IIR
    y, zi = signal.lfilter(b,a,x,zi=zi)
    # for IIR use second-order sections
    #y, zi = signal.sosfilt(sos,x,zi=zi)
    #***********************************************
    # Save data for later analysis
    # accumulate a new frame of samples
    DSP_IO.DSP_capture_add_samples(y)
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    DSP_IO.DSP_callback_toc()
    return y.tobytes(), pah.pyaudio.paContinue
DSP_IO = pah.DSP_io_stream(callback2,2,2,fs=48000,Tcapture=0)
DSP_IO.interactive_stream(Tsec=0,numChan=1)
In [ ]:
Image("audio_files/start_stop_stream.png", width='55%')
In [ ]:
# define callback (3)
# Here we configure the callback to play back a wav file
def callback3(in_data, frame_count, time_info, status):
    global DSP_IO, x
    DSP_IO.DSP_callback_tic()
    # Ignore in_data when generating output only
    #***********************************************
    # Note wav is scaled to [-1,1] so need to rescale to int16
    y = 32767*x.get_samples(frame_count)
    # Perform real-time DSP here if desired
    #
    #***********************************************
    # Save data for later analysis
    # accumulate a new frame of samples
    DSP_IO.DSP_capture_add_samples(y)
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    DSP_IO.DSP_callback_toc()
    return y.tobytes(), pah.pyaudio.paContinue
fs, x_wav2 = ss.from_wav('Music_Test.wav')
x_wav = (x_wav2[:,0] + x_wav2[:,1])/2
x = pah.loop_audio(x_wav)
DSP_IO = pah.DSP_io_stream(callback3,0,1,fs=44100,Tcapture=2)
DSP_IO.interactive_stream(20) # play for 20s but capture only the last 2s
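Conceptually, pah.loop_audio wraps the wav samples in an object whose get_samples(frame_count) method returns successive frames, wrapping back to the start when the end of the array is reached. A minimal sketch of that behavior (not the library's actual implementation) looks like:
class loop_audio_sketch(object):
    """A hypothetical stand-in for pah.loop_audio."""
    def __init__(self, x):
        self.x = x
        self.idx = 0
    def get_samples(self, frame_count):
        # Return the next frame, wrapping at the end of the array
        out = zeros(frame_count)
        for n in range(frame_count):
            out[n] = self.x[self.idx]
            self.idx = (self.idx + 1) % len(self.x)
        return out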
In [ ]:
Image("audio_files/start_stop_stream.png", width='55%')
Npts = 96000
Nstart = 0
plot(arange(len(DSP_IO.data_capture[Nstart:Nstart+Npts]))*1000/44100,
DSP_IO.data_capture[Nstart:Nstart+Npts]/2**(16-1))
title(r'A Portion of the capture buffer')
ylabel(r'Normalized Amplitude')
xlabel(r'Time (ms)')
grid();
In [ ]:
Image("audio_files/music_buffer_plot.png", width="75%")
Finally, we examine the spectrum of the output signal. To apply custom scaling we use a variation of psd()
found in the sigsys
module. If we are plotting the spectrum of white noise sent through a filter, the output PSD will be of the form $\sigma_w^2|H(e^{j2\pi f/f_s})|^2$, where $\sigma_w^2$ is the variance of the noise driving the filter. You may choose to overlay a plot of $\sigma_w^2|H(e^{j2\pi f/f_s})|^2$ for comparison.
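A sketch of such a spectrum plot, assuming the psd() variant in sigsys is my_psd() and that data_capture holds the 44.1 ksps capture from above:
# Estimate and plot the PSD of the captured output samples
Px, f = ss.my_psd(DSP_IO.data_capture,2**11,44100)
plot(f,10*log10(Px))
ylabel(r'PSD (dB)')
xlabel(r'Frequency (Hz)')
grid();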
In [ ]:
L_gain = widgets.FloatSlider(description = 'L Gain',
                             continuous_update = True,
                             value = 1.0,
                             min = 0.0,
                             max = 2.0,
                             step = 0.01,
                             orientation = 'vertical')
R_gain = widgets.FloatSlider(description = 'R Gain',
                             continuous_update = True,
                             value = 1.0,
                             min = 0.0,
                             max = 2.0,
                             step = 0.01,
                             orientation = 'vertical')
#widgets.HBox([L_gain, R_gain])
In [ ]:
# Left and Right Gain Sliders
def callback(in_data, frame_count, time_info, status):
    global DSP_IO, L_gain, R_gain
    DSP_IO.DSP_callback_tic()
    # convert byte data to ndarray
    in_data_nda = np.frombuffer(in_data, dtype=np.int16)
    # separate left and right data
    x_left,x_right = DSP_IO.get_LR(in_data_nda.astype(float32))
    #***********************************************
    # DSP operations here
    y_left = x_left*L_gain.value
    y_right = x_right*R_gain.value
    #***********************************************
    # Pack left and right data together
    y = DSP_IO.pack_LR(y_left,y_right)
    # Typically more DSP code here
    #***********************************************
    # Save data for later analysis
    # accumulate a new frame of samples
    DSP_IO.DSP_capture_add_samples_stereo(y_left,y_right)
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    DSP_IO.DSP_callback_toc()
    # Convert ndarray back to bytes
    #return (in_data_nda.tobytes(), pyaudio.paContinue)
    return y.tobytes(), pah.pyaudio.paContinue
DSP_IO = pah.DSP_io_stream(callback,0,1,fs=48000,Tcapture=0)
DSP_IO.interactive_stream(0,2)
widgets.HBox([L_gain, R_gain])
In [ ]:
Image("audio_files/left_right_gain.png", width="65%")
In [ ]:
panning = widgets.FloatSlider(description = 'Panning (%)',
                              continuous_update = True, # Continuous updates
                              value = 50.0,
                              min = 0.0,
                              max = 100.0,
                              step = 0.1,
                              orientation = 'horizontal')
#display(panning)
In [ ]:
# Panning
def callback(in_data, frame_count, time_info, status):
    global DSP_IO, panning
    DSP_IO.DSP_callback_tic()
    # convert byte data to ndarray
    in_data_nda = np.frombuffer(in_data, dtype=np.int16)
    # separate left and right data
    x_left,x_right = DSP_IO.get_LR(in_data_nda.astype(float32))
    #***********************************************
    # DSP operations here
    y_left = (100-panning.value)/100*x_left \
             + panning.value/100*x_right
    y_right = panning.value/100*x_left \
              + (100-panning.value)/100*x_right
    #***********************************************
    # Pack left and right data together
    y = DSP_IO.pack_LR(y_left,y_right)
    # Typically more DSP code here
    #***********************************************
    # Save data for later analysis
    # accumulate a new frame of samples
    DSP_IO.DSP_capture_add_samples_stereo(y_left,y_right)
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    DSP_IO.DSP_callback_toc()
    # Convert ndarray back to bytes
    #return (in_data_nda.tobytes(), pyaudio.paContinue)
    return y.tobytes(), pah.pyaudio.paContinue
FRAMES = 512
# Create streaming object
DSP_IO = pah.DSP_io_stream(callback,0,1,
                           fs=48000,
                           frame_length = FRAMES,
                           Tcapture=0)
# interactive_stream runs in a thread
# so widget can be used
DSP_IO.interactive_stream(0,2)
# display panning widget
display(panning)
In [ ]:
Image("audio_files/cross_panning.png", width='55%')
Here we consider a three-band equalizer operating on a music loop. Each peaking filter has system function in the $z$-domain defined by $$ H_{pk}(z) = C_\text{pk}\frac{1 + b_1 z^{-1} + b_2 z^{-2}}{1 + a_1 z^{-1} + a_2 z^{-2}} $$
where the filter coefficients are given by $$\begin{align} C_\text{pk} &= \frac{1+k_q\mu}{1+k_q}\\ k_q &= \frac{4}{1+\mu} \tan\left(\frac{2\pi f_c/f_s}{2Q}\right) \\ b_1 &= \frac{-2\cos(2\pi f_c/f_s)}{1+k_q\mu} \\ b_2 &= \frac{1-k_q\mu}{1+k_q\mu} \\ a_1 &= \frac{-2\cos(2\pi f_c/f_s)}{1+k_q} \\ a_2 &= \frac{1 - k_q}{1+k_q} \end{align}$$
where $$ \mu = 10^{G_\text{dB}/20},\ \ Q \in [2, 10] $$
and $f_c$ is the center frequency in Hz relative to sampling rate $f_s$ in Hz, and $G_\text{dB}$ is the peaking filter gain in dB. Conveniently, the function peaking
is available in the module sk_dsp_comm.sigsys
.
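As a check on the equations above, here is a direct transcription into a hypothetical helper peaking_coeff(); it should produce the same b, a as ss.peaking() for matching arguments:
def peaking_coeff(GdB, fc, Q, fs):
    # Direct implementation of the peaking filter equations above
    mu = 10**(GdB/20.)
    kq = 4/(1 + mu)*tan(2*pi*fc/fs/(2*Q))
    Cpk = (1 + kq*mu)/(1 + kq)
    b1 = -2*cos(2*pi*fc/fs)/(1 + kq*mu)
    b2 = (1 - kq*mu)/(1 + kq*mu)
    a1 = -2*cos(2*pi*fc/fs)/(1 + kq)
    a2 = (1 - kq)/(1 + kq)
    return Cpk*array([1, b1, b2]), array([1, a1, a2])
# Compare with the library design at one setting
print(peaking_coeff(10.,1000,3.5,48000))
print(ss.peaking(10.,1000,Q=3.5,fs=48000))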
In [ ]:
band1 = widgets.FloatSlider(description = '100 Hz',
                            continuous_update = True, # Continuous updates
                            value = 20.0,
                            min = -20.0,
                            max = 20.0,
                            step = 1,
                            orientation = 'vertical')
band2 = widgets.FloatSlider(description = '1000 Hz',
                            continuous_update = True, # Continuous updates
                            value = 10.0,
                            min = -20.0,
                            max = 20.0,
                            step = 1,
                            orientation = 'vertical')
band3 = widgets.FloatSlider(description = '8000 Hz',
                            continuous_update = True, # Continuous updates
                            value = -10.0,
                            min = -20.0,
                            max = 20.0,
                            step = 1,
                            orientation = 'vertical')
Gain = widgets.FloatSlider(description = 'Gain',
                           continuous_update = True,
                           value = 0.2,
                           min = 0.0,
                           max = 2.0,
                           step = 0.01,
                           orientation = 'vertical')
#widgets.HBox([Gain,band1,band2,band3])
In [ ]:
b_b1,a_b1 = ss.peaking(band1.value,100,Q=3.5,fs=48000)
zi_b1 = signal.lfiltic(b_b1,a_b1,[0])
b_b2,a_b2 = ss.peaking(band2.value,1000,Q=3.5,fs=48000)
zi_b2 = signal.lfiltic(b_b2,a_b2,[0])
b_b3,a_b3 = ss.peaking(band3.value,8000,Q=3.5,fs=48000)
zi_b3 = signal.lfiltic(b_b3,a_b3,[0])
b_12,a_12 = ss.cascade_filters(b_b1,a_b1,b_b2,a_b2)
b_123,a_123 = ss.cascade_filters(b_12,a_12,b_b3,a_b3)
f = logspace(log10(50),log10(10000),100)
w,H_123 = signal.freqz(b_123,a_123,2*pi*f/48000)
semilogx(f,20*log10(abs(H_123)))
ylim([-20,20])
ylabel(r'Gain (dB)')
xlabel(r'Frequency (Hz)')
grid();
In [ ]:
# define the three-band equalizer callback
def callback(in_data, frame_count, time_info, status):
    global DSP_IO, zi_b1,zi_b2,zi_b3, x
    global Gain, band1, band2, band3
    DSP_IO.DSP_callback_tic()
    # convert byte data to ndarray
    in_data_nda = np.frombuffer(in_data, dtype=np.int16)
    #***********************************************
    # DSP operations here
    # Here we apply a linear filter to the input
    #x = in_data_nda.astype(float32)
    x = Gain.value*20000*x_loop.get_samples(frame_count)
    # DSP code here
    b_b1,a_b1 = ss.peaking(band1.value,100,Q=3.5,fs=48000)
    z1, zi_b1 = signal.lfilter(b_b1,a_b1,x,zi=zi_b1)
    b_b2,a_b2 = ss.peaking(band2.value,1000,Q=3.5,fs=48000)
    z2, zi_b2 = signal.lfilter(b_b2,a_b2,z1,zi=zi_b2)
    b_b3,a_b3 = ss.peaking(band3.value,8000,Q=3.5,fs=48000)
    y, zi_b3 = signal.lfilter(b_b3,a_b3,z2,zi=zi_b3)
    #***********************************************
    # Save data for later analysis
    # accumulate a new frame of samples
    DSP_IO.DSP_capture_add_samples(y)
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    DSP_IO.DSP_callback_toc()
    # Convert ndarray back to bytes
    #return (in_data_nda.tobytes(), pyaudio.paContinue)
    return y.tobytes(), pah.pyaudio.paContinue
fs, x_wav2 = ss.from_wav('audio_files/Music_Test.wav')
x_wav = (x_wav2[:,0] + x_wav2[:,1])/2
x_loop = pah.loop_audio(x_wav)
DSP_IO = pah.DSP_io_stream(callback,0,1,fs=44100,Tcapture=0)
DSP_IO.interactive_stream(0,1)
widgets.HBox([Gain,band1,band2,band3])
In [ ]:
Image("audio_files/three_band_widgets.png", width="55%")
In [ ]:
f_AD,Mag_AD = loadtxt('audio_files/ThreeBand_Peak_100_p20_1k_p10_8k_m10_fs_48k.csv',
delimiter=',',skiprows=6,unpack=True)
semilogx(f_AD,Mag_AD+55)
semilogx(f,20*log10(abs(H_123)))
ylabel(r'Gain (dB)')
xlabel(r'Frequency (Hz)')
legend((r'AD2 Noise Measured',r'Design Theory'))
title(r'Three Band Equalizer: $f_{center} = [100, 1000, 8000]$, $Q = 3.5$')
ylim([-20,20])
xlim([50,10000])
grid();
In [ ]: