In [116]:
import threading
from collections import deque
from IPython.display import display
import ipywidgets as widgets
import time
import websocket
from rsplib.processing import RSPSource
import json

Out Stream


In [122]:
stream0 = RSPSource("http://aarhustrafficdata182955", 4000);
stream0.location()


Out[122]:
'ws://aarhustrafficdata182955:4040/primus'

In [123]:
buffer = deque([], 10)

In [124]:
def on_message(ws, message):
    buffer.append(message)

def on_error(ws, error):
    print(error)

def on_close(ws):
    print("### closed ###")

def on_open(ws):
    def run(*args):
        for i in range(3):
            time.sleep(1)
            ws.send("Hello %d" % i)
        time.sleep(1)
        ws.close()
        print("thread terminating...")


def proc(on_open, on_error, on_message, on_close):
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(stream0.location(),
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

thread = threading.Thread(target=proc, args=(on_open, on_error, on_message, on_close,))
thread.start()


--- request header ---
GET /primus HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: aarhustrafficdata182955:4040
Origin: http://aarhustrafficdata182955:4040
Sec-WebSocket-Key: AaDOeTZf3qz0r7zF0TGXRg==
Sec-WebSocket-Version: 13


-----------------------
--- response header ---
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: P32MzlZsRo55up1Wo08yB9kWp28=
-----------------------

Visualizing a data example


In [126]:
json.loads(buffer[0])


Out[126]:
{'@context': {'ct': 'http://www.insight-centre.org/citytraffic#',
  'generatedAt': {'@id': 'http://www.w3.org/ns/prov#generatedAtTime',
   '@type': 'http://www.w3.org/2001/XMLSchema#dateTime'},
  'prov': 'http://www.w3.org/ns/prov#',
  'ses': 'http://www.insight-centre.org/dataset/SampleEventService#',
  'sld': 'http://www.streamreasoning.org/sld#',
  'ssn': 'http://purl.oclc.org/NET/ssnx/ssn#',
  'xsd': 'http://www.w3.org/2001/XMLSchema#'},
 '@graph': [{'@id': 'ses:Property-11d60592-7d15-4934-9e23-f1b38df90fd5',
   '@type': ['ct:CongestionLevel', 'ct:CongestionLevelA']},
  {'@id': 'http://www.rsp-lab.org/triplewave/citybench/AarhusTrafficData182955/20772458-CL93325975236c24a409b131f665903002',
   '@type': 'ssn:Observation',
   'ct:hasValue': {'@type': 'xsd:decimal', '@value': '0.0313111545988258'},
   'ssn:observedBy': {'@id': 'ses:AarhusTrafficData182955'},
   'ssn:observedProperty': {'@id': 'ses:Property-11d60592-7d15-4934-9e23-f1b38df90fd5'}}],
 '@id': 'http://aarhustrafficdata182955:4000/20772458-CL93325975236c24a409b131f665903002',
 'prov:generatedAtTime': {'@type': 'xsd:dateTime',
  '@value': '2017-11-04T17:14:59.777+0000'},
 'sld:eventTime': {'@type': 'xsd:dateTime', '@value': '2014-08-01T12:45:00.0'}}

Wait for it...data is changed


In [129]:
json.loads(buffer[0])


Out[129]:
{'@context': {'ct': 'http://www.insight-centre.org/citytraffic#',
  'generatedAt': {'@id': 'http://www.w3.org/ns/prov#generatedAtTime',
   '@type': 'http://www.w3.org/2001/XMLSchema#dateTime'},
  'prov': 'http://www.w3.org/ns/prov#',
  'ses': 'http://www.insight-centre.org/dataset/SampleEventService#',
  'sld': 'http://www.streamreasoning.org/sld#',
  'ssn': 'http://purl.oclc.org/NET/ssnx/ssn#',
  'xsd': 'http://www.w3.org/2001/XMLSchema#'},
 '@id': 'http://aarhustrafficdata158505:4001/20771289-ETc3bc2283801c850ecb892479c22d6d67',
 'prov:generatedAtTime': {'@type': 'xsd:dateTime',
  '@value': '2017-11-04T17:15:26.186+0000'},
 'sld:eventTime': {'@type': 'xsd:dateTime', '@value': '2014-08-01T12:35:00.0'}}

In [ ]: