In [ ]:
from tqdm.notebook import tqdm
import networkx
import csv
import itertools
import collections
import attr
from lxml import etree
import gzip
import tempfile

In [ ]:
def ancestors(elt):
    """Yield ``elt`` itself, then each successive parent up to the root.

    Parents are obtained with ``getparent()``; iteration stops as soon as
    that returns ``None``. Passing ``None`` yields nothing.
    """
    node = elt
    while True:
        if node is None:
            return
        yield node
        node = node.getparent()

In [ ]:
# Namespace prefixes used by the XPath queries in this notebook.
NSMAP = dict(
    xsd='http://www.w3.org/2001/XMLSchema',
    xsi='http://www.w3.org/2001/XMLSchema-instance',
    v8='http://www.thalesgroup.com/rtti/XmlTimetable/v8',
)

# Gzipped timetable file to parse. Other snapshots are kept commented out so
# they can be swapped in by hand.
# tt_path = '/Volumes/Expansion Desk/data/timetables/20200105020726_v8.xml.gz'
# tt_path = "/Volumes/Expansion Desk/data/timetables/20200209020725_v8.xml.gz"
tt_path = "/Volumes/Expansion Desk/data/timetables/20200307020721_v8.xml.gz"

# `tpl` codes a journey must call at, in this order, to be included.
# Alternative anchor sets:
# route_anchors = ['DEPTFD', 'ABWD', 'BLKHTH',];
# route_anchors = ['EBSFLTI', 'ASHFKY', 'DEAL',];
# route_anchors = ["STPANCI","STFORDI","EBSFLTI","ASHFKY"]
# route_anchors = ['HAYS', 'LNDNBDE'];
route_anchors = [
    "LNDNBDE",
    "GRVPK",
]

In [ ]:
@attr.s(frozen=True)
class Route:
    """Immutable, hashable key for one journey; used as the ``by_route`` key."""

    # Filled from the Journey element's `toc` attribute.
    atoc = attr.ib()
    # Filled from the Journey element's `uid` attribute.
    uid = attr.ib()
    # Filled from the Journey element's `ssd` attribute.
    date = attr.ib()

@attr.s(frozen=True)
class RtLocation:
    """Immutable record of one calling point of a journey."""

    # Location code, read from the element's `tpl` attribute.
    tiploc = attr.ib()
    # `pta` attribute of the element, or None when it is absent.
    public_arrival = attr.ib()
    # `ptd` attribute of the element, or None when it is absent.
    public_departure = attr.ib()

    def public_time(self):
        """Return the public departure if set, otherwise the public arrival."""
        if self.public_departure:
            return self.public_departure
        return self.public_arrival

In [ ]:
def _journey_locations(journey):
    """Return the OR/IP/DT calling points of ``journey``, or ``None`` to skip it.

    A journey is skipped when it is flagged ``isPassengerSvc="false"``, when
    any entry of ``route_anchors`` has no matching OR/IP/DT child, or when the
    anchors do not occur in the configured order.
    """
    if journey.attrib.get('isPassengerSvc', 'true') == 'false':
        return None

    # First OR/IP/DT child matching each anchor, or None when there is none.
    anchors = [
        next(iter(journey.xpath(
            '(./v8:OR | ./v8:IP | ./v8:DT)[@tpl=$tpl]',
            tpl=a, namespaces=NSMAP)), None)
        for a in route_anchors
    ]
    if any(a is None for a in anchors):
        return None

    # The anchors must appear in the same order as in `route_anchors`.
    idxs = [journey.index(a) for a in anchors]
    if idxs != sorted(idxs):
        return None

    return [
        RtLocation(l.attrib['tpl'], l.attrib.get('pta', None), l.attrib.get('ptd', None))
        for l in journey.xpath('(./v8:OR | ./v8:IP | ./v8:DT)', namespaces=NSMAP)
    ]


# Maps Route -> list of RtLocation for every journey that matches the anchors.
by_route = {}

with gzip.open(tt_path, 'rb') as f:
    # Measure the decompressed length so the progress bar has a total.
    f.seek(0, 2)
    sz = f.tell()
    f.seek(0)

    # The context manager closes the progress bar even if parsing fails.
    with tqdm(total=sz, unit='bytes') as t:
        for action, elt in etree.iterparse(f):
            if elt.tag == '{http://www.thalesgroup.com/rtti/XmlTimetable/v8}Journey':
                locations = _journey_locations(elt)
                if locations is not None:
                    rt = Route(elt.attrib['toc'], elt.attrib['uid'], elt.attrib['ssd'])
                    by_route[rt] = locations

            # Free every finished top-level element, including journeys that
            # were rejected above, so the tree does not grow while parsing.
            # https://lxml.de/1.3/parsing.html#iterparse-and-iterwalk
            parent = elt.getparent()
            if parent is not None and parent.tag == '{http://www.thalesgroup.com/rtti/XmlTimetable/v8}PportTimetable':
                elt.clear()
                while elt.getprevious() is not None:
                    del parent[0]

            # Advance the bar to the current position in the decompressed stream.
            t.update(f.tell() - t.n)
len(by_route)

In [ ]:
def anchor_pairs(route_anchors):
    """Pair each anchor with its successor, padding both ends with ``None``.

    For ``['A', 'B']`` this yields ``(None, 'A')``, ``('A', 'B')``,
    ``('B', None)``.
    """
    padded = [None] + route_anchors + [None]
    return zip(padded[:-1], padded[1:])

def match_route(locations):
    """Split ``locations`` into chunks delimited by the global ``route_anchors``.

    Returns a dict keyed by ``(start, end)`` anchor pairs, where ``None``
    stands for "before the first anchor" or "after the last anchor". Each
    anchor location is included in both of its neighbouring chunks. Returns
    ``None`` when an anchor cannot be found at or after the position of the
    previous one.
    """
    tiplocs = [loc.tiploc for loc in locations]
    chunks = {}
    cursor = 0
    for pair in anchor_pairs(route_anchors):
        end = pair[1]
        if end is None:
            # Open-ended chunk: everything from the current position onwards.
            chunks[pair] = locations[cursor:]
            continue
        try:
            hit = tiplocs.index(end, cursor)
        except ValueError:
            # The anchor does not occur in the remainder of the journey.
            return None
        chunks[pair] = locations[cursor:hit + 1]
        cursor = hit
    return chunks

def route_graph(by_route):
    """Build one directed stop-order graph per anchor-delimited chunk.

    Every journey in ``by_route`` that ``match_route`` accepts contributes an
    edge ``a -> b`` for each pair of consecutive stops within a chunk. The
    result maps each ``(start, end)`` chunk key to its ``networkx.DiGraph``.
    Journeys that do not match the anchors are ignored.
    """
    # Stopping patterns already merged, keyed per chunk so an identical
    # pattern under a different chunk key is still processed.
    seen = set()
    segments = collections.defaultdict(networkx.DiGraph)
    for locations in by_route.values():
        chunks = match_route(locations)
        if chunks is None:
            continue
        for k, seg in chunks.items():
            tiplocs = tuple(l.tiploc for l in seg)
            # Look the graph up first so a chunk with fewer than two stops
            # still gets an (empty) graph entry.
            g = segments[k]
            if (k, tiplocs) in seen:
                continue
            seen.add((k, tiplocs))
            for (a, b) in zip(tiplocs, tiplocs[1:]):
                g.add_edge(a, b)
    return segments

def orderings(by_route):
    """Return ``[(chunk_key, stations), ...]`` with stations topologically sorted.

    One entry is produced per chunk graph from ``route_graph``, in the order
    that function's mapping yields them.
    """
    return [
        (key, list(networkx.topological_sort(graph)))
        for key, graph in route_graph(by_route).items()
    ]
# Show the per-chunk station orderings for the parsed timetable.
orderings(by_route)

In [ ]:
ordr = orderings(by_route)

# Preview: print the first few parsed journeys as one-column timetables,
# listing every station of each chunk in its overall order.
for (schedule_key, calling_points) in itertools.islice(by_route.items(), 5):
    chunks = match_route(calling_points)
    if chunks is None:
        continue
    print(schedule_key)
    for (chunk_key, stns) in ordr:
        print(chunk_key)

        # An empty default keeps the lookup below safe if a chunk is absent.
        chunk_locs = chunks.get(chunk_key, ())
        for stop in stns:
            # First location in this chunk calling at `stop`, if any.
            location = next(
                (l for l in chunk_locs if l.tiploc == stop),
                None
            )
            shown = location.public_time() if location is not None else None
            print("{:>10}\t{:8}".format(stop, shown or '–'))

In [ ]:
def columnify_by_route(by_route):
    """Align every matching journey against one shared station list.

    Returns ``(stns_list, columns_by_route)``:

    * ``stns_list`` is the concatenation of the per-chunk orderings, keeping
      only stations at which at least one journey has a public time.
    * ``columns_by_route`` maps each journey accepted by ``match_route`` to a
      list parallel to ``stns_list`` holding the journey's location at that
      station, or ``None`` when it has none in that chunk.

    Prints one blank line as a side effect. Raises ``AssertionError`` if a
    column entry does not line up with its station.
    """
    # Stations with a public time on at least one journey.
    public_tiplocs = set()
    for locs in by_route.values():
        for loc in locs:
            if loc.public_time() is not None:
                public_tiplocs.add(loc.tiploc)

    ordr = []
    stns_list = []
    for key, stns in orderings(by_route):
        kept = [s for s in stns if s in public_tiplocs]
        ordr.append((key, kept))
        stns_list.extend(kept)

    print()

    columns_by_route = {}
    for rt, locations in by_route.items():
        chunks = match_route(locations)
        if chunks is None:
            continue

        col = []
        for key, stns in ordr:
            chunk_locs = chunks.get(key)
            for stop in stns:
                # First location in this chunk calling at `stop`, else None.
                hit = None
                for loc in chunk_locs:
                    if loc.tiploc == stop:
                        hit = loc
                        break
                col.append(hit)
        columns_by_route[rt] = col

    # Integrity check: every filled cell must sit under its own station.
    for i, stn in enumerate(stns_list):
        for ls in columns_by_route.values():
            if ls[i] is None:
                continue
            assert ls[i].tiploc == stn, f"Location {ls[i]} should match order:{stn}"

    return stns_list, columns_by_route

In [ ]:
def render_by_route(by_route):
    """Print the timetable as tab-separated text, one journey per line.

    The first line lists the stations from ``columnify_by_route``. Each later
    line shows a journey's ``uid`` followed by its public time at every
    station, with ``–`` where it has none. Journeys are ordered by their
    first public time; a journey with no public time at all sorts first.
    """
    stns_list, columns_by_route = columnify_by_route(by_route)

    def first_public_time(rt):
        # Columns hold None for stations a journey does not call at, so skip
        # those entries; fall back to '' when no public time exists at all.
        return next(
            (l.public_time() for l in columns_by_route[rt]
             if l is not None and l.public_time()),
            '',
        )

    rts = sorted(columns_by_route, key=first_public_time)
    print("_\t", "\t".join(stns_list))
    for rt in rts:
        col = columns_by_route[rt]
        print(rt.uid, "\t".join(
            (location.public_time() if location is not None else None) or '–'
            for location in col
        ))

In [ ]:
from lxml.builder import E
from IPython.core.display import HTML


def html_by_route(by_route):
    """Build an HTML ``<table>`` element with one row per journey.

    The header row holds two blank cells followed by the stations from
    ``columnify_by_route``. Each journey row starts with its ``date`` and
    ``uid`` header cells, then one cell per station with the public time, or
    a blank where there is none. Rows are ordered by ``(date, first public
    time)``; a journey with no public time sorts first within its date.
    """
    stns_list, columns_by_route = columnify_by_route(by_route)

    def sort_key(rt):
        # The '' default stops next() raising StopIteration for a journey
        # that has no public time in any column.
        first_time = next(
            (l.public_time() for l in columns_by_route[rt] if l and l.public_time()),
            '',
        )
        return (rt.date, first_time)

    rts = sorted(columns_by_route, key=sort_key)

    rows = [E.tr(*(E.th(x or ' ') for x in [None, None] + stns_list))]
    for rt in rts:
        row = [E.th(rt.date), E.th(rt.uid)]
        for location in columns_by_route[rt]:
            text = location.public_time() if location else None
            row.append(E.td(text or ' '))
        rows.append(E.tr(*row))

    return E.table(E.tbody(*rows))

# Render the timetable once; it is both saved to disk and shown inline.
markup = html_by_route(by_route)
anchor_label = ",".join(route_anchors)

# Keep a copy in a temporary file (not deleted on close) and report its path.
with tempfile.NamedTemporaryFile(
    prefix=anchor_label,
    suffix=".tt.html",
    delete=False,
) as tmpf:
    tmpf.write(etree.tostring(markup))
    print(tmpf.name)


# Cell output: the same table rendered inline.
HTML(etree.tostring(markup, encoding="unicode"))

In [ ]:


In [ ]: