In [ ]:
%pylab inline
import sk_dsp_comm.sigsys as ss
import sk_dsp_comm.pyaudio_helper as pah
import sk_dsp_comm.fir_design_helper as fir_d
import scipy.signal as signal
import scipy.io as io
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
from IPython.display import Audio, display
from IPython.display import Image, SVG

In [ ]:
pylab.rcParams['savefig.dpi'] = 100 # default 72
%config InlineBackend.figure_formats=['svg'] # SVG inline viewing

Introduction

A simplified block diagram of PyAudio streaming-based (nonblocking) signal processing when using pyaudio_helper and ipython widgets.


In [ ]:
Image("audio_files/pyaudio_dsp_IO.png", width="90%")

Available Audio I/O Devices

If you add or delete devices by plugging or unplugging USB audio ibterface, this list becomdes invalid. Restart the kernel and run again to get the correct device index list. For two channel apps both the input and output devices must support two channels. For the Sabrent USB audio devices, which has one input and two outputs, Windows for example may improperly list the devices as having two inputs.

pah.available_devices()

Index 0 device name = Built-in Microph, inputs = 2, outputs = 0

Index 1 device name = Built-in Output, inputs = 0, outputs = 2

Real-Time Loop Through

Here we set up a simple callback function that passes the input samples directly to the output. The module pyaudio_support provides a class for managing a pyaudio stream object, capturing the samples processed by the callback function, and collection of performance metrics. Once the callback function is written/declared a DSP_io_stream object can be created and then the stream(Tsec) method can be executed to start the input/output processing, e.g.,

import pyaudio_helper as pah

DSP_IO = pah.DSP_io_stream(callback,in_idx, out_idx)
DSP_IO.interactive_stream(Tsec = 2, numChan = 1)

where in_idx is the index of the chosen input device found using available_devices() and similarly out_idx is the index of the chosen output device.

  • The callback function must be written first as the function name used by the object to call the callback.

A Minimal Callback

No globals required here as there is no instrumentation configured, externally defined algorithm coefficients, and no widgets being used.


In [ ]:
# define a pass through, y = x, callback
def callback(in_data, frame_count, time_info, status):
    # convert byte data to ndarray
    in_data_nda = np.frombuffer(in_data, dtype=np.int16)
    #***********************************************
    # DSP operations here
    # Here we simply pass the input to the output, i.e.
    # y[n] = x[n]
    x = in_data_nda.astype(float32)
    y = x
    # Typically more DSP code here     
    #
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    # Convert ndarray back to bytes
    return y.tobytes(), pah.pyaudio.paContinue

A Basic Callback

This callback makes use of the instrumentation capabilities of the DSP_io_stream and also has a simple lowpass filter waiting in-the-wings if a line of code in commented out and a following line is uncomments, e.g.,

#y = x
    # Typically more DSP code here
    y, zi = signal.lfilter(b,a,x,zi=zi) # for FIR or simple IIR

Notice that globals are now used for the DSP_IO object, the filter coefficients in arrays, a and b, and also the filter states held in the array zi. In its present form te filtering is commented out, but can be uncommented to allow a simple 1st-order IIR lowpass filter to operate on one channel of audio streaming through the system.


In [ ]:
# Add a simple IIR LPF
fs = 48000 # Assummed sampling rate
f3 = 1000 # Hz
a = [1, -exp(-2*pi*f3/fs)]
b = [1 - exp(-2*pi*f3/fs)]
zi = signal.lfiltic(b,a,[0])

In [ ]:
# define a pass through, y = x, callback
def callback(in_data, frame_length, time_info, status):
    global DSP_IO, b, a, zi
    DSP_IO.DSP_callback_tic()
    # convert byte data to ndarray
    in_data_nda = np.frombuffer(in_data, dtype=np.int16)
    #***********************************************
    # DSP operations here
    # Here we apply a linear filter to the input
    x = in_data_nda.astype(float32)
    y = x
    # Typically more DSP code here
    #y, zi = signal.lfilter(b,a,x,zi=zi) # for FIR or simple IIR
    #***********************************************
    # Save data for later analysis
    # accumulate a new frame of samples
    DSP_IO.DSP_capture_add_samples(y) 
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    DSP_IO.DSP_callback_toc()
    # Convert ndarray back to bytes
    #return (in_data_nda.tobytes(), pyaudio.paContinue)
    return y.tobytes(), pah.pyaudio.paContinue
DSP_IO = pah.DSP_io_stream(callback,in_idx=0,out_idx=1,fs=48000,Tcapture=0)

