In [1]:
from __future__ import print_function
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sn # For plotting
import time
import json
from IPython import display # To work with graphs on jupyter
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from collections import namedtuple #function for creating tuple subclasses with named fields
In [2]:
# magic function to plot inline
%matplotlib inline
In [3]:
if __name__ == "__main__":
    # Spark entry points. The streaming context cuts the input into
    # 3600-second (one hour) batches.
    sc = SparkContext(appName="TwitterStreamer")
    ssc = StreamingContext(sparkContext=sc, batchDuration=3600)

    # SQL context used later to query the registered temp table.
    sqlContext = SQLContext(sparkContext=sc)

    # Address of the server that pushes the tweet text stream over a socket.
    host, port = "localhost", 8500
    socketStream = ssc.socketTextStream(hostname=host, port=port)

    # Windowed view over the socket stream; the window length (3600 s)
    # equals the batch interval set on the streaming context above.
    dStream = socketStream.window(windowDuration=3600)
def parseTweet(dStream):
    """Parse one JSON-encoded record into a list holding a (name, lang) tuple.

    Args:
        dStream: A single text record containing a JSON object. Despite the
            parameter name this is one record, not a DStream; the name is
            kept unchanged for compatibility with existing callers.

    Returns:
        ``[(name, lang)]`` where each value falls back to ``"undefined"``
        when the key is missing, or ``[]`` when the record is not valid JSON
        or does not decode to a JSON object. Returning a list lets the
        function be used with ``flatMap`` so unusable records are dropped.
    """
    try:
        data = json.loads(dStream)
    except (TypeError, ValueError):
        # TypeError: input is not text (e.g. None).
        # ValueError: malformed JSON (JSONDecodeError is a ValueError).
        # Only these are swallowed, so KeyboardInterrupt/SystemExit propagate.
        return []

    # Valid JSON that is not an object (list, number, string, null) has no
    # "name"/"lang" keys to read, so treat it like an unparsable record.
    if not isinstance(data, dict):
        return []

    return [(
        data.get("name", "undefined"),
        data.get("lang", "undefined"),
    )]
# Row type for the aggregated result; the field names become the DataFrame
# column names when the RDD is converted with toDF().
fields = ("lang", "count")
Tweet_Lang = namedtuple('Tweet', fields)


def _publish_top_languages(rdd):
    """Register the 30 most frequent languages of one batch as "tweets_lang".

    Args:
        rdd: RDD of ``Tweet_Lang(lang, count)`` rows for the current batch.

    An empty batch is skipped: ``toDF()`` has to read a first row to infer
    the schema and raises on an empty RDD, which would make the streaming
    callback fail whenever a window contains no parsable tweets. When a
    batch is skipped, whatever was registered previously (if anything) is
    left in place.
    """
    if rdd.isEmpty():
        return
    top_languages = rdd.toDF().sort(desc("count")).limit(30)
    top_languages.registerTempTable("tweets_lang")


# DStream pipeline where all the computation is done:
#   record -> (name, lang) -> (lang, 1) -> (lang, total) -> Tweet_Lang row
lang_counts = dStream.flatMap(parseTweet).transform(
    lambda rdd: rdd.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)
)
lang_counts.map(lambda tup: Tweet_Lang(tup[0], tup[1])) \
    .foreachRDD(_publish_top_languages)

ssc.start()
# awaitTermination() is intentionally not called here: it would block this
# cell, and the following cell polls the temp table instead.
# ssc.awaitTermination()
In [4]:
# Redraw the language bar chart once per hour until the user interrupts the
# kernel. Uses the module-level `plt` (matplotlib.pyplot) directly:
# `seaborn.plt` is not available in current seaborn releases, and the
# resulting AttributeError used to be swallowed by the generic handler
# below, so no chart was ever drawn.
while True:
    try:
        time.sleep(60 * 60)  # Sleep 1hr, plot graph every hour

        # Pull the latest aggregated counts from the temp table into pandas.
        topics = sqlContext.sql('Select lang, count from tweets_lang').toPandas()

        # Replace the previous chart instead of stacking outputs in the cell.
        display.clear_output(wait=True)

        sn.set_style("whitegrid")
        fig, ax = plt.subplots(figsize=(10, 8))
        sn.barplot(x=topics["lang"], y=topics["count"], estimator=sum, ax=ax)
        ax.set(xlabel='Trending Language', ylabel='Count')
        plt.show()
        # Release the figure so hourly redraws do not accumulate in memory.
        plt.close(fig)
    except KeyboardInterrupt:
        # User interrupt: shut the stream down and leave the loop.
        ssc.stop()
        print("Stoping the program")
        break
    except Exception as e:
        # Best-effort: report the problem (e.g. the temp table is not
        # registered yet) and try again on the next iteration.
        print(e)