Learning about Dynamic Time Warping

Launching off of Nipun Bantra's excellent page at http://nipunbatra.github.io/2014/07/dtw/

The beginning of this is taken directly from his page.


In [6]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

In [7]:
x = np.array([1, 1, 2, 3, 2, 0])
y = np.array([0, 1, 1, 2, 3, 2, 1])

In [8]:
plt.plot(x,'r', label='x')
plt.plot(y, 'g', label='y')
plt.legend();



In [9]:
distances = np.zeros((len(y), len(x)))

In [10]:
distances


Out[10]:
array([[ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.],
       [ 0.,  0.,  0.,  0.,  0.,  0.]])

In [11]:
for i in range(len(y)):
    for j in range(len(x)):
        distances[i,j] = (x[j]-y[i])**2

In [12]:
distances


Out[12]:
array([[ 1.,  1.,  4.,  9.,  4.,  0.],
       [ 0.,  0.,  1.,  4.,  1.,  1.],
       [ 0.,  0.,  1.,  4.,  1.,  1.],
       [ 1.,  1.,  0.,  1.,  0.,  4.],
       [ 4.,  4.,  1.,  0.,  1.,  9.],
       [ 1.,  1.,  0.,  1.,  0.,  4.],
       [ 0.,  0.,  1.,  4.,  1.,  1.]])

In [13]:
def distance_cost_plot(distances):
    im = plt.imshow(distances, interpolation='nearest', cmap='Reds') 
    plt.gca().invert_yaxis()
    plt.xlabel("X")
    plt.ylabel("Y")
    plt.grid()
    plt.colorbar();

In [14]:
distance_cost_plot(distances)



In [15]:
accumulated_cost = np.zeros((len(y), len(x)))

In [16]:
accumulated_cost[0,0] = distances[0,0]

In [17]:
for i in range(1, len(x)):
    accumulated_cost[0,i] = distances[0,i] + accumulated_cost[0, i-1]

In [18]:
accumulated_cost


Out[18]:
array([[  1.,   2.,   6.,  15.,  19.,  19.],
       [  0.,   0.,   0.,   0.,   0.,   0.],
       [  0.,   0.,   0.,   0.,   0.,   0.],
       [  0.,   0.,   0.,   0.,   0.,   0.],
       [  0.,   0.,   0.,   0.,   0.,   0.],
       [  0.,   0.,   0.,   0.,   0.,   0.],
       [  0.,   0.,   0.,   0.,   0.,   0.]])

In [19]:
for i in range(1, len(x)):
    accumulated_cost[0,i] = distances[0,i] + accumulated_cost[0, i-1]

In [20]:
for i in range(1, len(y)):
    for j in range(1, len(x)):
        accumulated_cost[i, j] = min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]) + distances[i, j]

In [21]:
path = [[len(x)-1, len(y)-1]]
i = len(y)-1
j = len(x)-1
while i>0 and j>0:
    if i==0:
        j = j - 1
    elif j==0:
        i = i - 1
    else:
        if accumulated_cost[i-1, j] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
            i = i - 1
        elif accumulated_cost[i, j-1] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
            j = j-1
        else:
            i = i - 1
            j= j- 1
    path.append([j, i])
path.append([0,0])

In [22]:
path


Out[22]:
[[5, 6], [4, 5], [3, 4], [2, 3], [1, 2], [1, 1], [0, 1], [0, 0]]

In [23]:
path_x = [point[0] for point in path]
path_y = [point[1] for point in path]

In [24]:
distance_cost_plot(accumulated_cost)
plt.plot(path_x, path_y);



In [25]:
def path_cost(x, y, accumulated_cost, distances):
    path = [[len(x)-1, len(y)-1]]
    cost = 0
    i = len(y)-1
    j = len(x)-1
    while i>0 and j>0:
        if i==0:
            j = j - 1
        elif j==0:
            i = i - 1
        else:
            if accumulated_cost[i-1, j] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
                i = i - 1
            elif accumulated_cost[i, j-1] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
                j = j-1
            else:
                i = i - 1
                j= j- 1
        path.append([j, i])
    path.append([0,0])
    for [y, x] in path:
        cost = cost +distances[x, y]
    return path, cost

In [26]:
path, cost = path_cost(x, y, accumulated_cost, distances)
print path
print cost


[[5, 6], [4, 5], [3, 4], [2, 3], [1, 2], [1, 1], [0, 1], [0, 0]]
2.0

In [27]:
x = np.random.random_sample((60,))

In [28]:
y = np.random.random_sample((60,))

In [39]:
def process(x, y):
    distances = np.zeros((len(y), len(x)))
    for i in range(len(y)):
        for j in range(len(x)):
            distances[i,j] = (x[j]-y[i])**2   
    accumulated_cost = np.zeros((len(y), len(x)))
    for i in range(1, len(y)):
        for j in range(1, len(x)):
            accumulated_cost[i, j] = min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]) + distances[i, j]
    path = [[len(x)-1, len(y)-1]]
    i = len(y)-1
    j = len(x)-1
    while i>0 and j>0:
        if i==0:
            j = j - 1
        elif j==0:
            i = i - 1
        else:
            if accumulated_cost[i-1, j] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
                 i = i - 1
            elif accumulated_cost[i, j-1] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
                 j = j-1
            else:
                 i = i - 1
                 j= j- 1
        path.append([j, i])
    path.append([0,0])
    path, cost = path_cost(x, y, accumulated_cost, distances)
    return path, cost

In [40]:
%timeit process(x, y)


100 loops, best of 3: 5.25 ms per loop

The mlpy package used by Nipun is out of date so use a different package. Following is adapted from the dtw example page.


In [ ]:
from dtw import dtw

In [30]:
plot(x)
plot(y)


Out[30]:
[<matplotlib.lines.Line2D at 0x10a1f9cd0>]

In [31]:
dist, cost, path = dtw(x, y)

In [32]:
dist


Out[32]:
0.097858453002972731

In [34]:
imshow(cost.T, origin='lower', cmap=cm.gray, interpolation='nearest')
plot(path[0], path[1], 'w')
xlim((-0.5, cost.shape[0]-0.5))
ylim((-0.5, cost.shape[1]-0.5))


Out[34]:
(-0.5, 59.5)

This is silly data but what I really want to do is to measure the performance of the implementation.


In [35]:
%timeit dtw(x, y)


10 loops, best of 3: 21.2 ms per loop

In [45]:
dtw(x, y)[0]


Out[45]:
0.097858453002972731

In [44]:
process(x, y)[1]


Out[44]:
2.2707439894707928

Hrm, our previous test was invalid?! We will have to go back and debug that code later. Lets get a feeling for the magnitude of costs on situations we understand.


In [49]:
angles_rads = np.linspace(0, 6.28, 100)

In [75]:
costs = []
x = np.sin(angles_rads)
for angle in range(360):
    y = np.sin(angles_rads + angle*3.14/180.0)
    costs.append(dtw(x, y)[0])
plot(costs)


Out[75]:
[<matplotlib.lines.Line2D at 0x10aedd690>]

In [76]:
def plot_curves(delta = 0):
    x = np.sin(angles_rads)
    y = np.sin(angles_rads + delta*3.14/180.0)
    plot(x)
    plot(y)
    print dtw(x, y)[0]

In [79]:
plot_curves(45)


0.0534593874045

In [80]:
plot_curves(90)


0.129477346985

In [ ]: