In [1]:
from datetime import datetime
from bokeh.models import ColumnDataSource
from bokeh.plotting import figure, output_file,output_notebook, show

## TODO: move this launch command somewhere else:
## mpirun -n 4 --host mpicontainer_mpi_head_1,mpicontainer_mpi_node_1,mpicontainer_mpi_node_2,mpicontainer_mpi_node_3 /bin/bash -c "source /home/mpirun/shared/soma.ve/bin/activate && soma -k peahi.inf.ed.ac.uk" < /dev/null
## TODO: document VOLUME

# Backing lists for the plotted columns; all three start out empty.
time, throughput, mpiprocs = [], [], []

# Data source holding one column per list; the line renderers below are
# created with source=appdata and add_values() appends to its columns.
appdata = ColumnDataSource(data={
    'time': time,
    'throughput': throughput,
    'mpiprocs': mpiprocs,
})

def add_values(time, throughput, mpiprocs):
    """Append new samples to the module-level ``appdata`` ColumnDataSource.

    Each argument may be a list of values or a single value; anything that
    is not a ``list`` is wrapped in a one-element list first.  The values
    are concatenated onto the column of the same name ('time',
    'throughput', 'mpiprocs'), and once all three columns have been
    extended the source is pushed to the notebook.
    """
    incoming = (
        ('time', time),
        ('throughput', throughput),
        ('mpiprocs', mpiprocs),
    )
    for column, values in incoming:
        if not isinstance(values, list):
            values = [values]
        # Assign the concatenation back to the column instead of extending
        # the existing object in place.
        appdata.data[column] = appdata.data[column] + values

    appdata.push_notebook()

# Send Bokeh output to the notebook.
output_notebook()

# Figure with a title, a datetime x axis and (intentionally blank) y label.
p = figure(
    title="Runtime Metrics",
    plot_width=900,
    x_axis_type="datetime",
    x_axis_label='Time',
    y_axis_label='',
)

# One line renderer per metric column of `appdata`, all plotted against
# the 'time' column: (column name, legend text, line colour).
for column, legend_text, line_color in (
    ('throughput', "Throughput (iterations/s)", "blue"),
    ('mpiprocs', "MPI processes (comm.size)", "red"),
):
    p.line('time', column, legend=legend_text, color=line_color,
           source=appdata, line_width=2)

# Display the figure.
show(p)


BokehJS successfully loaded.

In [ ]:
import json
import time, datetime
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

# Connect to the Kafka broker and consume the "soma" topic as part of the
# "soma-group" consumer group.
client = KafkaClient("peahi.inf.ed.ac.uk:9092")
consumer = SimpleConsumer(client, "soma-group", "soma")

# Format of the 'timestamp' field, e.g. "2015-10-13 17:07:48.640310".
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S.%f'

# Poll forever: drain whatever messages are available, append them to the
# plot's data source, then wait half a second before polling again.
# Interrupt the kernel to stop.
while True:

    time_list = []
    throughput_list = []
    mpiprocs_list = []

    for message in consumer.get_messages(count=6000, block=False):
        # Each message value is a JSON document from which the
        # 'timestamp', 'throughput' and 'mpi_processes' keys are read.
        data = json.loads(message.message.value)
        # Convert the timestamp string back to a datetime object.
        # datetime.strptime keeps the microseconds parsed by %f, whereas
        # going through time.strptime/time.mktime drops them (struct_time
        # has no sub-second field), collapsing all samples within the same
        # second onto one x value.
        time_list.append(
            datetime.datetime.strptime(data['timestamp'], TIMESTAMP_FORMAT))
        throughput_list.append(data['throughput'])
        mpiprocs_list.append(data['mpi_processes'])

    # Nothing new arrived: skip the update so no redundant push is sent
    # to the notebook.
    if time_list:
        add_values(time_list, throughput_list, mpiprocs_list)
    time.sleep(.5)



In [ ]:


In [ ]: