This notebook will display an auto-updating plot of prices as they are being generated by the example binning stream processor.
In addition to the requirements for the Winton Kafka Streams code, these libraries must also be installed:
These libraries can be installed manually or with pip: `pip install .[binning_example]`
Once installed, run these two commands in examples/binning/ :
The `-rt` argument generates the prices in real time. The prices can also be generated faster than real time with the `-rtm` flag.
In [ ]:
import datetime
from pandas import Timestamp
from ipywidgets import interact
from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure
from bokeh.io import push_notebook, show, output_notebook
from confluent_kafka import Consumer, KafkaError
In [ ]:
# Configure Bokeh so that subsequent show() calls render inside the notebook.
output_notebook()
In [ ]:
# Only plot prices for one symbol: messages for any other symbol are ignored
# by the processing functions, and this value is also used as the figure title.
symbol = 'A.N.Other-Corp'
In [ ]:
# Kafka consumer settings: local broker, fixed consumer group, and
# 'auto.offset.reset' set to 'earliest' as the default topic configuration.
consumer_settings = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test-group',
    'default.topic.config': {'auto.offset.reset': 'earliest'},
}
consumer = Consumer(consumer_settings)

# Listen to both the raw price topic and the binned price topic.
consumer.subscribe(['prices', 'bin-prices'])
In [ ]:
# Figure with a datetime x axis and a fixed price range on the y axis.
price_figure = figure(
    title=symbol,
    plot_height=300,
    plot_width=600,
    y_range=(90, 110),
    x_axis_type='datetime',
)
price_figure.xaxis.axis_label = 'Time'
price_figure.yaxis.axis_label = 'Price'

# Raw prices are drawn as a blue line, seeded with a single placeholder point.
price_data = ColumnDataSource(
    data={'x': [datetime.datetime(2017, 1, 1)], 'y': [100]}
)
price_line = price_figure.line(
    x='x', y='y', color='blue', source=price_data, legend='Price'
)

# Binned prices are drawn as red circles, starting with no points.
bin_data = ColumnDataSource(data={'x': [], 'y': []})
bin_circle = price_figure.circle(
    x='x', y='y', color='red', source=bin_data, legend='Binned price'
)

# Keep the handle so later push_notebook() calls can update this plot.
handle = show(price_figure, notebook_handle=True)

# Accumulators for the points received so far. Each dict holds references to
# the same list objects as the corresponding x/y accumulators.
xp = []
yp = []
updated_price_data = {'x': xp, 'y': yp}
xb = []
yb = []
updated_bin_data = {'x': xb, 'y': yb}
In [ ]:
def process_price(msg, x, y, updated_data, price_data):
    """Append one raw price message to the plotted price series.

    The message value is UTF-8 text of the form ``"<datetime>,<symbol>,<price>"``.
    Messages whose symbol differs from the module-level ``symbol`` are ignored.

    Args:
        msg: Kafka message; only ``msg.value()`` (bytes) is read.
        x: List of datetimes received so far; the new timestamp is appended.
        y: List of prices received so far; the new price is appended.
        updated_data: Dict whose ``'x'``/``'y'`` entries are pointed at ``x``
            and ``y`` (kept for compatibility with existing callers).
        price_data: Data source with a ``stream(new_data, rollover)`` method
            that receives the new point.

    Raises:
        ValueError: If the value does not have exactly three comma-separated
            fields or the price is not a valid float.
    """
    dt, sym, prc = msg.value().decode("utf-8").split(',')
    if sym != symbol:
        return

    dt = Timestamp(dt).to_pydatetime()
    prc = float(prc)
    x.append(dt)
    y.append(prc)
    updated_data['x'] = x
    updated_data['y'] = y

    # Stream only the new point instead of re-sending the whole history on
    # every message. With rollover=len(x) the source ends up holding exactly
    # the same points as before (including dropping the initial placeholder
    # on the first call), but the payload per message stays constant.
    price_data.stream({'x': [dt], 'y': [prc]}, rollover=len(x))
def process_bin(msg, x, y, updated_data, bin_data):
    """Append one binned price message to the plotted bin series.

    The message key is UTF-8 text of the form ``"<datetime>,<symbol>"`` and the
    message value is the binned price as UTF-8 text. Messages whose symbol
    differs from the module-level ``symbol`` are ignored.

    Args:
        msg: Kafka message; ``msg.key()`` and ``msg.value()`` (bytes) are read.
        x: List of bin datetimes received so far; the new one is appended.
        y: List of bin prices received so far; the new one is appended.
        updated_data: Dict whose ``'x'``/``'y'`` entries are pointed at ``x``
            and ``y`` (kept for compatibility with existing callers).
        bin_data: Data source with a ``stream(new_data, rollover)`` method
            that receives the new point.

    Raises:
        ValueError: If the value is not a valid float or the key does not have
            exactly two comma-separated fields.
    """
    # The value is parsed before the symbol check, as before, so a malformed
    # price still raises even for symbols that are not plotted.
    prc = float(msg.value().decode("utf-8"))
    dt, sym = msg.key().decode("utf-8").split(',')
    if sym != symbol:
        return

    dt = Timestamp(dt).to_pydatetime()
    x.append(dt)
    y.append(prc)
    updated_data['x'] = x
    updated_data['y'] = y

    # Stream only the new point instead of re-sending the whole history on
    # every message; rollover=len(x) keeps the source contents identical.
    bin_data.stream({'x': [dt], 'y': [prc]}, rollover=len(x))
# Not read anywhere in the visible cells; kept in case another cell uses it.
last_date = None

# Consume messages until a Kafka error other than end-of-partition occurs,
# updating the plot after every successfully received message.
running = True
try:
    while running:
        # Poll with a timeout so the loop regularly returns to Python and a
        # kernel interrupt can stop it; poll() returns None on timeout.
        msg = consumer.poll(1.0)
        if msg is None:
            continue

        error = msg.error()
        if not error:
            topic = msg.topic()
            if topic == 'prices':
                process_price(msg, xp, yp, updated_price_data, price_data)
            elif topic == 'bin-prices':
                process_bin(msg, xb, yb, updated_bin_data, bin_data)
            push_notebook(handle=handle)
        elif error.code() != KafkaError._PARTITION_EOF:
            # Reaching the end of a partition is expected while waiting for
            # new data; any other error stops the loop.
            print(error)
            running = False
finally:
    # Always release the consumer, including when the cell is interrupted or
    # a message handler raises. (The original called close() on an undefined
    # name `c`, which raised NameError.)
    consumer.close()
In [ ]:
In [ ]: