In [ ]:
from tqdm.notebook import tqdm
import networkx
import csv
import itertools
import collections
import attr
from lxml import etree
import gzip
import tempfile
In [ ]:
def ancestors(elt):
    """Yield ``elt`` followed by each of its ancestors, nearest first.

    The walk goes upward by repeatedly calling ``getparent()`` and stops as
    soon as that returns ``None``. Passing ``None`` yields nothing.
    """
    node = elt
    while True:
        if node is None:
            return
        yield node
        node = node.getparent()
In [ ]:
# Prefix -> namespace URI map passed as ``namespaces=`` to the XPath queries
# run against the timetable document.
NSMAP = dict(
    xsd='http://www.w3.org/2001/XMLSchema',
    xsi='http://www.w3.org/2001/XMLSchema-instance',
    v8='http://www.thalesgroup.com/rtti/XmlTimetable/v8',
)

# Gzipped v8 timetable file to parse. Alternative snapshots:
# tt_path = '/Volumes/Expansion Desk/data/timetables/20200105020726_v8.xml.gz'
# tt_path = "/Volumes/Expansion Desk/data/timetables/20200209020725_v8.xml.gz"
tt_path = "/Volumes/Expansion Desk/data/timetables/20200307020721_v8.xml.gz"

# Ordered list of tiplocs a journey must call at (in this order) to be kept.
# Alternative routes:
# route_anchors = ['DEPTFD', 'ABWD', 'BLKHTH',];
# route_anchors = ['EBSFLTI', 'ASHFKY', 'DEAL',];
# route_anchors = ["STPANCI","STFORDI","EBSFLTI","ASHFKY"]
# route_anchors = ['HAYS', 'LNDNBDE'];
route_anchors = ["LNDNBDE", "GRVPK"]
In [ ]:
@attr.s(frozen=True)
class Route:
    """Immutable identifier for one journey, used as the key of ``by_route``.

    Instances are created positionally from a ``Journey`` element's ``toc``,
    ``uid`` and ``ssd`` attributes, in that order.
    """

    atoc = attr.ib()  # value of the journey's 'toc' attribute
    uid = attr.ib()   # value of the journey's 'uid' attribute
    date = attr.ib()  # value of the journey's 'ssd' attribute
@attr.s(frozen=True)
class RtLocation:
    """Immutable record of one calling point on a journey.

    Built from an ``OR``/``IP``/``DT`` element: ``tiploc`` is its ``tpl``
    attribute, and the two times are its ``pta`` and ``ptd`` attributes
    (``None`` when the attribute is absent).
    """

    tiploc = attr.ib()
    public_arrival = attr.ib()
    public_departure = attr.ib()

    def public_time(self):
        """Return the public departure time, or the arrival time if that is unset."""
        departure = self.public_departure
        if departure:
            return departure
        return self.public_arrival
In [ ]:
_JOURNEY_TAG = '{http://www.thalesgroup.com/rtti/XmlTimetable/v8}Journey'
_TIMETABLE_TAG = '{http://www.thalesgroup.com/rtti/XmlTimetable/v8}PportTimetable'
# Public calling points of a journey: origin, intermediate, destination.
_CALLING_POINTS_XPATH = '(./v8:OR | ./v8:IP | ./v8:DT)'


def _journey_locations(elt):
    """Return the calling points of a Journey that follows ``route_anchors``.

    Returns a list of ``RtLocation`` (one per OR/IP/DT child, in document
    order) when the journey is a passenger service that calls at every anchor
    in the configured order. Returns ``None`` otherwise.
    """
    if elt.attrib.get('isPassengerSvc', 'true') == 'false':
        return None
    # First OR|IP|DT child for each anchor tiploc (None when it is missing).
    anchors = [
        next(iter(elt.xpath(
            _CALLING_POINTS_XPATH + '[@tpl=$tpl]',
            tpl=a, namespaces=NSMAP)), None)
        for a in route_anchors
    ]
    if any(a is None for a in anchors):
        return None
    # The anchors must appear in the same order as in route_anchors.
    idxs = [elt.index(a) for a in anchors]
    if idxs != sorted(idxs):
        return None
    return [
        RtLocation(l.attrib['tpl'], l.attrib.get('pta', None), l.attrib.get('ptd', None))
        for l in elt.xpath(_CALLING_POINTS_XPATH, namespaces=NSMAP)
    ]


# Route -> list of RtLocation for every journey matching route_anchors.
by_route = {}
with gzip.open(tt_path, 'rb') as f:
    # Seek to the end to learn the stream length for the progress bar. On a
    # gzip stream, positions are in uncompressed bytes, so this matches what
    # f.tell() reports during parsing (at the cost of one extra read pass).
    f.seek(0, 2)
    sz = f.tell()
    f.seek(0)
    with tqdm(total=sz, unit='bytes') as t:
        for action, elt in etree.iterparse(f):
            if elt.tag == _JOURNEY_TAG:
                locations = _journey_locations(elt)
                if locations is not None:
                    rt = Route(elt.attrib['toc'], elt.attrib['uid'], elt.attrib['ssd'])
                    by_route[rt] = locations
            # Free finished top-level elements whether or not they matched.
            # Previously `continue` skipped this step for rejected journeys,
            # so they stayed in memory until the next accepted one.
            parent = elt.getparent()
            if parent is not None and parent.tag == _TIMETABLE_TAG:
                # https://lxml.de/1.3/parsing.html#iterparse-and-iterwalk
                elt.clear()
                while elt.getprevious() is not None:
                    del parent[0]
            t.n = f.tell()
            t.update(0)
len(by_route)
In [ ]:
def anchor_pairs(route_anchors):
    """Pair each anchor with the next one, padding both ends with ``None``.

    For ``[A, B]`` the result iterates ``(None, A)``, ``(A, B)``, ``(B, None)``.
    ``route_anchors`` must be a list, because it is concatenated with lists.
    Returns a lazy ``zip`` iterator.
    """
    starts = [None] + route_anchors
    ends = route_anchors + [None]
    return zip(starts, ends)
def match_route(locations):
    """Split ``locations`` into chunks delimited by the global ``route_anchors``.

    Returns a dict keyed by ``(start_anchor, end_anchor)`` pairs from
    ``anchor_pairs(route_anchors)``. Each bounded chunk runs from the current
    position up to and including the end anchor, so an anchor is the last
    item of one chunk and the first item of the next. The final pair (end is
    ``None``) takes everything from the current position onward.

    Returns ``None`` if any anchor cannot be found at or after the current
    position.
    """
    tiplocs = [loc.tiploc for loc in locations]
    chunks = {}
    cursor = 0
    for pair in anchor_pairs(route_anchors):
        end = pair[1]
        if end is None:
            # Open-ended chunk: the rest of the journey.
            chunks[pair] = locations[cursor:]
            continue
        try:
            hit = tiplocs.index(end, cursor)
        except ValueError:
            # Anchor not reached from here: this journey does not match.
            return None
        chunks[pair] = locations[cursor:hit + 1]
        cursor = hit
    return chunks
def route_graph(by_route):
    """Build one directed graph of consecutive stops per anchor-pair chunk.

    Each journey in ``by_route`` is split with ``match_route``; journeys that
    do not match are ignored. For every chunk, an edge is added between each
    pair of adjacent tiplocs. A tiploc sequence that has already been
    processed (under any key) is skipped.

    Returns a ``defaultdict`` mapping chunk key -> ``networkx.DiGraph``.
    """
    segments = collections.defaultdict(networkx.DiGraph)
    seen = set()
    for locations in by_route.values():
        chunks = match_route(locations)
        if chunks is None:
            continue
        for key, chunk in chunks.items():
            tiplocs = tuple(loc.tiploc for loc in chunk)
            if tiplocs in seen:
                continue
            seen.add(tiplocs)
            graph = segments[key]
            graph.add_edges_from(zip(tiplocs, tiplocs[1:]))
    return segments
def orderings(by_route):
    """Return ``[(chunk_key, stations)]`` with stations in topological order.

    One entry is produced per graph returned by ``route_graph(by_route)``;
    the station list is ``networkx.topological_sort`` of that graph.
    """
    return [
        (key, list(networkx.topological_sort(graph)))
        for key, graph in route_graph(by_route).items()
    ]
# Bare expression: evaluates the per-chunk station orderings for the parsed
# journeys (the result is not stored).
orderings(by_route)
In [ ]:
# Preview: for the first five journeys, print each chunk's stations in
# topological order alongside that journey's public time at the station
# (an en dash where the journey does not call there or has no public time).
ordr = orderings(by_route)
for (schedule_key, rt) in itertools.islice(by_route.items(), 5):
    chunks = match_route(rt)
    if chunks is None:
        continue
    print (schedule_key)
    for (chunk_key, stns) in ordr:
        print((chunk_key))
        # Default to an empty tuple so that a chunk missing from this journey
        # prints dashes instead of raising TypeError when iterated.
        chunk_locs = chunks.get(chunk_key, ())
        for stop in stns:
            location = next(
                (l for l in chunk_locs if l.tiploc == stop),
                None
            )
            shown = location.public_time() if location is not None else None
            print("{:>10}\t{:8}".format(stop, shown or '–'))
In [ ]:
def columnify_by_route(by_route):
    """Lay every matching journey out against one shared list of stations.

    Returns ``(stns_list, columns_by_route)``:

    * ``stns_list`` concatenates the per-chunk station orderings from
      ``orderings(by_route)``, keeping only tiplocs where at least one
      location in ``by_route`` has a public time.
    * ``columns_by_route`` maps each journey key that ``match_route`` accepts
      to a list parallel to ``stns_list``. Each entry is the journey's first
      location with that tiploc inside the corresponding chunk, or ``None``.

    Raises ``AssertionError`` if a column entry does not line up with
    ``stns_list`` (internal consistency check).
    """
    has_pub_stop = {
        l.tiploc
        for ls in by_route.values()
        for l in ls
        if l.public_time() is not None
    }
    ordr = [
        (k, [s for s in stns if s in has_pub_stop])
        for (k, stns) in orderings(by_route)
    ]
    stns_list = [s for (_, stns) in ordr for s in stns]

    columns_by_route = {}
    for (rt, locations) in by_route.items():
        chunks = match_route(locations)
        if chunks is None:
            continue
        col = []
        for (chunk_key, stns) in ordr:
            # First location per tiploc within this chunk. An empty default
            # keeps a missing chunk from raising TypeError on iteration.
            first_by_tiploc = {}
            for l in chunks.get(chunk_key, ()):
                first_by_tiploc.setdefault(l.tiploc, l)
            col.extend(first_by_tiploc.get(stop) for stop in stns)
        columns_by_route[rt] = col

    # Integrity check: every filled cell must sit under its own station.
    for (i, stn) in enumerate(stns_list):
        for ls in columns_by_route.values():
            if ls[i] is not None:
                assert ls[i].tiploc == stn, f"Location {ls[i]} should match order:{stn}"
    return stns_list, columns_by_route
In [ ]:
def render_by_route(by_route):
    """Print the journeys as a tab-separated text table, one row per journey.

    The header row lists the stations from ``columnify_by_route``. Each
    following row is a journey's uid and its public time at every station
    (an en dash where it has none). Rows are sorted by the journey's first
    public time.
    """
    stns_list, columns_by_route = columnify_by_route(by_route)

    def first_public_time(rt):
        # Columns contain None where the journey skips a station, so guard
        # before calling public_time(). The '' default sorts a journey with
        # no public time first instead of raising StopIteration.
        return next(
            (l.public_time() for l in columns_by_route[rt]
             if l is not None and l.public_time()),
            '',
        )

    rts = sorted(columns_by_route, key=first_public_time)
    print("_\t", "\t".join(stns_list))
    for rt in rts:
        col = columns_by_route[rt]
        # Tab after the uid mirrors the "_\t" header cell so columns align.
        print(f"{rt.uid}\t", "\t".join(
            (location.public_time() if location is not None else None) or '–'
            for location in col
        ))
In [ ]:
from lxml.builder import E
from IPython.core.display import HTML
def html_by_route(by_route):
    """Build an HTML ``<table>`` element with one row per journey.

    The header row has two blank cells followed by the station list from
    ``columnify_by_route``. Each journey row starts with its date and uid as
    header cells, then one cell per station holding the public time (a single
    space where there is none). Rows are sorted by ``(date, first public
    time)``.
    """
    stns_list, columns_by_route = columnify_by_route(by_route)

    def sort_key(rt):
        # Date paired with the first public time found along the column.
        return next(
            (rt.date, loc.public_time())
            for loc in columns_by_route[rt]
            if loc and loc.public_time()
        )

    ordered_routes = sorted(columns_by_route.keys(), key=sort_key)

    header_cells = [E.th(label or ' ') for label in [None, None] + stns_list]
    rows = [E.tr(*header_cells)]
    for rt in ordered_routes:
        cells = [E.th(rt.date), E.th(rt.uid)]
        cells.extend(
            E.td((loc.public_time() or ' ') if loc else " ")
            for loc in columns_by_route[rt]
        )
        rows.append(E.tr(*cells))
    return E.table(E.tbody(*rows))
# Render the timetable, save a copy to a temp file that is kept on disk
# (delete=False), print its path, and show the table inline as cell output.
markup = html_by_route(by_route)
with tempfile.NamedTemporaryFile(
    prefix=",".join(route_anchors),
    suffix=".tt.html",
    delete=False,
) as tmpf:
    tmpf.write(etree.tostring(markup))
print(tmpf.name)
HTML(etree.tostring(markup, encoding="unicode"))
In [ ]:
In [ ]: