In [51]:
%matplotlib inline
from __future__ import division, print_function
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import SHS_data
import util
import paired_data
reload(paired_data);
This notebook documents an experiment in which we try to learn a fingerprinting function for chroma-based cover song retrieval.
It relies on three other modules in this project for data handling: SHS_data
(to load Second Hand Song chroma features and ground truth), paired_data
(to preprocess data for training a neural network) and util
.
The cover detection experiment in the next section uses main
for the experiment routines and fingerprints
, which contains other fingerprinting functions to which we can compare.
But first, this section presents the fingerprint learning, in three parts:
Data - loading train and test data to memory
Network components - defining network variables and layers
Network - setting up a three-layer convolutional neural network
Training - training (and testing) the network
In [52]:
# train, test, validation split
ratio = (50,20,30)
clique_dict, _ = SHS_data.read_cliques()
train_cliques, test_cliques_big, _ = util.split_train_test_validation(clique_dict, ratio=ratio)
# preload training data to memory (just about doable)
print('Preloading training data...')
train_uris = util.uris_from_clique_dict(train_cliques)
chroma_dict = SHS_data.preload_chroma(train_uris)
# make a training dataset of cover and non-cover pairs of songs
print('Preparing training dataset...')
n_patches, patch_len = 8, 64
X_A, X_B, Y, pair_uris = paired_data.dataset_of_pairs(train_cliques, chroma_dict,
n_patches=n_patches, patch_len=patch_len)
print(' Training set:', X_A.shape, X_B.shape, Y.shape)
In [53]:
# pick a test subset
n_test_cliques = 50 # e.g., 50 ~ small actual datasets
test_cliques = {uri: test_cliques_big[uri] for uri in test_cliques_big.keys()[:n_test_cliques]}
# preload test data to memory (just about doable)
print('Preloading test data...')
test_uris = util.uris_from_clique_dict(test_cliques)
chroma_dict_T = SHS_data.preload_chroma(test_uris)
# make a test dataset of cover and non-cover pairs of songs
print('Preparing test dataset...')
X_A_T, X_B_T, Y_T, test_pair_uris_T = paired_data.dataset_of_pairs(test_cliques, chroma_dict_T,
n_patches=n_patches, patch_len=patch_len)
print(' Test set:', X_A_T.shape, X_B_T.shape, Y_T.shape)
In [54]:
weight_scale = 0.1
def weight_variable(shape):
initial = tf.truncated_normal(shape, stddev=weight_scale)
return tf.Variable(initial)
def bias_variable(shape):
initial = tf.constant(weight_scale, shape=shape)
return tf.Variable(initial)
In [55]:
# convolutional layers
def conv_bins(x, W):
return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='VALID')
def conv_frames(x, W):
return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')
# max pool layers
def max_pool_4x1(x):
return tf.nn.max_pool(x, ksize=[1, 4, 1, 1],
strides=[1, 4, 1, 1], padding='SAME')
def max_pool_8x1(x):
return tf.nn.max_pool(x, ksize=[1, 8, 1, 1],
strides=[1, 8, 1, 1], padding='SAME')
def max_pool_16x1(x):
return tf.nn.max_pool(x, ksize=[1, 16, 1, 1],
strides=[1, 16, 1, 1], padding='SAME')
In [56]:
input_len = n_patches * patch_len
x_A = tf.placeholder("float", shape=[None, input_len, 12])
x_B = tf.placeholder("float", shape=[None, input_len, 12])
y_ = tf.placeholder("float", shape=[None,])
x_image_A = tf.reshape(x_A, [-1, input_len, 12, 1])
x_image_B = tf.reshape(x_B, [-1, input_len, 12, 1])
y_ = tf.reshape(y_, [-1, 1])
In [57]:
W_conv1 = weight_variable([1, 12, 1, 32])
b_conv1 = bias_variable([32])
h_conv1_A = tf.nn.relu(conv_bins(x_image_A, W_conv1) + b_conv1)
h_conv1_B = tf.nn.relu(conv_bins(x_image_B, W_conv1) + b_conv1)
h_pool1_A = max_pool_4x1(h_conv1_A)
h_pool1_B = max_pool_4x1(h_conv1_B)
In [58]:
W_conv2 = weight_variable([2, 1, 32, 64])
b_conv2 = bias_variable([64])
h_conv2_A = tf.nn.relu(conv_frames(h_pool1_A, W_conv2) + b_conv2)
h_conv2_B = tf.nn.relu(conv_frames(h_pool1_B, W_conv2) + b_conv2)
h_pool2_A = max_pool_16x1(h_conv2_A)
h_pool2_B = max_pool_16x1(h_conv2_B)
In [59]:
h_pool2_A_flat = tf.reshape(h_pool2_A, [-1, 8*64]) # flatten images first
h_pool2_B_flat = tf.reshape(h_pool2_B, [-1, 8*64])
W_fc1 = weight_variable([8*64, 128])
b_fc1 = bias_variable([128])
out_I_A = tf.tanh(tf.matmul(h_pool2_A_flat, W_fc1) + b_fc1)
out_I_B = tf.tanh(tf.matmul(h_pool2_B_flat, W_fc1) + b_fc1)
# h_fc1_A = tf.nn.relu(tf.matmul(h_pool2_A_flat, W_fc1) + b_fc1)
# h_fc1_B = tf.nn.relu(tf.matmul(h_pool2_B_flat, W_fc1) + b_fc1)
# out_I_A = tf.tanh(h_pool2_A_flat)
# out_I_B = tf.tanh(h_pool2_B_flat)
In [60]:
def approx_bhattacharyya(squared_dists, is_cover):
"""Approximate Bhattacharyya distance between cover and non-cover distances.
Similar to Mahalanobis distance, but for distributions with different variances.
Assumes normality, hence approximate (distances are bound by 0).
"""
pair_dists = np.sqrt(squared_dists[np.where(is_cover==1)])
non_pair_dists = np.sqrt(squared_dists[np.where(is_cover==0)])
mu_pairs, sigma2_pairs = np.mean(pair_dists), np.var(pair_dists)
mu_non_pairs, sigma2_non_pairs = np.mean(non_pair_dists), np.var(non_pair_dists)
bhatt = (0.25 * np.log(0.25 * (sigma2_pairs/sigma2_non_pairs + sigma2_non_pairs/sigma2_pairs + 2)) +
0.25 * (mu_pairs - mu_non_pairs)**2 / (sigma2_pairs + sigma2_non_pairs))
return bhatt, mu_pairs, mu_non_pairs
Minize pair distances while maximizing non-pair distances smaller than m
Following [1].
In [61]:
alpha = 1
m = 10
squared_errors = tf.reduce_sum(tf.square(out_I_A - out_I_B), reduction_indices=1, keep_dims=True)
pair_loss = tf.reduce_mean(y_ * squared_errors)
non_pair_loss = tf.reduce_mean((1 - y_) * tf.square(tf.maximum(0.0, m - tf.sqrt(squared_errors))))
loss_function = pair_loss + (alpha * non_pair_loss)
# loss_float = tf.cast(loss_function, "float")
# optimizer
learning_rate = tf.placeholder(tf.float32, shape=[])
optimizer = tf.train.AdamOptimizer(learning_rate)
train_step = optimizer.minimize(loss_function)
In [62]:
error_pairs_log = []
error_non_pairs_log = []
squared_dists_log = []
labels_log = []
d_pairs_log = []
d_non_pairs_log = []
train_error_log = []
train_bhatt_log = []
test_error_log = []
test_bhatt_log = []
In [63]:
def report(step, batch):
print('step {}'.format(step))
# train and test feeds
train_feed = {x_A:batch[0], x_B:batch[1], y_: batch[2]}
test_feed = {x_A:X_A_T, x_B:X_B_T, y_: Y_T}
# train metrics
error_pairs, error_non_pairs, squared_dists, train_error = sess.run([pair_loss, non_pair_loss,
squared_errors, loss_float],
feed_dict=train_feed)
train_bhatt, d_pairs, d_non_pairs = approx_bhattacharyya(squared_dists, train_feed[y_])
# test metrics
test_squared_dists, test_error = sess.run([squared_errors, loss_float],
feed_dict=test_feed)
test_bhatt, _, _ = approx_bhattacharyya(test_squared_dists, test_feed[y_])
# log all metrics
error_pairs_log.append(error_pairs)
error_non_pairs_log.append(error_non_pairs)
squared_dists_log.append(squared_dists)
labels_log.append(batch[2])
d_non_pairs_log.append(d_non_pairs)
d_pairs_log.append(d_pairs)
train_bhatt_log.append(train_bhatt)
train_error_log.append(train_error)
test_bhatt_log.append(test_bhatt)
test_error_log.append(test_error)
# print some metrics
print(' d_pairs, d_non_pairs = %.3g, %.3g' % (d_pairs, d_non_pairs))
print(' train error %.3g, train bhatt %.3g' % (train_error, train_bhatt))
print(' test error %.3g, test bhatt %.3g' % (test_error, test_bhatt))
In [64]:
sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
In [65]:
n_epoques = 2500 # 2500 ~ 10 x training set after (50,20,30) split
batch_size = 100
lr = 3e-4
train_batches = paired_data.get_batches([X_A, X_B, Y], batch_size=batch_size)
for step in range(n_epoques):
batch = next(train_batches)
if step%10 == 0:
report(step, batch)
train_feed = {x_A:batch[0], x_B:batch[1], y_: batch[2], learning_rate: lr}
train_step.run(feed_dict=train_feed)
report('[end]', batch)
typical result:
# step [end]
# d_pairs, d_non_pairs = 3.47, 5.71
# train error 17.2, train bhatt 0.335
# test error 19.5, test bhatt 0.155
plot loss functions for train and test data.
Note: test error is computed for the same subset at every step, making it appear much more stable than the training loss (computed for a different batch at every step).
In [ ]:
plt.figure(figsize=(10,10))
plt.subplot(311)
plt.plot(train_error_log);
plt.plot(error_pairs_log, color='g');
plt.plot(error_non_pairs_log, color='r');
plt.plot(test_error_log, color='k');
plt.title('train (b) and test(k) loss function with train pairs (g) vs non-pairs (r) components');
plt.subplot(312)
plt.plot(d_pairs_log, color='g');
plt.plot(d_non_pairs_log, color='r');
plt.title('average distance, train pairs (g) vs non-pairs (r)');
plt.subplot(313)
plt.plot(np.log(train_bhatt_log));
plt.plot(np.log(test_bhatt_log), 'k');
plt.title('bhattacharyya distance train (b) and test (k)');
In [ ]:
pair_dists = np.sqrt(squared_dists_log[-1][np.where(labels_log[-1]==1)])
non_pair_dists = np.sqrt(squared_dists_log[-1][np.where(labels_log[-1]==0)])
L1 = pair_loss.eval(feed_dict={x_A:batch[0], x_B:batch[1], y_:batch[2]})
L2 = non_pair_loss.eval(feed_dict={x_A:batch[0], x_B:batch[1], y_:batch[2]})
bins = np.arange(0,10,0.4)
plt.figure(figsize=(15,5))
plt.subplot(121)
plt.hist(non_pair_dists, bins=bins, alpha=0.5);
plt.hist(pair_dists, bins=bins, color='r', alpha=0.5);
plt.subplot(143)
plt.boxplot([non_pair_dists, pair_dists]);
print('bhatt =', approx_bhattacharyya(squared_dists_log[-1], labels_log[-1]))
In [ ]:
test_squared_dists = squared_errors.eval(feed_dict={x_A:X_A_T, x_B:X_B_T})
test_squared_dists = np.sum(test_squared_dists, axis=1)
test_pair_dists = np.sqrt(test_squared_dists[np.where(Y_T==1)[0]])
test_non_pair_dists = np.sqrt(test_squared_dists[np.where(Y_T==0)[0]])
bins = np.arange(0,10,0.4)
plt.figure(figsize=(15,5))
plt.subplot(121)
plt.hist(test_non_pair_dists, bins=bins, alpha=0.5);
plt.hist(test_pair_dists, bins=bins, color='r', alpha=0.5);
plt.subplot(143)
plt.boxplot([test_non_pair_dists, test_pair_dists]);
print('bhatt =', approx_bhattacharyya(test_squared_dists, Y_T.flatten()))
Run a cover detection experiment on some of the Second Hand Song data, using the modules implemented before. We simply pass fingerprint()
, a wrapper function around out_I_A.eval()
, to main.run_leave_one_out_experiment()
.
First, however, we compute some baseline performances using existing fingerprinting methods (see fingerprints
documentation).
In [ ]:
import main
import fingerprints as fp
In [ ]:
results = main.run_leave_one_out_experiment(test_cliques,
fp_function=fp.cov,
print_every=50)
print('results:', results)
In [ ]:
results = main.run_leave_one_out_experiment(test_cliques,
fp_function=fp.fourier,
print_every=50)
print('results:', results)
In [ ]:
def fingerprint(chroma, n_patches=8, patch_len=64):
patchwork = paired_data.patchwork(chroma, n_patches=n_patches,
patch_len=patch_len)
fps = []
for i in range(12):
patchwork_trans = np.roll(patchwork, -i, axis=1)
patchwork_tensor = patchwork_trans.reshape((1, n_patches*patch_len, 12))
fp = sess.run(out_I_A, feed_dict={x_A : patchwork_tensor})
fps.append(fp.flatten())
return fps
In [ ]:
results = main.run_leave_one_out_experiment(test_cliques,
fp_function=fingerprint,
print_every=50)
print('results:', results)
In [ ]: