Open-Loop versus Closed-Loop Differential PCM

This code is provided as supplementary material of the lecture Quellencodierung.

This code illustrates

  • Effect of open loop versus closed loop DPCM on the quantization noise

In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
from scipy.signal import lfilter
import scipy.signal as sp
import librosa
import librosa.display
import IPython.display as ipd

Load wave file and play original, unquantized version


In [2]:
wave_filename = '../audio/33711__acclivity__excessiveexposure.wav'

# load file, do resample to 8 KhZ (as used in the DPCM system)
x, sampling_rate = librosa.load(wave_filename, sr=8000)

x_max = max(abs(x))

# playback original audio
ipd.Audio(x, rate=sampling_rate)


Out[2]:

Some helper functions to determine prediction coefficients.


In [25]:
# prediction order
n = 8

# quantization of residual signal with w bits 
w = 3

# helper function calculating the auto-correlation coefficient r
def get_r(x, l):    
    x_shift = np.roll(x,l)
    x_shift[:l] = 0        
    return np.correlate(x,x_shift)[0]
    
    
def get_prediction_coefficients(x, n):
    r = np.array([get_r(x,k) for k in np.arange(1,n+1)])
    R = np.array([np.concatenate(([get_r(x,j) for j in np.arange(i,0,-1)], [get_r(x,j) for j in np.arange(0,n-i)])) for i in range(n)])    
    
    a_opt = np.linalg.inv(R) @ r
    return a_opt


# helper function for sign that assigns "+1" to zero
def my_sign(x):
    if np.isscalar(x):
        if np.isclose(x,0):
            return +1.0
        else:
            return np.sign(x)
    else:
        retval = np.sign(x)
        retval[np.isclose(x,np.zeros_like(x))] = +1.0
        return retval

Experiment A: Open-loop linear prediction with fixed (but optimized prediction coefficients)

Again, we employ $w$ bit per sample and use scalar quantization merely for illustration purposes. We adapt the stepsize of the scalar quantizer to each block, such that the dynamic range is maximally exploited. We play the original and the reconstructed signal. We calculate new predictor coefficients in each block.


In [26]:
# N samples per frame, here N = 160, with sampling rate of 8 Khz, we have 20 ms frames
N = 160 
frames = int(np.floor(len(x) / N))


y_ol = np.zeros_like(x)

# no adaptation for first frame
a = np.zeros(n)

# filter memory 
memory_tx = np.zeros(n)
memory_rx = np.zeros(n)

# save predictor coefficients and block-based stepsize for later used in closed-loop prediction
a_save = []
Delta_x_save = []
for k0 in range(frames):
    
    x_part = x[np.arange(N)+k0*N]

    # update filter coefficients and save filter coefficients for closed-loop
    a = get_prediction_coefficients(x_part, n)
    a_save.append(a)
    
    # prediction, save memory contents for next round
    # important is to save the memory contents
    xh_part, memory_tx = lfilter(np.concatenate(([0], a)), 1, x_part, zi=memory_tx)
    d = x[np.arange(N) + k0*N] - xh_part
    
    # fix x_max based on the current signal, leave some headroom
    x_max = 1.5 * np.max([np.max(d), -np.min(d)])
    Delta_x = x_max / (2**(w-1))
    xh_max = (2**(w-1)) * Delta_x / 2
    Delta_x_save.append(Delta_x)

    # quantize with uniform scalar quantizer, adaptive stepsize
    d_q = my_sign(d)*Delta_x*(np.floor(np.abs(d)/Delta_x)+0.5) 

    # reconstruct signal
    y_part, memory_rx = lfilter([1.0], np.concatenate(([1], -a)), d_q, zi=memory_rx)     
    y_ol[np.arange(N)+k0*N] = y_part

In [27]:
print('Audio playback of original file')
ipd.display(ipd.Audio(x, rate=sampling_rate))
print('Audio playback of recovered file (after open-loop DPCM)')
ipd.display(ipd.Audio(y_ol, rate=sampling_rate))


Audio playback of original file
Audio playback of recovered file (after open-loop DPCM)

Experiment B: Closed-loop linear prediction with fixed (but optimized prediction coefficients)

We reuse both the quantizer and the prediction coefficients from the previous experiment and use closed-loop DPCM instead.


In [23]:
# memory of the predictor, initialize with zeros, contains the input to the prediction filter, i.e., \tilde{x}
memory_tx = np.zeros(n)
memory_rx = np.zeros(n)

y_cl = np.zeros_like(x)

d_q = np.zeros(N)

for k0 in range(frames):
    
    x_part = x[np.arange(N)+k0*N]

    # recover filter coefficients and quantizer stepsize
    a = a_save[k0]
    Delta_x = Delta_x_save[k0]
    xh_max = (2**(w-1)) * Delta_x / 2

    # closed-loop prediction
    for k in range(len(x_part)):
        x_hat = a @ memory_tx
        d = x_part[k] - x_hat
              
        d_q[k] = my_sign(d)*Delta_x*(np.floor(np.abs(d)/Delta_x)+0.5)    
    
        # saturate
        if d_q[k] < -xh_max :
            d_q[k] = -xh_max
        if d_q[k] > +xh_max :
            d_q[k] = +xh_max
              
        # update memory of predictor
        memory_tx = np.concatenate(([d_q[k] + x_hat], memory_tx[:-1]))
        
    # receiver part, recover signal
    y_part, memory_rx = lfilter([1.0], np.concatenate(([1], -a)), d_q, zi=memory_rx) 
    
    y_cl[np.arange(N)+k0*N] = y_part

Compare all files in the playback.


In [28]:
print('Audio playback of original file')
ipd.display(ipd.Audio(x, rate=sampling_rate))
print('Audio playback of recovered file (open-loop DPCM)')
ipd.display(ipd.Audio(y_ol, rate=sampling_rate))
print('Audio playback of recovered file (closed-loop DPCM)')
ipd.display(ipd.Audio(y_cl, rate=sampling_rate))


Audio playback of original file
Audio playback of recovered file (open-loop DPCM)
Audio playback of recovered file (closed-loop DPCM)