In [1]:
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

import pandas as pd
import threading, logging, time

import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
avg_temps = {}
max_temps = []

In [3]:
def parse(s):
    split = s.split(",")
    return (split[0], float(split[1]))

def update_avg_temps(temp):
    avg_temps[temp[0]] = temp[1]
    
def update_max_temps(temp):
    max_temps.append(temp)

class Consumer(threading.Thread):
    daemon = True
    
    def __init__(self, topic, handler):
        threading.Thread.__init__(self)
        self.topic = topic
        self.handler = handler

    def run(self):
        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "python", self.topic)
        
        for m in consumer:
            #(city, temp)
            temp = parse(m.message.value)
            self.handler(temp)

avg_temp_consumer = Consumer("output_avg", update_avg_temps)
max_temp_consumer = Consumer("output_max", update_max_temps)

In [ ]:
max_temp_consumer.start()
avg_temp_consumer.start()

In [16]:
plt.plot([x[1] for x in max_temps])


Out[16]:
[<matplotlib.lines.Line2D at 0x1160dfc10>]

In [13]:
pd.DataFrame({"Avg_temp": avg_temps})


Out[13]:
Avg_temp
Amsterdam 19.590334
Andorra-la-vella 21.480398
Athens 20.190591
Baku 20.438654
Belfast 20.588814
Belgrade 20.638501
Berlin 19.296830
Berne 19.514307
Bratislava 20.514888
Brussels 19.424165
Bucharest 20.660340
Budapest 20.154037
Cayenne 20.065690
Copenhagen 18.858833
Dushanbe 19.464382
Edinburgh 20.368655
Fort-de-France 20.860609
Helsinki 18.859017
Jerevan 20.147542
Kiev 20.182951
Kishinev 19.197943
Lisbon 19.234160
Ljubljana 18.639246
London 20.963390
Luxemburg 19.102233
Madrid 20.151328
Minsk 18.713897
Monaco 20.910208
Moscow 19.708294
Nicosia 20.744566
Oslo 20.315720
Paris 19.268307
Prague 19.675716
Reykjavik 19.438333
Riga 18.355499
Rome 19.931610
San Marino 18.539063
Sarajevo 19.795584
Skopje 19.660211
Sofia 17.977708
Stockholm 20.426783
Tallinn 20.579483
Tbilisi 19.824898
Tirane 20.071971
Toshkent 21.712709
Vaduz 19.023461
Valletta 21.300480
Vatican City 18.607748
Vienna 21.880462
Vilnius 19.331865
Warsaw 21.106537
Zagreb 19.540364

In [ ]:


In [ ]: