In [74]:
import numpy as np
from scipy.io import wavfile
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
from scipy.ndimage.morphology import generate_binary_structure, iterate_structure, binary_erosion
import hhashlib

Audio Fingerprinting

The goal here is to be able to take a sample audio file and create a set of fingerprints that define that file. To that end, we're going to do the following:

  • create a spectrogram of the file
  • find the 'peaks' in the audio
  • filter out low peaks
  • create pairs of peaks separated by some time delta
  • hash those pairs with the time delta information

those hashes are our fingerprints.

Creating the Spectrogram


In [2]:
samplerate, channels = wavfile.read('../data/Sor1929.wav')
mono = np.mean(channels, axis=1)

In [3]:
# constants
WINDOW_SIZE = 4096 # granularity of chunks
SAMPLE_RATE = 44100 # by nyquist or whatever
OVERLAP_RATIO = 0.5 # amount our chunks can overlap

In [4]:
res = plt.specgram(mono, 
             NFFT=WINDOW_SIZE, 
             Fs=SAMPLE_RATE, 
             window=mlab.window_hanning,
             noverlap = int(WINDOW_SIZE * OVERLAP_RATIO))[0]


put this shit on a log scale:


In [5]:
# log it!
res = 10 * np.log10(res)

# np.inf is terrible, replace with zeros.
res[res == -np.inf] = 0

In [8]:
def heatmap(m):
    plt.pcolor(m)
    plt.colorbar()

In [9]:
heatmap(res)



In [95]:
res.shape


Out[95]:
(2049, 214)

In [98]:
res


Out[98]:
array([[ 34.53101085,  29.39607637,  14.86573686, ...,  46.2016775 ,
         48.19035718,  48.82804761],
       [ 33.08141731,  26.54014288,  27.72906387, ...,  43.47524504,
         45.08122096,  45.78939866],
       [ 21.13760964,  17.10633379,  22.03472837, ...,  19.35458494,
         15.74225949,  16.98076228],
       ..., 
       [-53.88926814, -53.28589151, -51.57680939, ..., -59.97192965,
        -50.76645516, -58.97918794],
       [-51.17899251, -64.75096829, -50.35851185, ..., -70.06006002,
        -49.57059615, -54.70190764],
       [-54.22194285, -53.9047388 , -55.50505622, ..., -59.75358744,
        -51.26259631, -53.83486096]])

Peak Detection

Okay, so what we want is to detect all the peaks in the spectrogram.

How do we do this?

First, we can define a neighborhood around a particular point, and define some point as a peak if it's greater than all the points in its neighborhood.

How do we define a neighborhood? Make a numpy filter using generate_binary_structure:


In [10]:
from scipy.ndimage.morphology import generate_binary_structure, iterate_structure

s = generate_binary_structure(2, 1)
s


Out[10]:
array([[False,  True, False],
       [ True,  True,  True],
       [False,  True, False]], dtype=bool)

We can extend this filter by using iterate_structure:


In [11]:
neighborhood = iterate_structure(s, 5).astype(int)
neighborhood


Out[11]:
array([[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0],
       [0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0],
       [0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0],
       [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0],
       [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
       [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0],
       [0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0],
       [0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 0],
       [0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0]])

In [12]:
# make it a little bigger
neighborhood = iterate_structure(s, 20).astype(int)

Okay, great. How do we apply this to our spectrogram?

We can use a maximum filter, which will re-define each point in our matrix to be the maximum in it's neighborhood:


In [13]:
from scipy.ndimage.filters import maximum_filter
local_max = maximum_filter(res, footprint=neighborhood)
local_max


Out[13]:
array([[ 47.70849849,  47.70849849,  47.70849849, ...,  53.78828931,
         53.78828931,  53.78828931],
       [ 47.70849849,  47.70849849,  47.70849849, ...,  53.78828931,
         53.78828931,  53.78828931],
       [ 51.62175634,  47.70849849,  47.70849849, ...,  53.78828931,
         53.78828931,  53.78828931],
       ..., 
       [-47.48694411, -47.48694411, -47.48694411, ..., -46.33640966,
        -46.33640966, -46.33640966],
       [-47.48694411, -47.48694411, -47.48694411, ..., -46.33640966,
        -46.33640966, -46.33640966],
       [-47.48694411, -47.48694411, -47.48694411, ..., -46.33640966,
        -46.33640966, -46.33640966]])

Then, we can check where the local_max (maximum in the neighborhood) is equal to our original values.

The points for which this is true are our peaks:


In [14]:
peaks = local_max==res
peaks


Out[14]:
array([[False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       ..., 
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False]], dtype=bool)

So now, we can get the actual maximum points.

One thing we should do is kind of "filter out" the background noise around the peaks - we want to only pull the peaks, not any of the other data.

Apparently, we need to do a binary erosion of the background, shrinking it a little, in order to get the actual peaks and not a bunch of space around the peaks as well, so we do that:


In [17]:
background = (res == 0)
eroded_background = binary_erosion(background, structure=neighborhood,
                                   border_value=1)
actual_peaks = peaks - eroded_background
actual_peaks


Out[17]:
array([[False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       ..., 
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False],
       [False, False, False, ..., False, False, False]], dtype=bool)

Great! Now we have all the peaks from this spectrogram.

Maybe we can plot it?


In [33]:
fig = plt.figure()
ax1 = fig.add_subplot(111)

y, x = actual_peaks.astype(int).nonzero()
ax1.pcolor(res)
#cax = ax1.imshow(res, interpolation='nearest', cmap=cm.coolwarm)
#fig.colorbar(cax)
ax1.scatter(x, y, c= 'blue')

#plt.pcolor(res)
plt.show()


Peak Filtering

I think it's weird that we're seeing a bunch of peaks in the low blue areas, but maybe that's just because of the file?

Maybe we should remove peaks that are below some threshold:


In [38]:
# threshold for amplitudes
AMPLITUDE_THRESHOLD = 20

# get amplitudes of peaks
amplitudes = res[actual_peaks].flatten()

# x & y of peaks
y, x = actual_peaks.astype(int).nonzero()

# tuple up the frequency/time stuff:
all_peaks = zip(x, y, amplitudes)

now, we've got a list of tupes of all the peaks, where:

  • t[0] = the time (remember, we made a spectrogram? time bins.)
  • t[1] = the frequency
  • t[2] = the amplitude

(time, frequency, amplitude)

here's the last 5 peaks, by amplitude:


In [43]:
all_peaks[-5:]


Out[43]:
[(201, 2041, -46.336409661576212),
 (120, 2042, -46.963677847479246),
 (1, 2043, -47.486944106157551),
 (147, 2044, -45.355392898475841),
 (173, 2047, -47.413819600093831)]

So now we can filter them:


In [42]:
# filter the peaks
filtered_peaks = [p for p in all_peaks if p[2] > AMPLITUDE_THRESHOLD]
filtered_peaks[-5:]


Out[42]:
[(196, 763, 26.803332308264206),
 (0, 779, 21.460365502635383),
 (156, 786, 22.355241916207387),
 (210, 829, 21.9251353126052),
 (147, 858, 23.023283655961642)]

Now, we can create the fingerprint from the times/frequencies (tossing the amplitudes):


In [44]:
fingerprint = ([p[0] for p in filtered_peaks], 
               [p[1] for p in filtered_peaks])

In [50]:
fig = plt.figure()
ax1 = fig.add_subplot(111)

x, y = fingerprint[0], fingerprint[1]
ax1.pcolor(res)
#cax = ax1.imshow(res, interpolation='nearest', cmap=cm.coolwarm)
#fig.colorbar(cax)
ax1.scatter(x, y, c= 'blue')

#plt.pcolor(res)
plt.show()


Looking pretty good to me!

How many peaks do we have for this file?


In [51]:
len(fingerprint[0])


Out[51]:
194

Fingerprinting

194 is a good feature reduction from thousasnds of samples, I think :).

Next step: how do we use these peaks to find matches?

Well, really, a song can be thought of a series of pairs of peaks, separated by a time value.

We can fingerprint a song by creating a series of hashed (peak_freq_1, peak_freq_2, time_delta) tuples.

Then, when we look for matches for a given song $B$, we can:

  1. fingerprint song B, collecting a series of $N$ hashed (peak_freq_1, peak_freq_2, time_delta) tuples (let's call them $t_i$).
  2. $\forall i \in N$ for song $B$, check if we have a hash collision for $t_i$ from our stored files.
  3. Check which song file has the most hash collisions (maybe set a threshold?). If the number of collisions for some file is above our threshold, return that song as a match.

First, we need to define how many pairs of peaks are created for each individual peak. Let's use $\frac{N}{10}$ to start:


In [65]:
fingerprint_pairs = 19

We also need to define a time range - we don't want one peak to be paired with another if they're too far apart in time. maybe 10 is good?


In [64]:
fingerprint_time_delta = 10

Okay, let's actually make the fingerprints. This can probably be faster.


In [86]:
# sort by time (I think the other guy does it by frequency? not sure why...)
sorted_peaks = sorted(filtered_peaks, key=lambda p: p[0])

# of the form (f1, f2, time_delta)
fingerprints = []
for i, peak in enumerate(sorted_peaks):
    # get all peaks within `fingerprint_pairs` of the current peak
    potential_pairs = sorted_peaks[i+1:i+fingerprint_pairs]
    # get rid of the ones that are too far away in time
    potential_pairs = [p for p in potential_pairs if p[0] - peak[0] < fingerprint_time_delta]
    # create the (f1, f2, time_delta) tuples
    prints = [(peak[1], p[1], (p[0] - peak[0])) for p in potential_pairs]
    fingerprints.extend(prints)

Okay, how many fingerprints is that?


In [68]:
len(fingerprints)


Out[68]:
1740

Still not bad. These are gonna be hashes, remember, and this is the whole song.

Let's just check what they look like:


In [73]:
fingerprints[:10]


Out[73]:
[(115, 137, 0),
 (115, 183, 0),
 (115, 252, 0),
 (115, 275, 0),
 (115, 321, 0),
 (115, 390, 0),
 (115, 458, 0),
 (115, 481, 0),
 (115, 505, 0),
 (115, 550, 0)]

I'm not sure about having pairs at time_delta == 0, but I'll leave them there for now.

Okay, great. Now we just have to hash them....

MD5?


In [81]:
hashes = []
for fprint in fingerprints:
    h = hashlib.md5('{0}|{1}|{2}'.format(fprint[0], fprint[1], fprint[2]))
    hashes.append(h.hexdigest())

Okay, what do they look like?


In [83]:
hashes[:5]


Out[83]:
['d525254e088e88327c2598b2a0037f84',
 'd324142f471966af15f16ac8a6c0aa73',
 '7e94cac1b350c9a6fa3c99169efd8aac',
 '59ecdbfc8d113ea2203601f213fb0414',
 'b830f81888f436bab78b35f4603b530e']

Cool! Now, there's just the question of how we're going to do efficient searches of these hashes and stuff.

Can we use a database? Is that cheating? not sure...