- Here, "most popular tweets" means the tweets that have been re-tweeted the greatest number of times.
- Get the top 100 most re-tweeted tweets related to “iphone” from the last hour.

In [1]:
from __future__ import print_function

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sn # For plotting
import time
import json

from IPython import display  # To work with graphs on jupyter
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from collections import namedtuple #function for creating tuple subclasses with named fields

In [2]:
# magic function to plot inline
%matplotlib inline

In [3]:
if __name__ == "__main__":
    # Address of the server that pushes the raw text stream to us.
    host = "localhost"
    port = 8500

    # Spark entry points: the core context, a streaming context built on it
    # with a batch interval of 60 * 60 seconds (one hour), and a SQL context
    # used for querying registered tables.
    sc = SparkContext(appName="TwitterStreamer")
    ssc = StreamingContext(sc, 60 * 60)
    sqlContext = SQLContext(sc)

    # Attach to the text socket, then take a window of 60 * 60 seconds
    # (one hour) over the incoming stream.
    socketStream = ssc.socketTextStream(host, port)
    dStream = socketStream.window(60 * 60)
    
    def parseTweet(dStream):
        """Extract the ``(name, lang)`` pair from one JSON-encoded record.

        Despite its name, ``dStream`` is a single element of the stream (the
        function is applied per element via ``flatMap``), not the DStream
        object itself. The parameter name is kept for compatibility.

        Returns:
            A one-element list ``[(name, lang)]``, using ``"undefined"`` for a
            missing key, or an empty list when the record cannot be parsed as
            a JSON object, so that ``flatMap`` simply drops it.
        """
        try:
            data = json.loads(dStream)  # Decode the JSON payload
            return [(
                data.get("name", "undefined"),
                data.get("lang", "undefined"),
            )]
        # Best-effort parsing: any ordinary failure (malformed JSON, non-object
        # payload, wrong input type) drops the record. Unlike a bare `except:`,
        # this does not swallow KeyboardInterrupt or SystemExit.
        except Exception:
            return []
    
    fields = ("lang", "count")
    Tweet_Lang = namedtuple('Tweet', fields)
    # DStream where all the computation is done
    dStream.flatMap(parseTweet)\
            .transform(lambda rdd: rdd.map(lambda x:(x[1], 1))\
                        .reduceByKey(lambda a, b: a + b))\
            .map(lambda tup: Tweet_Lang( tup[0], tup[1]))\
            .foreachRDD(lambda rdd: rdd.toDF().sort(desc("count"))\
                        .limit(30).registerTempTable("tweets_lang"))
    

    ssc.start()
#    ssc.awaitTermination()

In [4]:
# Poll the `tweets_lang` table once an hour and redraw the bar chart until the
# user interrupts the kernel.
while True:
    try:
        time.sleep(60 * 60)  # Sleep 1hr, plot graph every hour
        topics = sqlContext.sql('Select lang, count from tweets_lang').toPandas()
        display.clear_output(wait=True)  # Replace the previous chart
        sn.set_style("whitegrid")  # Styling of plot
        # Use matplotlib's pyplot directly: `sn.plt` is not part of seaborn's
        # API and is missing in current releases, which made every iteration
        # fail with an AttributeError instead of drawing.
        fig, ax = plt.subplots(figsize=(10, 8))  # Figure size of plot
        sn.barplot(x=topics["lang"], y=topics["count"], estimator=sum, ax=ax)
        ax.set(xlabel='Trending Language', ylabel='Count')  # Labeling of plot
        plt.show()
        plt.close(fig)  # Release the figure so the loop does not accumulate them
    except KeyboardInterrupt:  # User interrupt
        ssc.stop()
        print("Stoping the program")
        break
    # Keep polling after any other error (for example, the table not being
    # registered yet); only a keyboard interrupt ends the loop.
    except Exception as e:
        print(e)


Stoping the program