Index 0 device name = Built-in Microph, inputs = 2, outputs = 0

Index 1 device name = Built-in Output, inputs = 0, outputs = 2

DSP_IO.interactive_stream(Tsec=0,numChan=1)

In [ ]:
Image("audio_files/start_stop_stream.png", width='55%')

iMic Noise Capture Gain Flatness

With the iMic plugged in the input/output device indices can be reconfigured to use the iMic index for both the input output streams. The Analog Discovery (AD2) is then used to drive a white noise test signal into the ADC and capture the output from the DAC. This allows us to measure the ADC-DAC frequency response using a long-term time average spectral estimate capabilities of the AD2. A second test capture is to use DSP_IO.DSP_capture_add_samples(y) to capture the response of the ADC alone, and perform spectral analysis here in the Jupyter notebook. For this capture we set Tcapture=20s two cells above and Tsec=20 one cell above. A comparison of the ADC-alone and ADC-DAC spectrum normalized to look like the frequency response is done in the cell below.


In [ ]:
f_AD,Mag_AD = loadtxt('audio_files/Loop_through_noise_SA_iMic.csv',
                       delimiter=',',skiprows=6,unpack=True)
plot(f_AD,Mag_AD-Mag_AD[100])
ylim([-10,5])
xlim([0,20e3])
ylabel(r'ADC Gain Flatness (dB)')
xlabel(r'Frequency (Hz)')
legend((r'ADC-DAC from AD2 SA dB Avg',))
title(r'Loop Through Gain Flatness using iMic at $f_s = 48$ kHz')
grid();

The callback stats when capturing data using DSP_IO.DSP_capture_add_samples(y) and a plot of the time domain samples.

Nstop = 1000
plot(arange(0,len(DSP_IO.data_capture[:Nstop]))/48000,DSP_IO.data_capture[:Nstop])
DSP_IO.stream_stats()

Note for a attributes used in the above examples the frame_length is always 1024 samples and the sampling rate $f_s = 48$ ksps. The ideal callback period is this $$ T_{cb} = \frac{1024}{480100} = 21.33\ \text{(ms)} $$

Next consider what the captures tic and toc data revels about the processing. Calling the method cb_active_plot() produces a plot similar to what an electrical engineer would see what using a logic analyzer to show the time spent in an interrupt service routine of an embedded system. The latency is also evident. You expect to see a minimum latency of two frame lengths (input buffer fill and output buffer fill),e.g.,

$$ T_\text{latency} >= 2\times \frac{1024}{48000} \times 1000 = 42.6\ \text{(ms)} $$

The host processor is multitasking, so the latency can be even greater. A true real-time DSP system would give the signal processing high priority and hence much lower is expected, particularly if the frame_length can be made small.

Real-Time Filtering

Here we set up a callback function that filters the input samples and then sends them to the output.

import pyaudio_helper as pah

DSP_IO = pah.DSP_io_stream(callback,in_idx, out_idx)
DSP_IO.interactive_stream(2,1)

where in_idx is the index of the chosen input device found using available_devices() and similarly out_idx is the index of the chosen output device.

  • The callback function must be written first as the function name is used by the object to call the callback
  • To demonstrate this we first design some filters that can be used in testing

In [ ]:
b = fir_d.fir_remez_bpf(2700,3200,4800,5300,.5,50,48000,18)
a = [1]
fir_d.freqz_resp_list([b],[1],'dB',48000)
ylim([-60,5])
grid();
zi = signal.lfiltic(b,a,[0])

In [ ]:
f_AD,Mag_AD = loadtxt('audio_files/FIR_BPF_2700_3200_4800_5300_p5dB_50dB_48k.csv',
                       delimiter=',',skiprows=6,unpack=True)
plot(f_AD,Mag_AD-max(Mag_AD)+1)
f = arange(0,20000,10)
w,H_BPF = signal.freqz(b,1,2*pi*f/48000)
plot(f,20*log10(abs(H_BPF)))
ylabel(r'Gain (dB)')
xlabel(r'Frequency (Hz)')
legend((r'AD2 Noise Measured',r'Design Theory'))
title(r'4 kHz 182-Tap FIR Bandpass Design at $f_s = 48$ kHz')
ylim([-60,5])
xlim([2000,8000])
grid();

In [ ]:
# Design an IIR Notch
b, a = ss.fir_iir_notch(2000,48000,r= 0.9)
fir_d.freqz_resp_list([b],[a],'dB',48000,4096)
ylim([-60,5])
grid();
zi = signal.lfiltic(b,a,[0])

Create some global variables for the filter coefficients and the filter state array (recall that a filter has memory).


In [ ]:
# define callback (#2)
def callback2(in_data, frame_count, time_info, status):
    global DSP_IO, b, a, zi
    DSP_IO.DSP_callback_tic()
    # convert byte data to ndarray
    in_data_nda = np.frombuffer(in_data, dtype=np.int16)
    #***********************************************
    # DSP operations here
    # Here we apply a linear filter to the input
    x = 5*in_data_nda.astype(float32)
    #y = x
    # The filter state/(memory), zi, must be maintained from frame-to-frame 
    # for FIR or simple IIR
    y, zi = signal.lfilter(b,a,x,zi=zi) 
    # for IIR use second-order sections
    #y, zi = signal.sosfilt(sos,x,zi=zi)     
    #***********************************************
    # Save data for later analysis
    # accumulate a new frame of samples
    DSP_IO.DSP_capture_add_samples(y) 
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    DSP_IO.DSP_callback_toc()
    return y.tobytes(), pah.pyaudio.paContinue
DSP_IO = pah.DSP_io_stream(callback2,2,2,fs=48000,Tcapture=0)
DSP_IO.interactive_stream(Tsec=0,numChan=1)

In [ ]:
Image("audio_files/start_stop_stream.png", width='55%')

Playback Only Using an Audio Loop

A playback audio loop is created using the pah.loop_audio class filled with samples input from a wav file. In the example below we take a two-channel (stereo) wav file and convert to one channel.


In [ ]:
# define callback (3)
# Here we configure the callback to play back a wav file                      
def callback3(in_data, frame_count, time_info, status):
    global DSP_IO, x
    DSP_IO.DSP_callback_tic()
    
    # Ignore in_data when generating output only
    #***********************************************
    global x
    # Note wav is scaled to [-1,1] so need to rescale to int16
    y = 32767*x.get_samples(frame_count)
    # Perform real-time DSP here if desired
    #
    #***********************************************
    # Save data for later analysis
    # accumulate a new frame of samples
    DSP_IO.DSP_capture_add_samples(y)
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    DSP_IO.DSP_callback_toc()
    return y.tobytes(), pah.pyaudio.paContinue
fs, x_wav2 = ss.from_wav('Music_Test.wav')
x_wav = (x_wav2[:,0] + x_wav2[:,1])/2
x = pah.loop_audio(x_wav)
DSP_IO = pah.DSP_io_stream(callback3,0,1,fs=44100,Tcapture=2)
DSP_IO.interactive_stream(20) # play for 20s but capture only the last 2s

In [ ]:
Image("audio_files/start_stop_stream.png", width='55%')
Npts = 96000
Nstart = 0
plot(arange(len(DSP_IO.data_capture[Nstart:Nstart+Npts]))*1000/44100,
     DSP_IO.data_capture[Nstart:Nstart+Npts]/2**(16-1))
title(r'A Portion of the capture buffer')
ylabel(r'Normalized Amplitude')
xlabel(r'Time (ms)')
grid();

In [ ]:
Image("audio_files/music_buffer_plot.png", width="75%")

Finally, the spectrum of the output signal. To apply custom scaling we use a variation of psd() found in the sigsys module. If we are plotting the spectrum of white noise sent through a filter, the output PSD will be of the form $\sigma_w^2|H(e^{j2\pi f/f_s})|^2$, where $\sigma_w^2$ is the variance of the noise driving the filter. You may choose to overlay a plot of

Widgets Examples

Stereo Gain Sliders


In [ ]:
L_gain = widgets.FloatSlider(description = 'L Gain', 
                continuous_update = True,
                value = 1.0,
                min = 0.0, 
                max = 2.0, 
                step = 0.01, 
                orientation = 'vertical')
R_gain = widgets.FloatSlider(description = 'R Gain', 
                continuous_update = True,
                value = 1.0,
                min = 0.0, 
                max = 2.0, 
                step = 0.01, 
                orientation = 'vertical')

#widgets.HBox([L_gain, R_gain])

In [ ]:
# L and Right Gain Sliders
def callback(in_data, frame_count, time_info, status):  
    global DSP_IO, L_gain, R_gain
    DSP_IO.DSP_callback_tic()
    # convert byte data to ndarray
    in_data_nda = np.frombuffer(in_data, dtype=np.int16)
    # separate left and right data
    x_left,x_right = DSP_IO.get_LR(in_data_nda.astype(float32))
    #***********************************************
    # DSP operations here
    y_left = x_left*L_gain.value
    y_right = x_right*R_gain.value
    
    #***********************************************
    # Pack left and right data together
    y = DSP_IO.pack_LR(y_left,y_right)
    # Typically more DSP code here     
    #***********************************************
    # Save data for later analysis
    # accumulate a new frame of samples
    DSP_IO.DSP_capture_add_samples_stereo(y_left,y_right)
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    DSP_IO.DSP_callback_toc()
    # Convert ndarray back to bytes
    #return (in_data_nda.tobytes(), pyaudio.paContinue)
    return y.tobytes(), pah.pyaudio.paContinue
DSP_IO = pah.DSP_io_stream(callback,0,1,fs=48000,Tcapture=0)
DSP_IO.interactive_stream(0,2)
widgets.HBox([L_gain, R_gain])

In [ ]:
Image("audio_files/left_right_gain.png", width="65%")

Cross Panning


In [ ]:
panning = widgets.FloatSlider(description = 'Panning (%)', 
                            continuous_update = True,       # Continuous updates
                            value = 50.0,
                            min = 0.0, 
                            max = 100.0, 
                            step = 0.1, 
                            orientation = 'horizontal')
#display(panning)

In [ ]:
# Panning
def callback(in_data, frame_count, time_info, status):
    global DSP_IO, panning
    DSP_IO.DSP_callback_tic()
    # convert byte data to ndarray
    in_data_nda = np.frombuffer(in_data, dtype=np.int16)
    # separate left and right data
    x_left,x_right = DSP_IO.get_LR(in_data_nda.astype(float32))
    #***********************************************
    # DSP operations here
    y_left = (100-panning.value)/100*x_left \
              + panning.value/100*x_right
    y_right = panning.value/100*x_left \
              + (100-panning.value)/100*x_right
    
    #***********************************************
    # Pack left and right data together
    y = DSP_IO.pack_LR(y_left,y_right)
    # Typically more DSP code here     
    #***********************************************
    # Save data for later analysis
    # accumulate a new frame of samples
    DSP_IO.DSP_capture_add_samples_stereo(y_left,y_right)
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    DSP_IO.DSP_callback_toc()
    # Convert ndarray back to bytes
    #return (in_data_nda.tobytes(), pyaudio.paContinue)
    return y.tobytes(), pah.pyaudio.paContinue
FRAMES = 512
# Create streaming object
DSP_IO = pah.DSP_io_stream(callback,0,1,
                           fs=48000,
                           frame_length = FRAMES,
                           Tcapture=0) 

# interactive_stream runs in a thread 
#so widget can be used
DSP_IO.interactive_stream(0,2)

# display panning widget
display(panning)

In [ ]:
Image("audio_files/cross_panning.png", width='55%')

Three Band Equalizer

Here we consider a three-band equalizer operating on a music loop. Each peaking filter has system function in the $z$-domain defined by $$ H_{pk}(z) = C_\text{pk}\frac{1 + b_1 z^{-1} + b_2 z^{-2}}{1 + a_1 z^{-1} + a_2 z^{-2}} $$

where the filter coefficients are given by $$\begin{align} C_\text{pk} &= \frac{1+k_q\mu}{1+k_q}\\ k_q &= \frac{4}{1+\mu} \tan\left(\frac{2\pi f_c/f_s}{2Q}\right) \\ b_1 &= \frac{-2\cos(2\pi f_c/f_s)}{1+k_q\mu} \\ b_2 &= \frac{1-k_q\mu}{1+k_q\mu} \\ a_1 &= \frac{-2\cos(2\pi f_c/f_s)}{1+k_q} \\ a_2 &= \frac{1 - k_q}{1+k_q} \end{align}$$

where $$ \mu = 10^{G_\text{dB}/20},\ \ Q \in [2, 10] $$

and and $f_c$ is the center frequency in Hz relative to sampling rate $f_s$ in Hz, and $G_\text{dB}$ is the peaking filter gain in dB. Conveniently, the function peaking is available in the module sk_dsp_comm.sigsys.


In [ ]:
band1 = widgets.FloatSlider(description = '100 Hz', 
                            continuous_update = True,       # Continuous updates
                            value = 20.0,
                            min = -20.0, 
                            max = 20.0, 
                            step = 1, 
                            orientation = 'vertical')
band2 = widgets.FloatSlider(description = '1000 Hz', 
                            continuous_update = True,       # Continuous updates
                            value = 10.0,
                            min = -20.0, 
                            max = 20.0, 
                            step = 1, 
                            orientation = 'vertical')
band3 = widgets.FloatSlider(description = '8000 Hz', 
                            continuous_update = True,       # Continuous updates
                            value = -10.0,
                            min = -20.0, 
                            max = 20.0, 
                            step = 1, 
                            orientation = 'vertical')

Gain = widgets.FloatSlider(description = 'Gain', 
                continuous_update = True,
                value = 0.2,
                min = 0.0, 
                max = 2.0, 
                step = 0.01, 
                orientation = 'vertical')

#widgets.HBox([Gain,band1,band2,band3])

In [ ]:
b_b1,a_b1 = ss.peaking(band1.value,100,Q=3.5,fs=48000)
zi_b1 = signal.lfiltic(b_b1,a_b1,[0])
b_b2,a_b2 = ss.peaking(band2.value,1000,Q=3.5,fs=48000)
zi_b2 = signal.lfiltic(b_b2,a_b2,[0])
b_b3,a_b3 = ss.peaking(band3.value,8000,Q=3.5,fs=48000)
zi_b3 = signal.lfiltic(b_b3,a_b3,[0])
b_12,a_12 = ss.cascade_filters(b_b1,a_b1,b_b2,a_b2)
b_123,a_123 = ss.cascade_filters(b_12,a_12,b_b3,a_b3)
f = logspace(log10(50),log10(10000),100)
w,H_123 = signal.freqz(b_123,a_123,2*pi*f/48000)
semilogx(f,20*log10(abs(H_123)))
ylim([-20,20])
ylabel(r'Gain (dB)')
xlabel(r'Frequency (Hz)')
grid();

In [ ]:
# define a pass through, y = x, callback
def callback(in_data, frame_count, time_info, status):
    global DSP_IO, zi_b1,zi_b2,zi_b3, x
    global Gain, band1, band2, band3
    DSP_IO.DSP_callback_tic()
    # convert byte data to ndarray
    in_data_nda = np.frombuffer(in_data, dtype=np.int16)
    #***********************************************
    # DSP operations here
    # Here we apply a linear filter to the input
    #x = in_data_nda.astype(float32)
    x = Gain.value*20000*x_loop.get_samples(frame_count)
    # DSP code here
    b_b1,a_b1 = ss.peaking(band1.value,100,Q=3.5,fs=48000)
    z1, zi_b1 = signal.lfilter(b_b1,a_b1,x,zi=zi_b1) 
    b_b2,a_b2 = ss.peaking(band2.value,1000,Q=3.5,fs=48000)
    z2, zi_b2 = signal.lfilter(b_b2,a_b2,z1,zi=zi_b2)
    b_b3,a_b3 = ss.peaking(band3.value,8000,Q=3.5,fs=48000)
    y, zi_b3 = signal.lfilter(b_b3,a_b3,z2,zi=zi_b3)
    #***********************************************
    # Save data for later analysis
    # accumulate a new frame of samples
    DSP_IO.DSP_capture_add_samples(y) 
    #***********************************************
    # Convert from float back to int16
    y = y.astype(int16)
    DSP_IO.DSP_callback_toc()
    # Convert ndarray back to bytes
    #return (in_data_nda.tobytes(), pyaudio.paContinue)
    return y.tobytes(), pah.pyaudio.paContinue
fs, x_wav2 = ss.from_wav('audio_files/Music_Test.wav')
x_wav = (x_wav2[:,0] + x_wav2[:,1])/2
x_loop = pah.loop_audio(x_wav)
DSP_IO = pah.DSP_io_stream(callback,0,1,fs=44100,Tcapture=0)
DSP_IO.interactive_stream(0,1)
widgets.HBox([Gain,band1,band2,band3])

In [ ]:
Image("audio_files/three_band_widgets.png", width="55%")

In [ ]:
f_AD,Mag_AD = loadtxt('audio_files/ThreeBand_Peak_100_p20_1k_p10_8k_m10_fs_48k.csv',
                       delimiter=',',skiprows=6,unpack=True)
semilogx(f_AD,Mag_AD+55)
semilogx(f,20*log10(abs(H_123)))
ylabel(r'Gain (dB)')
xlabel(r'Frequency (Hz)')
legend((r'AD2 Noise Measured',r'Design Theory'))
title(r'Three Band Equalizer: $f_{center} = [100,1000,800]$, $Q = 3.5$')
ylim([-20,20])
xlim([50,10000])
grid();

In [ ]: