Learning about Dynamic Time Warping
Launching off of Nipun Bantra's excellent page at http://nipunbatra.github.io/2014/07/dtw/
The beginning of this is taken directly from his page.
In [6]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
In [7]:
x = np.array([1, 1, 2, 3, 2, 0])
y = np.array([0, 1, 1, 2, 3, 2, 1])
In [8]:
plt.plot(x,'r', label='x')
plt.plot(y, 'g', label='y')
plt.legend();
In [9]:
distances = np.zeros((len(y), len(x)))
In [10]:
distances
Out[10]:
In [11]:
for i in range(len(y)):
for j in range(len(x)):
distances[i,j] = (x[j]-y[i])**2
In [12]:
distances
Out[12]:
In [13]:
def distance_cost_plot(distances):
im = plt.imshow(distances, interpolation='nearest', cmap='Reds')
plt.gca().invert_yaxis()
plt.xlabel("X")
plt.ylabel("Y")
plt.grid()
plt.colorbar();
In [14]:
distance_cost_plot(distances)
In [15]:
accumulated_cost = np.zeros((len(y), len(x)))
In [16]:
accumulated_cost[0,0] = distances[0,0]
In [17]:
for i in range(1, len(x)):
accumulated_cost[0,i] = distances[0,i] + accumulated_cost[0, i-1]
In [18]:
accumulated_cost
Out[18]:
In [19]:
for i in range(1, len(x)):
accumulated_cost[0,i] = distances[0,i] + accumulated_cost[0, i-1]
In [20]:
for i in range(1, len(y)):
for j in range(1, len(x)):
accumulated_cost[i, j] = min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]) + distances[i, j]
In [21]:
path = [[len(x)-1, len(y)-1]]
i = len(y)-1
j = len(x)-1
while i>0 and j>0:
if i==0:
j = j - 1
elif j==0:
i = i - 1
else:
if accumulated_cost[i-1, j] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
i = i - 1
elif accumulated_cost[i, j-1] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
j = j-1
else:
i = i - 1
j= j- 1
path.append([j, i])
path.append([0,0])
In [22]:
path
Out[22]:
In [23]:
path_x = [point[0] for point in path]
path_y = [point[1] for point in path]
In [24]:
distance_cost_plot(accumulated_cost)
plt.plot(path_x, path_y);
In [25]:
def path_cost(x, y, accumulated_cost, distances):
path = [[len(x)-1, len(y)-1]]
cost = 0
i = len(y)-1
j = len(x)-1
while i>0 and j>0:
if i==0:
j = j - 1
elif j==0:
i = i - 1
else:
if accumulated_cost[i-1, j] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
i = i - 1
elif accumulated_cost[i, j-1] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
j = j-1
else:
i = i - 1
j= j- 1
path.append([j, i])
path.append([0,0])
for [y, x] in path:
cost = cost +distances[x, y]
return path, cost
In [26]:
path, cost = path_cost(x, y, accumulated_cost, distances)
print path
print cost
In [27]:
x = np.random.random_sample((60,))
In [28]:
y = np.random.random_sample((60,))
In [39]:
def process(x, y):
distances = np.zeros((len(y), len(x)))
for i in range(len(y)):
for j in range(len(x)):
distances[i,j] = (x[j]-y[i])**2
accumulated_cost = np.zeros((len(y), len(x)))
for i in range(1, len(y)):
for j in range(1, len(x)):
accumulated_cost[i, j] = min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]) + distances[i, j]
path = [[len(x)-1, len(y)-1]]
i = len(y)-1
j = len(x)-1
while i>0 and j>0:
if i==0:
j = j - 1
elif j==0:
i = i - 1
else:
if accumulated_cost[i-1, j] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
i = i - 1
elif accumulated_cost[i, j-1] == min(accumulated_cost[i-1, j-1], accumulated_cost[i-1, j], accumulated_cost[i, j-1]):
j = j-1
else:
i = i - 1
j= j- 1
path.append([j, i])
path.append([0,0])
path, cost = path_cost(x, y, accumulated_cost, distances)
return path, cost
In [40]:
%timeit process(x, y)
The mlpy package used by Nipun is out of date so use a different package. Following is adapted from the dtw example page.
In [ ]:
from dtw import dtw
In [30]:
plot(x)
plot(y)
Out[30]:
In [31]:
dist, cost, path = dtw(x, y)
In [32]:
dist
Out[32]:
In [34]:
imshow(cost.T, origin='lower', cmap=cm.gray, interpolation='nearest')
plot(path[0], path[1], 'w')
xlim((-0.5, cost.shape[0]-0.5))
ylim((-0.5, cost.shape[1]-0.5))
Out[34]:
This is silly data but what I really want to do is to measure the performance of the implementation.
In [35]:
%timeit dtw(x, y)
In [45]:
dtw(x, y)[0]
Out[45]:
In [44]:
process(x, y)[1]
Out[44]:
Hrm, our previous test was invalid?! We will have to go back and debug that code later. Lets get a feeling for the magnitude of costs on situations we understand.
In [49]:
angles_rads = np.linspace(0, 6.28, 100)
In [75]:
costs = []
x = np.sin(angles_rads)
for angle in range(360):
y = np.sin(angles_rads + angle*3.14/180.0)
costs.append(dtw(x, y)[0])
plot(costs)
Out[75]:
In [76]:
def plot_curves(delta = 0):
x = np.sin(angles_rads)
y = np.sin(angles_rads + delta*3.14/180.0)
plot(x)
plot(y)
print dtw(x, y)[0]
In [79]:
plot_curves(45)
In [80]:
plot_curves(90)
In [ ]: