In [1]:
    
from __future__ import print_function
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sn  # For plotting
import time
import json
from IPython import display  # To refresh plots in Jupyter
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
from collections import namedtuple  # Factory for tuple subclasses with named fields
    
In [2]:
    
# Magic command to render plots inline
%matplotlib inline
    
In [3]:
    
if __name__ == "__main__":
    sc = SparkContext(appName="TwitterRetweet")
    ssc = StreamingContext(sc, 60 * 60)  # 1-hour batch interval
    sqlContext = SQLContext(sc)  # SQL context for running SQL queries

    # Host and port of the server that sends the raw tweet JSON stream
    # (a minimal sketch of such a server follows this cell)
    host = "localhost"
    port = 8700
    socketStream = ssc.socketTextStream(host, port)  # Connect to the socket
    dStream = socketStream.window(60 * 60)  # 1-hour window
    
    def parseTweet(line):  # Parse one line of tweet JSON
        try:
            data = json.loads(line)  # Load the JSON data
            return [(  # Tuple of tweet text and retweet count
                     data.get("text", "undefined"),
                     int(data.get("retweet_count", 0))  # Twitter's JSON field is retweet_count
                    )]
        except Exception:  # Skip malformed lines
            return []
        
    def displayTweet(time, rdd): # Print the data in readable format
        try:
            print(time)
            print("Top 100 Popular Tweets: ")
            print("Rank".center(6, "-") + "|" + "Tweet".center(40, "-") + "|" + "Retweet Count".center(20, "-"))
            for rank, item in enumerate(rdd.distinct().takeOrdered(100, key=lambda x: -x[1])):
                print(str(rank + 1).center(6, " ") + 
                      "|" + item[0] + 
                      "|" + str(item[1]).rjust(15, " ")
                     )
        except ValueError:
            pass
    
    # Print the ranked tweets for each window
    # (foreachRDD returns None, so its result is not assigned)
    dStream.flatMap(parseTweet)\
           .transform(  # Sort by retweet count, descending
                      lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False)
           ).foreachRDD(displayTweet)
    
    fields = ("id", "count")
    Tweet = namedtuple('Tweet', fields)
    # DStream where all the computation is done: parse, sort, wrap each
    # record in a Tweet, and expose the top 10 as the "tweets" temp table
    (dStream.flatMap(parseTweet)
            .transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
            .map(lambda rec: Tweet(rec[0], rec[1]))
            .foreachRDD(lambda rdd: rdd.toDF().sort(desc("count"))
                                       .limit(10)
                                       .registerTempTable("tweets")))
        
    ssc.start()
    # ssc.awaitTermination() would block the notebook kernel; the next cell polls instead
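
This notebook assumes a companion process is already listening on localhost:8700 and forwards one tweet JSON object per line. The sketch below shows one way such a server could look; it is not the author's actual implementation. It assumes the tweepy 3.x streaming API, placeholder credentials, and a hypothetical TweetForwarder class name.

import socket
import tweepy  # tweepy 3.x assumed

# Placeholder credentials (hypothetical)
auth = tweepy.OAuthHandler("CONSUMER_KEY", "CONSUMER_SECRET")
auth.set_access_token("ACCESS_TOKEN", "ACCESS_TOKEN_SECRET")

class TweetForwarder(tweepy.StreamListener):
    """Forward each raw tweet JSON object as one line over a TCP socket."""
    def __init__(self, conn):
        super(TweetForwarder, self).__init__()
        self.conn = conn

    def on_data(self, raw_json):
        # socketTextStream splits on newlines, so send one JSON object per line
        self.conn.sendall((raw_json.strip() + "\n").encode("utf-8"))
        return True

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("localhost", 8700))
server.listen(1)
conn, _ = server.accept()  # Spark's socketTextStream connects here

stream = tweepy.Stream(auth, TweetForwarder(conn))
stream.sample(languages=["en"])  # Random sample of live public tweets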
    
In [4]:
    
while True:  # Display graph here
    try:
        time.sleep(60 * 60)  # Sleep 1 hour, then plot the latest window
        topics = sqlContext.sql('select id, count from tweets')
        topics = topics.toPandas()
        display.clear_output(wait=True)
        sn.set_style("whitegrid")  # Styling of plot
        plt.figure(figsize=(10, 8))  # Figure size (seaborn does not expose plt)
        ax = sn.barplot(x=(topics.index.values + 1), y=topics["count"], estimator=sum)
        ax.set(xlabel='Rank of Tweet', ylabel='Retweet Count')  # Axis labels
        plt.show()
    except KeyboardInterrupt:  # User interrupt
        ssc.stop()
        print("Stopping the program")
        break
    # Continue on any other exception; stop only on KeyboardInterrupt
    except Exception as e:
        print(e)
        continue
    
    
    
The retweet count is always 0 because we receive tweets the moment they are posted to Twitter: by the time a tweet reaches us, no other user has had a chance to retweet it. To get a meaningful retweet_count we would have to re-fetch each tweet some time later through the REST API, at which point retweet_count reflects the retweets accumulated up to that moment. That step is omitted here because re-fetching such a large volume of tweets would exceed the REST API's rate limits.
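
As an illustrative sketch (not part of the original notebook), the re-fetch could batch up to 100 IDs per statuses/lookup request to keep the request count down. It assumes tweepy 3.x, placeholder credentials, and a hypothetical tweet_ids list of IDs collected from the stream.

import tweepy  # tweepy 3.x assumed

# Placeholder credentials (hypothetical)
auth = tweepy.OAuthHandler("CONSUMER_KEY", "CONSUMER_SECRET")
auth.set_access_token("ACCESS_TOKEN", "ACCESS_TOKEN_SECRET")
api = tweepy.API(auth, wait_on_rate_limit=True)  # Sleep through rate-limit windows

tweet_ids = [...]  # Hypothetical: tweet IDs collected earlier from the stream

# statuses/lookup accepts up to 100 IDs per request, so batching keeps the
# request count far below one call per tweet
for start in range(0, len(tweet_ids), 100):
    for status in api.statuses_lookup(tweet_ids[start:start + 100]):
        print(status.id, status.retweet_count)  # Retweets accumulated so far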