In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import lfilter
import scipy.signal as sp
import librosa
import librosa.display
import IPython.display as ipd
Load wave file and play original, unquantized version
In [2]:
wave_filename = '../audio/33711__acclivity__excessiveexposure.wav'
# load file, do resample to 8 KhZ (as used in the DPCM system)
x, sampling_rate = librosa.load(wave_filename, sr=8000)
x_max = max(abs(x))
# playback original audio
ipd.Audio(x, rate=sampling_rate)
Out[2]:
Some helper functions to determine prediction coefficients.
In [25]:
# prediction order
n = 8
# quantization of residual signal with w bits
w = 3
# helper function calculating the auto-correlation coefficient r
def get_r(x, l):
x_shift = np.roll(x,l)
x_shift[:l] = 0
return np.correlate(x,x_shift)[0]
def get_prediction_coefficients(x, n):
r = np.array([get_r(x,k) for k in np.arange(1,n+1)])
R = np.array([np.concatenate(([get_r(x,j) for j in np.arange(i,0,-1)], [get_r(x,j) for j in np.arange(0,n-i)])) for i in range(n)])
a_opt = np.linalg.inv(R) @ r
return a_opt
# helper function for sign that assigns "+1" to zero
def my_sign(x):
if np.isscalar(x):
if np.isclose(x,0):
return +1.0
else:
return np.sign(x)
else:
retval = np.sign(x)
retval[np.isclose(x,np.zeros_like(x))] = +1.0
return retval
Again, we employ $w$ bit per sample and use scalar quantization merely for illustration purposes. We adapt the stepsize of the scalar quantizer to each block, such that the dynamic range is maximally exploited. We play the original and the reconstructed signal. We calculate new predictor coefficients in each block.
In [26]:
# N samples per frame, here N = 160, with sampling rate of 8 Khz, we have 20 ms frames
N = 160
frames = int(np.floor(len(x) / N))
y_ol = np.zeros_like(x)
# no adaptation for first frame
a = np.zeros(n)
# filter memory
memory_tx = np.zeros(n)
memory_rx = np.zeros(n)
# save predictor coefficients and block-based stepsize for later used in closed-loop prediction
a_save = []
Delta_x_save = []
for k0 in range(frames):
x_part = x[np.arange(N)+k0*N]
# update filter coefficients and save filter coefficients for closed-loop
a = get_prediction_coefficients(x_part, n)
a_save.append(a)
# prediction, save memory contents for next round
# important is to save the memory contents
xh_part, memory_tx = lfilter(np.concatenate(([0], a)), 1, x_part, zi=memory_tx)
d = x[np.arange(N) + k0*N] - xh_part
# fix x_max based on the current signal, leave some headroom
x_max = 1.5 * np.max([np.max(d), -np.min(d)])
Delta_x = x_max / (2**(w-1))
xh_max = (2**(w-1)) * Delta_x / 2
Delta_x_save.append(Delta_x)
# quantize with uniform scalar quantizer, adaptive stepsize
d_q = my_sign(d)*Delta_x*(np.floor(np.abs(d)/Delta_x)+0.5)
# reconstruct signal
y_part, memory_rx = lfilter([1.0], np.concatenate(([1], -a)), d_q, zi=memory_rx)
y_ol[np.arange(N)+k0*N] = y_part
In [27]:
print('Audio playback of original file')
ipd.display(ipd.Audio(x, rate=sampling_rate))
print('Audio playback of recovered file (after open-loop DPCM)')
ipd.display(ipd.Audio(y_ol, rate=sampling_rate))
In [23]:
# memory of the predictor, initialize with zeros, contains the input to the prediction filter, i.e., \tilde{x}
memory_tx = np.zeros(n)
memory_rx = np.zeros(n)
y_cl = np.zeros_like(x)
d_q = np.zeros(N)
for k0 in range(frames):
x_part = x[np.arange(N)+k0*N]
# recover filter coefficients and quantizer stepsize
a = a_save[k0]
Delta_x = Delta_x_save[k0]
xh_max = (2**(w-1)) * Delta_x / 2
# closed-loop prediction
for k in range(len(x_part)):
x_hat = a @ memory_tx
d = x_part[k] - x_hat
d_q[k] = my_sign(d)*Delta_x*(np.floor(np.abs(d)/Delta_x)+0.5)
# saturate
if d_q[k] < -xh_max :
d_q[k] = -xh_max
if d_q[k] > +xh_max :
d_q[k] = +xh_max
# update memory of predictor
memory_tx = np.concatenate(([d_q[k] + x_hat], memory_tx[:-1]))
# receiver part, recover signal
y_part, memory_rx = lfilter([1.0], np.concatenate(([1], -a)), d_q, zi=memory_rx)
y_cl[np.arange(N)+k0*N] = y_part
Compare all files in the playback.
In [28]:
print('Audio playback of original file')
ipd.display(ipd.Audio(x, rate=sampling_rate))
print('Audio playback of recovered file (open-loop DPCM)')
ipd.display(ipd.Audio(y_ol, rate=sampling_rate))
print('Audio playback of recovered file (closed-loop DPCM)')
ipd.display(ipd.Audio(y_cl, rate=sampling_rate))