In [2]:
import threading
from collections import deque
from IPython.display import display
import ipywidgets as widgets
import time
import websocket
from rsplib.processing import RSPSource
import json, requests

Output stream


In [7]:
# Look up the default observer of the `ct` query; the JSON it returns carries
# the websocket address of the result stream under 'sld:streamLocation'.
# A timeout stops the cell from hanging if the host is unreachable, and
# raise_for_status() reports an HTTP error instead of a confusing JSON failure.
observer_response = requests.get(
    'http://jasper:8183/queries/ct/observers/default',
    timeout=10,
)
observer_response.raise_for_status()
output_sream = observer_response.json()
output_sream['sld:streamLocation']


Out[7]:
'ws://jasper:8283/ct/results'

In [8]:
# Keep only the 10 most recent stream messages; older ones are dropped automatically.
buffer = deque(maxlen=10)

In [9]:
def on_message(ws, message):
    """Websocket callback: store each incoming message in the shared ``buffer``.

    ``ws`` is the connection the message arrived on (unused here);
    ``message`` is appended as received, without parsing.
    """
    buffer.append(message)

def on_error(ws, error):
    """Websocket callback: report a connection error by printing it.

    ``ws`` is the connection that failed (unused here); ``error`` is printed
    as-is and is neither re-raised nor stored.
    """
    print(error)

def on_close(ws, close_status_code=None, close_msg=None):
    """Websocket callback: announce that the connection has been closed.

    websocket-client 1.0 and later call this with the close status code and
    close message as extra positional arguments; older releases pass only
    ``ws``. The extra parameters are optional so both call styles work, and
    they are not used beyond being accepted.
    """
    print("### closed ###")

def on_open(ws):
    """Websocket callback invoked when the connection opens; intentionally a no-op.

    Nothing is sent on connect: incoming messages are handled by
    ``on_message``. The previous body only defined a nested helper that was
    never called, so it had no effect at runtime; it is removed here so the
    code no longer suggests that greetings are sent or that the socket is
    closed after a few seconds.
    """


def proc(on_open, on_error, on_message, on_close):
    """Connect to the result stream and service it with the given callbacks.

    Intended as a thread target: the call to ``run_forever`` is expected to
    block for as long as the connection is being served. The endpoint comes
    from the module-level ``output_sream`` lookup, and websocket-client's
    trace output is enabled before connecting.
    """
    websocket.enableTrace(True)
    stream_url = output_sream['sld:streamLocation']
    app = websocket.WebSocketApp(
        stream_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    app.run_forever()

# Run the websocket client on a worker thread so later cells can execute
# while messages keep arriving.
thread = threading.Thread(
    target=proc,
    kwargs={
        "on_open": on_open,
        "on_error": on_error,
        "on_message": on_message,
        "on_close": on_close,
    },
)
thread.start()


--- request header ---
GET /ct/results HTTP/1.1
Upgrade: websocket
Connection: Upgrade
Host: jasper:8283
Origin: http://jasper:8283
Sec-WebSocket-Key: NmWmwbua7zAZ3sD8CGmgjQ==
Sec-WebSocket-Version: 13


-----------------------
--- response header ---
HTTP/1.1 101 Web Socket Protocol Handshake
Connection: Upgrade
Sec-WebSocket-Accept: 59fbJ1AjOgKcNRuU6w15t1oS4M4=
Upgrade: websocket
-----------------------

Inspecting a sample message


In [10]:
# `buffer` is filled asynchronously by the listener thread, so on a fresh
# "Run All" it may still be empty here. Wait briefly (bounded) for the first
# message instead of failing with IndexError on buffer[0].
wait_deadline = time.time() + 10
while not buffer and time.time() < wait_deadline:
    time.sleep(0.1)

# Show the oldest buffered message as parsed JSON, or nothing if none arrived in time.
json.loads(buffer[0]) if buffer else None


Out[10]:
{'@context': {'ct': 'http://www.insight-centre.org/citytraffic#'},
 '@id': 'http://www.insight-centre.org/dataset/SampleEventService#Property-b9f96475-bd7f-4868-8a3b-4d01ff8f9359',
 '@type': 'ct:CongestionLevelT'}

In [ ]: