Trajectory analysis becomes very important for a wide range of such actual problems as drivers classification by driving style, distinguish one driver for another, etc. In this article I want to show an example of very basic analysis of trajectories as curves (without any map connection), but that could be usefull in other tasks as well.
More details you can find here: https://www.datacrucis.com/research.html
In [17]:
import numpy as np
import pandas as pd
tracks = pd.read_csv("trajectory.csv")
print tracks
In [18]:
import matplotlib.pyplot as plt
%matplotlib inline
tracks = np.array(tracks)
x, y = tracks.T
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(x, y, 'b-', label='path')
plt.legend(loc='best')
Out[18]:
In [19]:
from itertools import tee, izip
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = tee(iterable)
next(b, None)
return izip(a, b)
def distance(start, end):
"""Euclidean distance between start and end
"""
value = np.sqrt( (start[0] - end[0]) ** 2 + (start[1] - end[1]) ** 2)
return round(value, 3)
def get_distances(tracks):
"""Calculate distances between tracked positions
"""
distances = []
for track in pairwise(tracks):
start, end = track
if end is None:
continue
dist = distance(start, end)
distances.append(dist)
return np.array(distances)
In [20]:
dist = get_distances(tracks)
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(dist, 'b-', label='distances')
plt.legend(loc='best')
Out[20]:
In [21]:
def is_outlier(value, p25, p75):
"""Check if value is an outlier
"""
lower = p25 - 1.5 * (p75 - p25)
upper = p75 + 1.5 * (p75 - p25)
return value <= lower or value >= upper
def get_outliers(distances):
"""Get outlier indices (if any)
"""
p25 = np.percentile(distances, 25)
p75 = np.percentile(distances, 75)
outliers = []
for ind, dist in enumerate(distances):
if is_outlier(dist, p25, p75):
outliers.append(ind)
return outliers
def remove_outliers(tracks, outliers):
"""Remove outliers from the tracked positions
"""
return np.delete(tracks, outliers, 0)
In [22]:
outliers_indices = get_outliers(dist)
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(dist, 'b-', label='distances')
ax.plot(outliers_indices, dist[outliers_indices], 'ro', markersize = 7, label='outliers')
plt.legend(loc='best')
Out[22]:
In [23]:
dist_without_outliers = remove_outliers(dist, outliers_indices)
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(dist_without_outliers, 'b-', label='cleared distances')
plt.legend(loc='best')
Out[23]:
In [24]:
from rdp import rdp
simplified_tracks = rdp(tracks, epsilon=200)
sx, sy = simplified_tracks.T
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(x, y, 'r--', label='path')
ax.plot(sx, sy, 'b-', label='simplified path')
plt.legend(loc='best')
Out[24]:
In [25]:
def angle(directions):
"""Returns the angles between vectors
"""
vec2 = directions[1:]
vec1 = directions[:-1]
norm1 = np.sqrt((vec1 ** 2).sum(axis=1))
norm2 = np.sqrt((vec2 ** 2).sum(axis=1))
cos = (vec1 * vec2).sum(axis=1) / (norm1 * norm2)
return np.arccos(cos)
min_angle = np.pi / 5.0
# compute the direction vectors on the simplified_tracks
directions = np.diff(simplified_tracks, axis=0)
theta = angle(directions)
# Select the index of the points with the greatest theta
# Large theta is associated with greatest change in direction.
idx = np.where(theta > min_angle)[0] + 1
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(sx, sy, 'gx-', label='simplified path')
ax.plot(sx[idx], sy[idx], 'ro', markersize = 7, label='turning points')
plt.legend(loc='best')
Out[25]:
In [26]:
def smooth(x, window_len=11, window='hanning'):
"""smooth the data using a window with requested size.
This method is based on the convolution of a scaled window with the signal.
The signal is prepared by introducing reflected copies of the signal
(with the window size) in both ends so that transient parts are minimized
in the begining and end part of the output signal.
input:
x: the input signal
window_len: the dimension of the smoothing window; should be an odd integer
window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
flat window will produce a moving average smoothing.
output:
the smoothed signal
example:
t=linspace(-2,2,0.1)
x=sin(t)+randn(len(t))*0.1
y=smooth(x)
see also:
numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve
scipy.signal.lfilter
TODO: the window parameter could be the window itself if an array instead of a string
NOTE: length(output) != length(input), to correct this: return y[(window_len/2-1):-(window_len/2)] instead of just y.
"""
if x.ndim != 1:
raise ValueError, "smooth only accepts 1 dimension arrays."
if x.size < window_len:
raise ValueError, "Input vector needs to be bigger than window size."
if window_len<3:
return x
if not window in ['flat', 'hanning', 'hamming', 'bartlett', 'blackman']:
raise ValueError, "Window is on of 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'"
s = np.r_[x[window_len-1:0:-1], x , x[-1:-window_len:-1]]
if window == 'flat': #moving average
w=np.ones(window_len,'d')
else:
w=eval('np.'+window+'(window_len)')
y=np.convolve(w/w.sum(),s,mode='valid')
return y[(window_len/2-1):-(window_len/2)]
dist_smoothed = smooth(dist_without_outliers, int(len(dist_without_outliers) / 25.0), 'hamming')
plt.plot(dist_smoothed, 'r-', lw=1, label="Smoothed")
plt.plot(dist_without_outliers, 'b--', lw=1, label="Original")
plt.legend(loc='best')
Out[26]:
In [27]:
from scipy.signal import argrelextrema
local_max, = argrelextrema(dist_smoothed, np.greater, order=15)
local_min, = argrelextrema(dist_smoothed, np.less, order=15)
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(dist_smoothed, 'b-', label='smoothed')
ax.plot(local_min, dist_smoothed[local_min], 'ro', markersize = 5, label='min')
ax.plot(local_max, dist_smoothed[local_max], 'go', markersize = 5, label='max')
plt.legend(loc='best')
Out[27]:
In [28]:
def get_intervals(local_min, local_max, total):
accelerations = []
decelerations = []
extremums = np.append(local_min, local_max)
extremums = np.sort(extremums)
index = None
for i in extremums:
if index is None:
index = i
continue
interval = (index, i)
index = i
if i in local_min:
decelerations.append(interval)
elif i in local_max:
accelerations.append(interval)
return accelerations, decelerations
ac, dc = get_intervals(local_min, local_max, len(dist_smoothed))
print "acceleration:", ac
print "deceleration:", dc
In [34]:
def get_acceleration_stats(d_smoothed, ac, delta=1):
"""Calculate statistics of acceleration: lengths, distances etc.
"""
lengths = []
distances = []
speeds = []
accelerations = []
for interval in ac:
start, end = interval
acc_length = np.abs(end - start) * delta
acc_distance = d_smoothed[end] - d_smoothed[start]
speed = acc_distance / acc_length
acc = [
np.abs(d_smoothed[i+1] - d_smoothed[i]) / (2.0 * delta) for i in xrange(start, end - 1)
]
lengths.append(acc_length)
distances.append(acc_distance)
speeds.append(speed)
if acc:
accelerations.append(np.mean(acc))
return lengths, distances, speeds, accelerations
delta = 1.0 # 1sec between positions
acc_len, acc_dist, acc_sp, acc_avg = get_acceleration_stats(dist_smoothed, ac, delta)
print (np.mean(acc_len) if acc_len else 0.0,
np.mean(acc_dist) if acc_dist else 0.0,
np.mean(acc_sp) if acc_sp else 0.0)
In [35]:
import heapq
def get_top_freq(freq, amplitude, n=5):
local_max, = argrelextrema(amplitude, np.greater, order=15)
ampl_max = amplitude[local_max]
freq_max = freq[local_max]
idx = heapq.nlargest(n, range(ampl_max.size), ampl_max.take)
return np.array(zip(freq_max[idx], ampl_max[idx]))
def get_fft(signal, f_s=1.0):
fft = np.abs(np.fft.fft(signal))
freq = np.fft.fftfreq(fft.size, 1.0 / f_s)
half_n = np.ceil(fft.size / 2.0)
fft_half = (2.0 / fft.size) * fft[:half_n]
freq_half = freq[:half_n]
return freq_half, fft_half
In [36]:
def calculate_acceleration(velocity):
acceleration = []
for speed in pairwise(velocity):
start, end = speed
if end is None:
continue
value = (end - start) / 1.0
acceleration.append(value)
return acceleration
acc = calculate_acceleration(dist_without_outliers)
freq, fft = get_fft(acc, 1.0)
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(freq, fft, 'b-', label='Frequencies')
plt.legend(loc='best')
Out[36]:
In [37]:
top_freq = get_top_freq(freq, fft, n=10)
fq, ampl = top_freq.T
fig = plt.figure()
ax = fig.add_subplot(111)
ax.plot(freq, fft, 'b-', label='Frequencies')
ax.plot(fq, ampl, 'go', markersize = 5, label='Top freq')
plt.legend(loc='best')
Out[37]: