In [9]:
import googlemaps
from googlemaps import convert
_ROADS_BASE_URL = "https://roads.googleapis.com"
def snap_to_roads(client, geo, interpolate=False):
"""Snaps a path to the most likely roads travelled.
Takes up to 100 GPS points collected along a route, and returns a similar
set of data with the points snapped to the most likely roads the vehicle
was traveling along.
:param path: The path to be snapped.
:type path: a single location, or a list of locations, where a
location is a string, dict, list, or tuple
:param interpolate: Whether to interpolate a path to include all points
points will also be returned, resulting in a path that smoothly follows
the geometry of the road, even around corners and through tunnels.
Interpolated paths may contain more points than the original path.
:type interpolate: bool
:rtype: A list of snapped points.
"""
params = {"path": encode_latlon(geo)}
if interpolate:
params["interpolate"] = "true"
return client._get("/v1/snapToRoads", params,
base_url=_ROADS_BASE_URL,
accepts_clientid=False,
extract_body=_roads_extract)
def speed_limits(client, place_ids):
"""Returns the posted speed limit (in km/h) for given road segments.
:param place_ids: The Place ID of the road segment. Place IDs are returned
by the snap_to_roads function. You can pass up to 100 Place IDs.
:type place_ids: str or list
:rtype: list of speed limits.
"""
params = [("placeId", place_id) for place_id in convert.as_list(place_ids)]
return client._get("/v1/speedLimits", params,
base_url=_ROADS_BASE_URL,
accepts_clientid=False,
extract_body=_roads_extract)["speedLimits"]
def snapped_speed_limits(client, geo):
"""Returns the posted speed limit (in km/h) for given road segments.
The provided points will first be snapped to the most likely roads the
vehicle was traveling along.
:parampath: The path of points to be snapped.
:type path: a single location, or a list of locations, where a
location is a string, dict, list, or tuple
:rtype: dict with a list of speed limits and a list of the snapped points.
"""
params = {"path": encode_latlon(geo)}
return client._get("/v1/speedLimits", params,
base_url=_ROADS_BASE_URL,
accepts_clientid=False,
extract_body=_roads_extract)
def _roads_extract(resp):
"""Extracts a result from a Roads API HTTP response."""
try:
j = resp.json()
except:
if resp.status_code != 200:
raise googlemaps.exceptions.HTTPError(resp.status_code)
raise googlemaps.exceptions.ApiError("UNKNOWN_ERROR",
"Received a malformed response.")
if "error" in j:
error = j["error"]
status = error["status"]
if status == "RESOURCE_EXHAUSTED":
raise googlemaps.exceptions._RetriableRequest()
if "message" in error:
raise googlemaps.exceptions.ApiError(status, error["message"])
else:
raise googlemaps.exceptions.ApiError(status)
if resp.status_code != 200:
raise googlemaps.exceptions.HTTPError(resp.status_code)
return j
In [10]:
%matplotlib inline
import smopy
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.cm as cm
from googlemaps.client import Client
from json_regularizor import read_snapped_points
filename = '../data/useful/S01_valid_itsc_speed_5_1.csv'
filename
KEY = 'AIzaSyCs9WzwEaqGYHOaCp8zMTJUhwBEO4cL31o'
m_client = Client(key=KEY)
######################################################################
# Construct testing cases.
# data[:, 3] - longitude, data[:, 4] - latitude.
# rt_varible: intersections - a list of intersections grouped by instance
def _construct_cases(filename=filename):
data = np.loadtxt(filename, delimiter=',', skiprows=1)
# print data.shape
# Group by intersection's type
intersections = []
ind = 0
for i in range (1, data.shape[0]):
if data[i, 1] != data[i - 1, 1]:
intersections.append(data[ind: i])
ind = i
elif i == data.shape[0] - 1:
intersections.append(data[ind:])
# print len(intersections)
return intersections
######################################################################
# Convert geo 1 * 2 array to str
# e.g: [100, 50] -> '100, 50'
def _convert_array(arr):
return '%s,%s' % (arr[1], arr[0])
######################################################################
# Compute Euclidean distance
def _distance(p1, p2):
return np.power((p1[0] - p2[0]), 2) + np.power((p1[1] - p2[1]), 2)
######################################################################
# Parse points to make better visual representation
def _parse_points(geo):
if geo.shape[0] == 1 or (geo.shape[0] == 2 and geo[0].all() == geo[1].all()):
return geo
geo_ret = np.array(geo)
for i in range(1, geo.shape[0]):
if geo_ret[i, 0] == geo_ret[i - 1, 0] and geo_ret[i, 1] == geo_ret[i - 1, 1]:
if i != geo_ret.shape[0] - 1:
geo_ret[i] = (geo_ret[i - 1] + geo_ret[i + 1]) / 2
else:
geo_ret[i] = 2 * geo_ret[i - 1] - geo_ret[i - 2]
return geo_ret
######################################################################
# Read ground truths and predictions of points
# itsc_id speed_true speed_pred theta_true theta_pred acc_true acc_pred
def _load_labels(filename='../data/prediction/result.csv'):
data = np.loadtxt(filename, delimiter=',', skiprows=1, dtype=int)
intersections_label = []
ind = 0
for i in range (1, data.shape[0]):
if data[i, 0] != data[i - 1, 0]:
intersections_label.append(data[ind: i])
ind = i
elif i == data.shape[0] - 1:
intersections_label.append(data[ind:])
return intersections_label
######################################################################
# Shape speed, theta, acc consisted with model's rule
def _make_labels(speed, theta, acc):
speed = speed[:].astype(int) / 5
for i in range(speed.shape[0]):
if speed[i] >= 11:
speed[i] = 11
for i in range(theta.shape[0]):
if theta[i] > -90 and theta[i] <= -30:
theta[i] = 0
elif theta[i] <= -10:
theta[i] = 1
elif theta[i] <= -1:
theta[i] = 2
elif theta[i] <= 1:
theta[i] = 3
elif theta[i] <= 10:
theta[i] = 4
elif theta[i] <= 30:
theta[i] = 5
elif theta[i] <= 90:
theta[i] = 6
theta = theta[:].astype(int)
for i in range(acc.shape[0]):
if acc[i] <= -2:
acc[i] = 0
elif acc[i] < 0:
acc[i] = 1
elif acc[i] == 0:
acc[i] = 2
elif acc[i] < 2:
acc[i] = 3
else:
acc[i] = 4
acc = acc[:].astype(int)
return speed, theta, acc
######################################################################
# Encode label to color
def _encode_color(label):
label = label.reshape(-1,)
x = np.linspace(0, 1, 5)
colors = cm.bwr(x)
return np.array([colors[i] for i in label])
######################################################################
# Encode label to arrow length multiplier
def _encode_multiplier(label):
label = label.reshape(-1,).astype(float)
return 1. / 22 * (label + 11)
######################################################################
# Encode label to arrow angle
def _encode_angle(label):
label = label.reshape(-1,)
angle = [30., 10., 5., 0., -5., -10., -30.]
return np.array([angle[i] for i in label])
######################################################################
# Compute the angle with x axis after shifting
def _compute_xangle(angle, x, y):
# print x.ravel(), y.ravel()
# print 'origin:'
# print np.arctan(y/x) / np.pi * 180
# print 'changed:'
# print np.arctan(y/x) / np.pi * 180 + angle
xangle = -np.arctan(y/x) / np.pi * 180 + angle
for i in range(xangle.shape[0]):
if x[i] > 0 and y[i] > 0:
xangle[i] = xangle[i]
elif x[i] > 0 and y[i] < 0:
xangle[i] = xangle[i]
elif x[i] < 0 and y[i] > 0:
xangle[i] = 180 - xangle[i]
elif x[i] < 0 and y[i] < 0:
xangle[i] = 180 + xangle[i]
return xangle
######################################################################
# Encode longitude and latitude into readable form for google API
#
# :geo: the numpy.array consist with file format in $filename,
# geo[:, 0] -> lon
# geo[:, 1] -> lat
def encode_latlon(geo):
geo = geo.astype(str)
return '|'.join([_convert_array(geo[i]) for i in range(geo.shape[0])])
######################################################################
# Visualize testing segment,
# from group_start to group_end
def visualize_cases(client, intersections, group_start, group_end, flag_raw=False):
itsc = figarr = []
intersections_label = _load_labels()
for i in range(group_start, group_end + 1):
print 'ID: %i' % i
instance_id = i
geo = intersections[instance_id][:, 3:5]
# Shift the points according to the error of GPS samples
geo[:, 1] -= 0.00016
# geo = _parse_points(geo)
data = snap_to_roads(client, geo, interpolate=False)
# print data
geo_inter = read_snapped_points(data)
geo_inter = _parse_points(geo_inter)
geo_inter = read_snapped_points(snap_to_roads(client, geo_inter, interpolate=False))
# print geo_inter.shape, geo.shape
if geo.shape != geo_inter.shape:
print "bad match"
continue
if flag_raw == False:
# Need to change here----------------
intersections[instance_id][:, 3:5] = geo_inter
lon_max = max(geo_inter[:, 0])
lon_min = min(geo_inter[:, 0])
lat_max = max(geo_inter[:, 1])
lat_min = min(geo_inter[:, 1])
else:
intersections[instance_id][:, 3:5] = geo
lon_max = max(geo[:, 0])
lon_min = min(geo[:, 0])
lat_max = max(geo[:, 1])
lat_min = min(geo[:, 1])
itsc.append(intersections[instance_id])
# Plot the street map
map = smopy.Map((lat_min, lon_min, lat_max, lon_max), z=18)
# print 'Printing..'
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 12))
# fig, ax1 = plt.subplots(1, 1, figsize=(16, 20))
map.show_mpl(ax=ax1, figsize=(8,10))
map.show_mpl(ax=ax2, figsize=(8,10))
speed = intersections[instance_id][:, -1]
theta = intersections[instance_id][:, -3]
acc = intersections[instance_id][:, -2]
# Full length
speed, theta, acc = _make_labels(speed, theta, acc)
# Using begining 5 frames as input to predict next frame's label,
# so the label_lenght is FEATURE_NAME.length - 5
for tmp in intersections_label:
if tmp[0, 0] == instance_id: break
speed_pred , theta_pred, acc_pred = tmp[:, 2], tmp[:, 4], tmp[:, 6]
speed_pred = np.vstack((speed[:5].reshape(-1, 1), speed_pred.reshape(-1, 1)))
theta_pred = np.vstack((theta[:5].reshape(-1, 1), theta_pred.reshape(-1, 1)))
acc_pred = np.vstack((acc[:5].reshape(-1, 1), acc_pred.reshape(-1, 1)))
speed[5:], theta[5:], acc[5:] = tmp[:, 1], tmp[:, 3], tmp[:, 5]
# print theta_pred
# Transform geometry information into pixels
x, y = map.to_pixels(geo_inter[:, 1], geo_inter[:, 0])
# Scatter the (x, y) points on the map, and color them by acc
ax1.scatter(x, y, c=_encode_color(acc), s=7)
ax2.scatter(x, y, c=_encode_color(acc_pred), s=7)
opt = {
'scale_units':'xy',
'scale':1,
'width':.0010
}
xx = x[1:] - x[:-1]
yy = y[1:] - y[:-1]
xx = np.hstack((xx, xx[-1].reshape(-1,)))
yy = np.hstack((yy, yy[-1].reshape(-1,)))
# Encode arrow length by speed
mul = _encode_multiplier(speed)
mul_pred = _encode_multiplier(speed_pred)
dist = (xx * mul, yy * mul)
dist_pred = (xx * mul_pred, yy * mul_pred)
# Encode arrow angle by theta
angles = _encode_angle(theta)
angles_pred = _encode_angle(theta_pred)
xangles = _compute_xangle(angles, xx, yy)
xangles_pred = _compute_xangle(angles_pred, xx, yy)
# Plot the arrow
ax1.quiver(x, y, dist[0], dist[1], angles=xangles, **opt)
ax1.plot(x, y, 'c', lw=.1)
ax2.quiver(x, y, dist_pred[0], dist_pred[1], angles=xangles_pred, **opt)
ax2.plot(x, y, 'c', lw=.1)
ax1.set_title("Ground Truth", fontsize=10)
ax2.set_title("Prediction", fontsize=10)
# Add color bar
fig.tight_layout()
fig.subplots_adjust(right=0.8)
cax = fig.add_axes([0.85, 0.15, 0.02, 0.7])
m = cm.ScalarMappable(cmap=cm.coolwarm)
m.set_array(np.linspace(0, 4, 4))
cbar = fig.colorbar(m, cax=cax, ticks=range(5))
cbar.ax.set_yticklabels([str(i - 2) for i in range(5)])
fig.suptitle('Intersection ID: %s' % str(int(intersections[instance_id][0, 1])),
fontsize=25)
figarr.append(fig)
# Save the fig
# fig.savefig('../update/pics/model_visualization/visualization_itsc_%i.eps' \
# %intersections[instance_id][0, 1],
# format='eps', dpi=100)
fig.savefig('/Users/CullenGao/Desktop/model_visualization/visualization_itsc_%i.eps' \
%intersections[instance_id][0, 1],
format='eps', dpi=100)
return figarr
######################################################################
# Visualize plotting,
# for speed -> 0 - 11,
# for theta -> -3 - 3,
# for acc -> -2 - 2;
# left - 13 38 52 86 149
# right - 4 5 6 15 135
# visualize_cases(m_client, _construct_cases(), 54, 54, True)
# visualize_cases(m_client, _construct_cases(), 13, 13, False)
# visualize_cases(m_client, _construct_cases(), 38, 38, False)
# visualize_cases(m_client, _construct_cases(), 52, 52, False)
# visualize_cases(m_client, _construct_cases(), 86, 86, False)
# visualize_cases(m_client, _construct_cases(), 149, 149, False)
# visualize_cases(m_client, _construct_cases(), 4, 6, False)
# visualize_cases(m_client, _construct_cases(), 15, 15, False)
# visualize_cases(m_client, _construct_cases(), 135, 135, False)
# visualize_cases(m_client, _construct_cases(), 0, 2, False)
In [3]:
######################################################################
# Testing cases
# from googlemaps import client
# def test_snap_to_road(client, filename):
# itsc = _construct_cases(filename)
# geo = itsc[0][:, 3:5]
# snap_to_roads(client, geo, interpolate=False)
# def test_snapped_speed_limits(client, filename):
# itsc = _construct_cases(filename)
# geo = itsc[0][:, 3:5]
# snapped_speed_limits(client, geo)
# if __name__ == '__main__':
# KEY = 'AIzaSyCs9WzwEaqGYHOaCp8zMTJUhwBEO4cL31o'
# m_client = client.Client(key=KEY)
# print test_snap_to_road(m_client, filename)
# # print '--------------------'
# # print test_snapped_speed_limits(m_client, filename)