Q1 Find Influential people in twitter:

- For simplicity assume the algorithm to find influential person is directly proportional to followers.
- Find top 20 Influential personalities from the twitter across the globe.

In [1]:
from __future__ import print_function

import logging # python logging module
import sys

import json
import pandas as pd

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [2]:
# basic format for logging
logFormat = "%(asctime)s - [%(levelname)s] (%(funcName)s:%(lineno)d) %(message)s"

# logs will be stored in tweepy.log
logging.basicConfig(filename='SparkStreamingRevamped.log', level=logging.INFO, 
                   format=logFormat, datefmt="%Y-%m-%d %H:%M:%S")

In [3]:
# magic function to plot inline
%matplotlib inline

In [4]:
if __name__ == "__main__":

    sc = SparkContext(appName="TwitterStreamAnalysis")
    ssc = StreamingContext(sc, 60 * 60)  # Setting 1hr interval
    
    # Host port of server which is sending text stream
    host = "localhost"
    port = 8889
    socketStream = ssc.socketTextStream(host, port) # Connecting to socket
    dStream = socketStream.window(60 * 60)  # Setting 1hr window
    def parseForPOI(dStream): # Data Manupulation
        try:
            data = json.loads(dStream)  # Load the json data
            return [( # Tuple of name and follower count
                     data.get("name", "undefined").decode("utf-8"), 
                     int(data.get("followersCount", 0))
                    )]
        except:
            return []
        
    def displayPOI(time, rdd): # Print the data in readable format
        try:
            print(time)
            print("Top 20 Influential personalities from the twitter across the globe: ")
            print("Rank".center(6, "-") + "|" + "Name".center(40, "-") + "|" + "Followers Count".center(20, "-"))
            for rank, item in enumerate(rdd.distinct().takeOrdered(20, key=lambda x: -x[1])):
                print(str(rank + 1).center(6, " ") + 
                      "|" + item[0].center(40, " ") + 
                      "|" + str(item[1]).rjust(15, " ")
                     )
        except ValueError:
            pass
    
    _influencial = dStream.flatMap(parseForPOI)\
                          .transform(  # Sorting the data
                                     lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False)
                          ).foreachRDD(displayPOI)
    _influencial2 = dStream.flatMap(parseForPOI)\
                      .transform(  # Sorting the data
                                 lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False)
                      ).count() # To find number of tweets
    _influencial2.pprint() # Number of tweets
    ssc.start()
    ssc.awaitTermination() # Await for keyboard interrupt


2017-10-08 12:30:00
Top 20 Influential personalities from the twitter across the globe: 
-Rank-|------------------Name------------------|--Followers Count---
  1   |             Times of India             |       10707298
  2   |                  NDTV                  |       10177338
  3   |               BBC Sport                |        7040979
  4   |               BBC Sport                |        7040961
  5   |            Sky Sports News             |        5683167
  6   |                 MARCA                  |        4751182
  7   |               Formula 1                |        3210857
  8   |               Formula 1                |        3210830
  9   |               Formula 1                |        3210803
  10  |               Formula 1                |        3210793
  11  |               Formula 1                |        3210786
  12  |               Formula 1                |        3210777
  13  |               Formula 1                |        3210705
  14  |               Formula 1                |        3210689
  15  |               Formula 1                |        3210667
  16  |               Formula 1                |        3210638
  17  |               Formula 1                |        3210621
  18  |            Azteca Deportes             |        2146353
  19  |            Red Bull Racing             |        1976991
  20  |            Red Bull Racing             |        1976960
-------------------------------------------
Time: 2017-10-08 12:30:00
-------------------------------------------
44288

2017-10-08 13:30:00
Top 20 Influential personalities from the twitter across the globe: 
-Rank-|------------------Name------------------|--Followers Count---
  1   |                  NDTV                  |       10177509
  2   |             BBC News (UK)              |        8809693
  3   |               BBC Sport                |        7040982
  4   |                  UFC                   |        6284242
  5   |            Sky Sports News             |        5683202
  6   |                 MARCA                  |        4751191
  7   |               The Hindu                |        4456988
  8   |               Bloomberg                |        4300750
  9   |             Radio Elshinta             |        3256006
  10  |             Radio Elshinta             |        3256004
  11  |               Formula 1                |        3211081
  12  |               Formula 1                |        3210986
  13  |               Formula 1                |        3210963
  14  |             BIG MONEY MIKE             |        3008180
  15  |                Sky TG24                |        2928036
  16  |                 CANAL+                 |        2882264
  17  |           Hollywood Reporter           |        2708591
  18  |             Mercedes-Benz              |        2072967
  19  |            Red Bull Racing             |        1977192
  20  |            Red Bull Racing             |        1977155
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-4-c1448c093325> in <module>()
     46     _influencial2.pprint()
     47     ssc.start()
---> 48     ssc.awaitTermination()

/home/dalonlobo/anaconda/envs/py2/lib/python2.7/site-packages/pyspark/streaming/context.pyc in awaitTermination(self, timeout)
    204         """
    205         if timeout is None:
--> 206             self._jssc.awaitTermination()
    207         else:
    208             self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))

/home/dalonlobo/anaconda/envs/py2/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1156             proto.END_COMMAND_PART
   1157 
-> 1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
   1160             answer, self.gateway_client, self.target_id, self.name)

/home/dalonlobo/anaconda/envs/py2/lib/python2.7/site-packages/py4j/java_gateway.pyc in send_command(self, command, retry, binary)
    906         connection = self._get_connection()
    907         try:
--> 908             response = connection.send_command(command)
    909             if binary:
    910                 return response, self._create_connection_guard(connection)

/home/dalonlobo/anaconda/envs/py2/lib/python2.7/site-packages/py4j/java_gateway.pyc in send_command(self, command)
   1053 
   1054         try:
-> 1055             answer = smart_decode(self.stream.readline()[:-1])
   1056             logger.debug("Answer received: {0}".format(answer))
   1057             if answer.startswith(proto.RETURN_MESSAGE):

/home/dalonlobo/anaconda/envs/py2/lib/python2.7/socket.pyc in readline(self, size)
    449             while True:
    450                 try:
--> 451                     data = self._sock.recv(self._rbufsize)
    452                 except error, e:
    453                     if e.args[0] == EINTR:

/home/dalonlobo/anaconda/envs/py2/lib/python2.7/site-packages/pyspark/context.pyc in signal_handler(signal, frame)
    235         def signal_handler(signal, frame):
    236             self.cancelAllJobs()
--> 237             raise KeyboardInterrupt()
    238 
    239         # see http://stackoverflow.com/questions/23206787/

KeyboardInterrupt: