In [1]:
from __future__ import print_function

import logging # python logging module
import sys

import json
import pandas as pd

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

In [2]:
# basic format for logging
logFormat = "%(asctime)s - [%(levelname)s] (%(funcName)s:%(lineno)d) %(message)s"

# logs will be stored in tweepy.log
logging.basicConfig(filename='SparkStreamingRevamped.log', level=logging.INFO, 
                   format=logFormat, datefmt="%Y-%m-%d %H:%M:%S")

In [3]:
# magic function to plot inline
%matplotlib inline

In [4]:
def print_happiest_words(rdd):
    top_list = rdd.take(5)
    print("Happiest topics in the last 5 seconds (%d total):" % rdd.count())
    for tuple in top_list:
        print("%s (%d happiness)" % (tuple[1], tuple[0]))

if __name__ == "__main__":

    sc = SparkContext(appName="TwitterStreamAnalysis")
    ssc = StreamingContext(sc, 60)
    
    # Host port of server which is sending text stream
    host = "localhost"
    port = 8889
    dStream = ssc.socketTextStream(host, port)
    
    def parseForPOI(dStream):
        try:
            data = json.loads(dStream)
            return [(
                     data.get("name", "undefined").decode("utf-8"), 
                     int(data.get("followersCount", 0))
                    )]
        except:
            return []
        
    def displayPOI(time, rdd):
        try:
            print(time)
            print("Top 20 Influential personalities from the twitter across the globe: ")
            print("Rank".center(6, "-") + "|" + "Name".center(40, "-") + "|" + "Followers Count".center(20, "-"))
            for rank, item in enumerate(rdd.distinct().takeOrdered(20, key=lambda x: -x[1])):
                print(str(rank + 1).center(6, " ") + 
                      "|" + item[0].center(40, " ") + 
                      "|" + str(item[1]).rjust(15, " ")
                     )
        except ValueError:
            pass
    
    _influencial = dStream.flatMap(parseForPOI)\
                          .transform(  # Sorting the data
                                     lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False)
                          ).foreachRDD(displayPOI)
    _influencial2 = dStream.flatMap(parseForPOI)\
                      .transform(  # Sorting the data
                                 lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False)
                      ).count()
    _influencial2.pprint(20)
    ssc.start()
    ssc.awaitTermination()


2017-10-05 15:37:00
Top 20 Influential personalities from the twitter across the globe: 
-Rank-|------------------Name------------------|--Followers Count---
  1   |              The Red Tide              |         331194
  2   |              The Red Tide              |         331194
  3   |                 GUESS                  |         130404
  4   |              A . Alshehri              |         108873
  5   |          The London Economic           |          57841
  6   |            itsjohnwetzel39             |          52624
  7   |             Bhadmus Hakeem             |          49779
  8   |             Bhadmus Hakeem             |          49779
  9   |                 Figen                  |          37191
  10  |                Altinget                |          34188
  11  |           Esio G. Moreno R.            |          31227
  12  |           EU Maritime & Fish           |          30356
  13  |             Binnaz Toprak              |          29094
  14  |             Ed sheeran jr.             |          27189
  15  |                   t                    |          23422
  16  |                 Silvia                 |          22320
  17  |             you're awesome             |          21571
  18  |             #SavePriyanshu             |          21529
  19  |           BEAST OF THE WEST            |          21464
  20  |           BEAST OF THE WEST            |          21464
-------------------------------------------
Time: 2017-10-05 15:37:00
-------------------------------------------
697

2017-10-05 15:38:00
Top 20 Influential personalities from the twitter across the globe: 
-Rank-|------------------Name------------------|--Followers Count---
  1   |                Xbox UK                 |         590456
  2   |           Meenakshi Goswami            |          93253
  3   |              Miur Social               |          80390
  4   |                mamdouh                 |          78100
  5   |           CBeebies Grown-Ups           |          65423
  6   |           Trendinalia India            |          52918
  7   |             Breaking News              |          45804
  8   |             Breaking News              |          45804
  9   |            Maleeha Manzoor             |          42244
  10  |             mohammeDiary++             |          38719
  11  |            Epson Indonesia             |          38585
  12  |                 Mezome                 |          37955
  13  |               dr.dre.NYC               |          37499
  14  |           gangbang_fun 31k+            |          31985
  15  |             Willy Tolerdo              |          29935
  16  |              Viaksh Singh              |          28395
  17  |                 Smyrna                 |          25367
  18  |                 Smyrna                 |          25367
  19  |              Kajol Saxena              |          25097
  20  |           ATATURKUM ONDERiM            |          24068
-------------------------------------------
Time: 2017-10-05 15:38:00
-------------------------------------------
910

2017-10-05 15:39:00
Top 20 Influential personalities from the twitter across the globe: 
-Rank-|------------------Name------------------|--Followers Count---
  1   |          ComptitiveController          |         223814
  2   |            Stedelijk Museum            |         199601
  3   |              Mystic Guru               |         140378
  4   |          Esppeonza [PARODIA]           |          74186
  5   |             Bhadmus Hakeem             |          49779
  6   |             Bhadmus Hakeem             |          49779
  7   |               Majid Agha               |          40267
  8   |               Monet Amca               |          31794
  9   |           EU Maritime & Fish           |          30356
  10  |            Belgrade Theatre            |          29590
  11  |                 Yinka                  |          28577
  12  |              Viaksh Singh              |          28392
  13  |              Viaksh Singh              |          28392
  14  |              Kajol Saxena              |          25097
  15  |                 Life.                  |          24648
  16  |           BEAST OF THE WEST            |          21464
  17  |                 Amita                  |          20291
  18  |                 Pooja                  |          19628
  19  |              metin cihan               |          18649
  20  |              IAMCharpMan               |          18476
-------------------------------------------
Time: 2017-10-05 15:39:00
-------------------------------------------
977

2017-10-05 15:40:00
Top 20 Influential personalities from the twitter across the globe: 
-Rank-|------------------Name------------------|--Followers Count---
  1   |          FrancescoFacchinetti          |        1134716
  2   |               iPantellas               |         530028
  3   |          Oxford Dictionaries           |         266032
  4   |                   -                    |         217911
  5   |              Levante-EMV               |         201987
  6   |              Levante-EMV               |         201987
  7   |              Levante-EMV               |         201987
  8   |              Levante-EMV               |         201987
  9   |               #NewsOnTV3               |         193530
  10  |               POWER 98.7               |         183523
  11  |              Mystic Guru               |         140378
  12  |          HuffPostUK Lifestyle          |         108575
  13  |           Meenakshi Goswami            |          93253
  14  |            The Morning Call            |          75173
  15  |          Esppeonza [PARODIA]           |          74186
  16  |                Les Sims                |          72217
  17  |             Polresta Depok             |          66368
  18  |            itsjohnwetzel39             |          52625
  19  |             Breaking News              |          45804
  20  |          BirminghamHippodrome          |          45248
-------------------------------------------
Time: 2017-10-05 15:40:00
-------------------------------------------
953

2017-10-05 15:41:00
Top 20 Influential personalities from the twitter across the globe: 
-Rank-|------------------Name------------------|--Followers Count---
  1   |            laSexta Noticias            |        1018853
  2   |            laSexta Noticias            |        1018853
  3   |            Noticias Cuatro             |         773998
  4   |                zaytung                 |         336255
  5   |              Mystic Guru               |         140378
  6   |          Blue Planet Society           |         138710
  7   |            Middle East Eye             |         117024
  8   |          Sidharth Malhotra FC          |         100517
  9   |             nieuwemarlean              |          89540
  10  |          Manhattan Connection          |          84018
  11  |               The Lowry                |          65724
  12  |                 #Buzz                  |          60761
  13  |               Paco Lobo                |          50931
  14  |                 Lokman                 |          49696
  15  |                 Aysen                  |          40934
  16  |            Pankhudi Sharma             |          40689
  17  |             Linked Supply              |          36799
  18  |                  NCTL                  |          35329
  19  |                  see                   |          33227
  20  |                Almeida                 |          32888
-------------------------------------------
Time: 2017-10-05 15:41:00
-------------------------------------------
1031

2017-10-05 15:42:00
Top 20 Influential personalities from the twitter across the globe: 
-Rank-|------------------Name------------------|--Followers Count---
  1   |                Sky News                |        4322240
  2   |            Antena3Noticias             |        1635068
  3   |           Greenpeace Italia            |         508731
  4   |            The Fake Marian             |         423605
  5   |                Kfm 94.5                |         313878
  6   |              Mario Sechi               |          97105
  7   |           Meenakshi Goswami            |          93253
  8   |          Universidad Alicante          |          90700
  9   |            There Is No God             |          88373
  10  |            Melhor do Volei             |          77661
  11  |          UN Education Report           |          57981
  12  |                 Lokman                 |          49696
  13  |          BirminghamHippodrome          |          45248
  14  |             Project AWARE              |          37770
  15  |          Trendinalia Colombia          |          37102
  16  |                  Puiv                  |          32201
  17  |             Ed sheeran jr.             |          27189
  18  |             Ed sheeran jr.             |          27189
  19  |              LondonDance               |          26865
  20  |             Emanuel Yeswal             |          25914
-------------------------------------------
Time: 2017-10-05 15:42:00
-------------------------------------------
1028

2017-10-05 15:43:00
Top 20 Influential personalities from the twitter across the globe: 
-Rank-|------------------Name------------------|--Followers Count---
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-4-e86f396abc58> in <module>()
     46     _influencial2.pprint(20)
     47     ssc.start()
---> 48     ssc.awaitTermination()

/home/dalonlobo/anaconda/envs/py2/lib/python2.7/site-packages/pyspark/streaming/context.pyc in awaitTermination(self, timeout)
    204         """
    205         if timeout is None:
--> 206             self._jssc.awaitTermination()
    207         else:
    208             self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))

/home/dalonlobo/anaconda/envs/py2/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1156             proto.END_COMMAND_PART
   1157 
-> 1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
   1160             answer, self.gateway_client, self.target_id, self.name)

/home/dalonlobo/anaconda/envs/py2/lib/python2.7/site-packages/py4j/java_gateway.pyc in send_command(self, command, retry, binary)
    906         connection = self._get_connection()
    907         try:
--> 908             response = connection.send_command(command)
    909             if binary:
    910                 return response, self._create_connection_guard(connection)

/home/dalonlobo/anaconda/envs/py2/lib/python2.7/site-packages/py4j/java_gateway.pyc in send_command(self, command)
   1053 
   1054         try:
-> 1055             answer = smart_decode(self.stream.readline()[:-1])
   1056             logger.debug("Answer received: {0}".format(answer))
   1057             if answer.startswith(proto.RETURN_MESSAGE):

/home/dalonlobo/anaconda/envs/py2/lib/python2.7/socket.pyc in readline(self, size)
    449             while True:
    450                 try:
--> 451                     data = self._sock.recv(self._rbufsize)
    452                 except error, e:
    453                     if e.args[0] == EINTR:

/home/dalonlobo/anaconda/envs/py2/lib/python2.7/site-packages/pyspark/context.pyc in signal_handler(signal, frame)
    235         def signal_handler(signal, frame):
    236             self.cancelAllJobs()
--> 237             raise KeyboardInterrupt()
    238 
    239         # see http://stackoverflow.com/questions/23206787/

KeyboardInterrupt: 

In [7]:
# always stop the streamer
# ssc.stop()

from pyspark.rdd import RDD

RDD.distinct

In [ ]:
str.ljust