In [1]:
import tqdm
import tarfile
import pandas
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import PIL
import gzip
from urllib.request import urlopen
%matplotlib inline
matplotlib.style.use('ggplot')
# progress bar
tqdm.tqdm.pandas()
# 檔案名稱格式
filename_format="M06A_{year:04d}{month:02d}{day:02d}.tar.gz".format
xz_filename_format="xz/M06A_{year:04d}{month:02d}{day:02d}.tar.xz".format
csv_format = "M06A/{year:04d}{month:02d}{day:02d}/{hour:02d}/TDCS_M06A_{year:04d}{month:02d}{day:02d}_{hour:02d}0000.csv".format
# 設定欄位名稱
M06A_fields = ['VehicleType',
'DetectionTime_O','GantryID_O',
'DetectionTime_D','GantryID_D ',
'TripLength', 'TripEnd', 'TripInformation']
import datetime
# 用來解析時間格式
def strptime(x):
return datetime.datetime(int(x[:4]), int(x[5:7]), int(x[8:10]),
int(x[11:13]), int(x[14:16]), int(x[17:19]) )
def parse_tripinfo(tripinfo):
split1 = tripinfo.split("; ")
return [(strptime(t), t[20:]) for t in split1]
In [2]:
speed_limit_url = "http://tisvcloud.freeway.gov.tw/roadlevel_info.xml.gz"
speed_data = gzip.decompress(urlopen(speed_limit_url).read()).decode('utf8')
In [3]:
from bs4 import BeautifulSoup
speed_data_soup = BeautifulSoup(speed_data, "xml")
In [4]:
route_info = pandas.DataFrame([x.attrs for x in speed_data_soup.Infos.find_all('Info')])
route_info
Out[4]:
In [5]:
route_info = route_info.query("roadtype=='1'")[["fromkm","tokm","locationpath","speedlimit"]]
route_info
Out[5]:
In [6]:
set(route_info.locationpath)
Out[6]:
In [7]:
tmc_data = pandas.read_excel("http://61.57.40.123/RDS/20161129LocationTable.xlsx", sheetname="TMC")
tmc_data.head(10)
Out[7]:
In [8]:
road_name={}
for n in set(route_info.locationpath):
CROAD_NAME = set(tmc_data[tmc_data["LINEAR REFERENCE"]==int(n)].CROAD_NAME)
print(n, CROAD_NAME)
road_name[int(n)]=list(CROAD_NAME)[0]
In [9]:
# 把 km 表示法轉成數字 單位是 km/10
def convert_km(s):
a,b = s.split("K+")
return int(a)*10+int(b)/100
route_info['fromkm'] = route_info.fromkm.apply(convert_km)
route_info['tokm'] = route_info.tokm.apply(convert_km)
In [10]:
# 用來簡化速限範圍,把連續而且速限相同的區間連成一塊
def simplify(points, speeds):
# remove duplicates
r_points, r_speeds = [], []
last_s = None
for p, s in zip(points,speeds):
if s == last_s:
continue
r_points.append(p)
r_speeds.append(s)
last_s = s
return r_points, r_speeds
# 另外一個作法是用 numpy 方式
def np_simplify(points, speeds):
speeds = np.array([-1]+list(speeds))
idx = speeds[:-1] != speeds[1:]
return np.array(points)[idx], speeds[1:][idx]
In [11]:
# 速限分析, binary search 用的
southbound_speedlimit = {}
northbound_speedlimit = {}
def __process_speedlimit(lower_km, higher_km, speedlimit):
assert list(lower_km[1:])==list(higher_km[:-1]) #假設區段全部連在一起
nodes = list(lower_km)
speeds = map(int, speedlimit)
nodes, speeds = simplify(nodes,speeds)
nodes = nodes + list(higher_km[-1:]+0.5)
return nodes, speeds
for n in set(route_info.locationpath):
info = route_info[route_info.locationpath==n]
n = int(n)
print(n, road_name[n])
# southbound
infos = info.query("fromkm<tokm").sort_values("fromkm")
nodes, speeds = __process_speedlimit(infos.fromkm, infos.tokm, infos.speedlimit)
print("south", nodes, speeds)
southbound_speedlimit[n]=(nodes, [110]+speeds+[110]) # 超出範圍時,保守估計
# northbound
infon = info.query("fromkm>tokm").sort_values("fromkm")
nodes, speeds = __process_speedlimit(infon.tokm, infon.fromkm, infon.speedlimit)
print("north", nodes, speeds)
northbound_speedlimit[n]=(nodes, [110]+speeds+[110])
In [12]:
# 偵測站資料
node_data_url = "http://www.freeway.gov.tw/Upload/DownloadFiles/%e5%9c%8b%e9%81%93%e8%a8%88%e8%b2%bb%e9%96%80%e6%9e%b6%e5%ba%a7%e6%a8%99%e5%8f%8a%e9%87%8c%e7%a8%8b%e7%89%8c%e5%83%b9%e8%a1%a8104.09.04%e7%89%88.csv"
node_data = pandas.read_csv(urlopen(node_data_url), encoding='big5', header=1)
# 簡單清理資料
node_data = node_data[node_data["方向"].apply(lambda x:x in 'NS')]
node_data.head(5)
Out[12]:
In [13]:
# 手工寫的對照表
nodeprefix_to_routeid={
'01F': 166, # 國道一號
'01H': 147, #汐止五股高架段
'03F': 28, # 國道三號
'03A': 143, #國三甲
'05F': 154, #國道五號
}
In [14]:
# 清理一下 node_data 的編號格式
node_data["編號"] = node_data["編號"].apply(lambda x:x[:3]+x[4:7]+x[8:])
node_data.index = node_data.編號
node_data.head(10)
Out[14]:
In [15]:
# 計算
from bisect import bisect
def node_code_to_speedlimit(node_code):
if node_code[3]=='R':
return 110
prefix = node_code[:3]
routeid = nodeprefix_to_routeid[prefix]
speedlimit_table = southbound_speedlimit if node_code[-1]=='S' else northbound_speedlimit
location = int(node_code[3:7])
nodes, speeds = speedlimit_table[routeid]
i = bisect(nodes, location)
return speeds[i]
node_data['速限'] = node_data.編號.apply(node_code_to_speedlimit)
In [16]:
## 順便存一下檔案
node_data.to_json('node_data.json')
pandas.DataFrame(
{k:southbound_speedlimit[v] + northbound_speedlimit[v]
for k,v in nodeprefix_to_routeid.items() if v in southbound_speedlimit},
index=["south_loc", "south_speed", "north_loc", "north_speed"]).to_json("speedlimit.json")
In [17]:
# 這個改了一點, SpeedInfo 這個 named tuple 同時紀錄了區段的起始時間和兩端點
from collections import namedtuple
SpeedInfo = namedtuple("SpeedInfo", ["speed", "time","loc1", "loc2"])
def compute_speed(trip):
rtn = []
for (t1,n1), (t2,n2) in zip(trip[:-1], trip[1:]):
if n1[:3] != n2[:3] or n1[-1]!=n2[-1]:
continue
if n1[3]=='R' or n2[3]=='R':
continue
km1 = int(n1[3:-1])/10
km2 = int(n2[3:-1])/10
hr_delta = (t2-t1).total_seconds()/60/60
speed = abs(km2-km1)/hr_delta
rtn.append(SpeedInfo(speed, t1, n1, n2))
return rtn
In [18]:
# 打開剛才下載的檔案試試
data_config ={"year":2016, "month":12, "day":18}
tar = tarfile.open(filename_format(**data_config), 'r')
# 如果沒有下載,可以試試看 xz 檔案
#data_dconfig ={"year":2016, "month":11, "day":18}
#tar = tarfile.open(xz_filename_format(**data_config), 'r')
In [19]:
csvs = (tar.extractfile(csv_format(hour=hr, **data_config)) for hr in tqdm.trange(24))
data = pandas.concat([pandas.read_csv(csv, names=M06A_fields) for csv in csvs], ignore_index=True)
print("資料大小", data.shape)
# 檢查異常的資料
print("異常資料數:", data[data.TripEnd == 'N'].shape[0])
# 去除異常資料
data = data[data.TripEnd == 'Y']
# 只保留 TripInformation 和 VehicleType
data = data[['VehicleType', "TripInformation"]]
In [20]:
data['Trip'] = data.TripInformation.progress_apply(parse_tripinfo)
data['Speed'] = data.Trip.progress_apply(compute_speed)
In [21]:
# 要怎麼由 SpeedInfo 來判斷是否超速?
# 如果兩個測速站的速限不同,保守起見,取大的
# 方法一
def is_speeding(info):
speedlimit = node_data["速限"][[info.loc1, info.loc2]].max()
return info.speed > speedlimit + 10
# 方法二 先轉成 dict
speedlimit_lookup = node_data.速限.to_dict()
def is_speeding(info):
speedlimit = max(speedlimit_lookup[info.loc1], speedlimit_lookup[info.loc2])
return info.speed > speedlimit + 10
speeding_idx = data.Speed.progress_apply(lambda l:any(is_speeding(x) for x in l))
data[speeding_idx].head(10)
Out[21]:
In [22]:
num_total = data.shape[0]
num_valid = data.Speed.apply(bool).sum()
num_overspeed = speeding_idx.sum()
num_total, num_valid, num_overspeed, num_overspeed/num_total, num_overspeed/num_valid
Out[22]:
In [23]:
# 加入是否超速的欄位
data['is_speeding'] = speeding_idx
# 依照車輛種類分組
grouped = data.groupby('VehicleType')
# 計算非零的個數
num_total_by_type = grouped.agg(np.count_nonzero)
# 所以 Speed 就是 num_valid, is_speeding 就是 num_overspeeding
num_total_by_type
Out[23]:
In [24]:
# 依照車種計算超速比例
ratio_speeding_valid = lambda x: x.is_speeding/x.Speed
ratios = num_total_by_type.apply(ratio_speeding_valid, axis=1)
ratios
Out[24]:
In [26]:
ax = ratios.plot.bar()
ticks = np.vectorize('{:3.0f}%'.format)(100*ax.get_yticks())
ax.set_yticklabels(ticks);
In [ ]: