Plotting binned generated sample prices.

This notebook will display an auto-updating plot of prices as they are being generated by the example binning stream processor.

In addition to the requirements for the Winton Kafka Streams code, these libraries must also be installed:

  • pandas
  • jupyter
  • bokeh

These libraries can be installed manually or using pip: pip install .[binning_example]

Once installed, run these two commands in examples/binning/:

  • The binning stream processor:
    • python binning.py
  • The price generator:
    • python generator.py -i A.N.Other-Corp,0.3,123,100.0,0.01 -l 6000 -f 250ms -kb localhost:9092 -kt prices -rt

The -rt argument will generate the prices in real time. The prices can also be generated faster than real time with the "-rtm" flag, or in bulk by omitting the -rt flag.


In [ ]:
import datetime
from pandas import Timestamp

from ipywidgets import interact

from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure
from bokeh.io import push_notebook, show, output_notebook

from confluent_kafka import Consumer, KafkaError

In [ ]:
# Configure Bokeh to render its output inline in this notebook; needed before
# the show(..., notebook_handle=True) call further down.
output_notebook()

In [ ]:
# Only plot prices for one symbol. This value is used as the figure title, and
# process_price/process_bin ignore any message whose symbol field differs from it.
symbol = 'A.N.Other-Corp'

In [ ]:
# Kafka consumer for the local broker, reading both the raw prices and the
# binned prices produced by the binning stream processor.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test-group',
    # Start from the oldest available message when the group has no stored offset.
    'default.topic.config': {'auto.offset.reset': 'earliest'},
})

consumer.subscribe(['prices', 'bin-prices'])

In [ ]:
# One figure holds both series: the raw prices as a line and the binned
# prices as circles, sharing a datetime x-axis.
price_figure = figure(
    title=symbol,
    plot_height=300,
    plot_width=600,
    y_range=(90, 110),
    x_axis_type='datetime',
)
price_figure.xaxis.axis_label = 'Time'
price_figure.yaxis.axis_label = 'Price'

# Raw price series, seeded with a single placeholder point.
price_data = ColumnDataSource(data={'x': [datetime.datetime(2017, 1, 1)], 'y': [100]})
price_line = price_figure.line(x='x', y='y', color='blue', source=price_data, legend='Price')

# Binned price series, initially empty.
bin_data = ColumnDataSource(data={'x': [], 'y': []})
bin_circle = price_figure.circle(x='x', y='y', color='red', source=bin_data, legend='Binned price')

# Keep the notebook handle so push_notebook() can refresh this figure in place.
handle = show(price_figure, notebook_handle=True)

# Accumulated points for each series, plus the dicts that expose them by
# column name; both are handed to process_price/process_bin by the consume loop.
xp = []
yp = []
updated_price_data = {'x': xp, 'y': yp}
xb = []
yb = []
updated_bin_data = {'x': xb, 'y': yb}

In [ ]:
def process_price(msg, x, y, updated_data, price_data):
    """Record one raw price message and stream the new point to the plot.

    The message value must decode (UTF-8) to ``"<timestamp>,<symbol>,<price>"``.
    Messages for any symbol other than the notebook-level ``symbol`` are ignored.

    Parameters
    ----------
    msg
        Kafka message; only ``msg.value()`` (bytes) is read.
    x, y : list
        Accumulated timestamps and prices; the new point is appended in place.
    updated_data : dict
        Its ``'x'`` and ``'y'`` entries are set to ``x`` and ``y``.
    price_data
        Data source exposing ``stream(new_data, rollover)``.

    Raises
    ------
    ValueError
        If the value does not split into exactly three comma-separated fields,
        or the price field is not a valid float.
    """
    dt, sym, prc = msg.value().decode("utf-8").split(',')
    if sym != symbol:
        return

    dt = Timestamp(dt).to_pydatetime()
    prc = float(prc)

    x.append(dt)
    y.append(prc)

    updated_data['x'] = x
    updated_data['y'] = y

    # stream() appends to what the source already holds, so send only the new
    # point rather than the whole history (which made every update O(n)).
    # rollover=len(x) keeps the source trimmed to exactly the accumulated
    # points, which also drops any placeholder point it was created with.
    price_data.stream({'x': [dt], 'y': [prc]}, len(x))
    
def process_bin(msg, x, y, updated_data, bin_data):
    """Record one binned price message and stream the new point to the plot.

    The message key must decode (UTF-8) to ``"<timestamp>,<symbol>"`` and the
    message value to the binned price. Messages for any symbol other than the
    notebook-level ``symbol`` are ignored.

    Parameters
    ----------
    msg
        Kafka message; ``msg.key()`` and ``msg.value()`` (bytes) are read.
    x, y : list
        Accumulated timestamps and prices; the new point is appended in place.
    updated_data : dict
        Its ``'x'`` and ``'y'`` entries are set to ``x`` and ``y``.
    bin_data
        Data source exposing ``stream(new_data, rollover)``.

    Raises
    ------
    ValueError
        If the value is not a valid float, or the key does not split into
        exactly two comma-separated fields.
    """
    prc = float(msg.value().decode("utf-8"))
    dt, sym = msg.key().decode("utf-8").split(',')

    if sym != symbol:
        return

    dt = Timestamp(dt).to_pydatetime()
    x.append(dt)
    y.append(prc)

    updated_data['x'] = x
    updated_data['y'] = y

    # stream() appends to what the source already holds, so send only the new
    # point rather than the whole history (which made every update O(n)).
    # rollover=len(x) keeps the source trimmed to exactly the accumulated points.
    bin_data.stream({'x': [dt], 'y': [prc]}, len(x))

# Consume messages until a fatal Kafka error occurs or the kernel is
# interrupted, updating the plot after every message received.
last_date = None  # not read by any visible cell; kept so the notebook namespace is unchanged
running = True
try:
    while running:
        msg = consumer.poll()
        if msg is None:
            # poll() yields None when it returns without a message
            continue

        err = msg.error()
        if not err:
            if msg.topic() == 'prices':
                process_price(msg, xp, yp, updated_price_data, price_data)
            elif msg.topic() == 'bin-prices':
                process_bin(msg, xb, yb, updated_bin_data, bin_data)

            push_notebook(handle=handle)
        elif err.code() != KafkaError._PARTITION_EOF:
            # Reaching the end of a partition is expected; anything else is fatal.
            print(err)
            running = False
except KeyboardInterrupt:
    # Interrupting the kernel is the normal way to stop this otherwise endless loop.
    pass
finally:
    # Always release the consumer (the original called close() on an undefined name `c`).
    consumer.close()

In [ ]:


In [ ]: