In [1]:
import time
import twitter
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

import dateutil.parser
import json

class Tweet(dict):
    """Flat dict view of a raw Twitter status object.

    Extracts only the fields this streaming demo uses (timestamp, text,
    hashtags, geo, id, screen_name, user_id).  A deletion notice (a payload
    containing a 'delete' key) or an empty payload produces an empty dict.
    """

    def __init__(self, tweet_in):
        # Bug fix: the original called super().__init__(self), which
        # re-initializes the dict from itself; dict.__init__ here takes
        # no arguments.
        super(Tweet, self).__init__()
        if tweet_in and 'delete' not in tweet_in:
            # Normalize 'created_at' to a naive (tz-stripped) ISO-8601 string.
            self['timestamp'] = dateutil.parser.parse(tweet_in[u'created_at']
                                ).replace(tzinfo=None).isoformat()
            self['text'] = tweet_in['text'].encode('utf-8')
            self['hashtags'] = [x['text'].encode('utf-8')
                                for x in tweet_in['entities']['hashtags']]
            # Robustness: 'geo' may be absent entirely, not just None —
            # use .get so a missing key does not raise KeyError.
            geo = tweet_in.get('geo')
            self['geo'] = geo['coordinates'] if geo else None
            self['id'] = tweet_in['id']
            self['screen_name'] = tweet_in['user']['screen_name'].encode('utf-8')
            self['user_id'] = tweet_in['user']['id']
        
def connect_twitter():
    """Open an authenticated connection to the Twitter streaming API.

    Security fix: the original embedded live OAuth tokens and secrets
    directly in the notebook, which leaks them to anyone the notebook is
    shared with.  Credentials are now read from environment variables:
    TWITTER_TOKEN, TWITTER_TOKEN_SECRET, TWITTER_CONSUMER_KEY,
    TWITTER_CONSUMER_SECRET.

    Returns:
        twitter.TwitterStream: an authenticated streaming client.

    Raises:
        KeyError: if any of the four environment variables is unset.
    """
    import os  # local import: keeps the notebook's top import cell untouched
    auth = twitter.OAuth(
        token=os.environ["TWITTER_TOKEN"],
        token_secret=os.environ["TWITTER_TOKEN_SECRET"],
        consumer_key=os.environ["TWITTER_CONSUMER_KEY"],
        consumer_secret=os.environ["TWITTER_CONSUMER_SECRET"],
    )
    twitter_stream = twitter.TwitterStream(auth=auth)
    return twitter_stream

def get_next_tweet(twitter_stream):
    """Pull the next real status from the public sample stream.

    Keeps reading until a payload that is not a deletion notice (no
    'delete' key) arrives, echoing each kept tweet to stdout.

    Args:
        twitter_stream: an authenticated twitter.TwitterStream.

    Returns:
        str: the parsed Tweet serialized as a JSON string.
    """
    stream = twitter_stream.statuses.sample(block=True)
    tweet_in = None
    tweet_parsed = None
    while not tweet_in or 'delete' in tweet_in:
        # Fix: next(stream) works on both Python 2 and 3; the original
        # stream.next() is Python-2-only.
        tweet_in = next(stream)
        tweet_parsed = Tweet(tweet_in)
        if 'delete' not in tweet_in:
            # Debug echo; converted from the Python-2-only print statement
            # to the cross-version print function.
            print("-- %s # %s # %s" % (tweet_parsed['screen_name'],
                                       tweet_parsed['timestamp'],
                                       tweet_parsed['text']))
    return json.dumps(tweet_parsed)

def process_rdd_queue(twitter_stream):
    """Build a queue of single-tweet RDDs and attach it to the global
    StreamingContext `ssc` as a queue stream.

    Side effects (realized once `ssc.start()` is called): each micro-batch
    is saved under out_files/files_* and pretty-printed to stdout.

    Args:
        twitter_stream: an authenticated twitter.TwitterStream, forwarded
            to get_next_tweet.
    """
    # One RDD per fetched tweet, five tweets total; each RDD is split
    # into 5 partitions.
    rdd_queue = [
        ssc.sparkContext.parallelize([get_next_tweet(twitter_stream)], 5)
        for _ in range(5)
    ]

    tweet_lines = ssc.queueStream(rdd_queue)
    tweet_lines.saveAsTextFiles("out_files/files_")
    tweet_lines.pprint()

In [2]:
# Open the authenticated streaming connection once; reused by later cells.
twitter_stream = connect_twitter()

In [3]:
# First run: queue 5 tweets into a 1-second-batch StreamingContext built on
# the existing SparkContext `sc`, stream for ~4 s, then stop gracefully
# while keeping `sc` alive for the second run.
ssc = StreamingContext(sc, 1)
process_rdd_queue(twitter_stream)
ssc.start()
time.sleep(4)
ssc.stop(stopSparkContext=False, stopGraceFully=True)
# NOTE(review): this long pause was interrupted by hand (see the
# KeyboardInterrupt traceback in the output below) — presumably it waits
# out the graceful shutdown before the second run; confirm it is needed.
time.sleep(100)
# Second, identical run. A stopped StreamingContext cannot be restarted,
# so a fresh one is created from the same SparkContext.
ssc = StreamingContext(sc, 1)
process_rdd_queue(twitter_stream)
ssc.start()
time.sleep(4)
ssc.stop(stopSparkContext=False, stopGraceFully=True)


--  natsumikisa7016  #  2016-04-13T11:01:17  #  仕事で近くに行ってたんだけど
すごかった!!!!👓の壁! https://t.co/ZpSEqqZ2Hy
--  holvoeteline  #  2016-04-13T11:01:19  #  RT @EmmelieVanhool: Ik mis men mooiste sterretje hierboven 😔😇❤️
--  sultan_ssdd  #  2016-04-13T11:01:20  #  RT @AliKerrouche: إذا أردت التوقف عن القلق والبدء بالحياة .. عدد نعمك وليس متاعبك..#قروب_غيم_للدعم
--  AmazingInfoTech  #  2016-04-13T11:01:21  #  RT @cloudera: Video Demo: Getting Started with Hive-on-Spark https://t.co/VaDB6kV4fC
--  bukkket_  #  2016-04-13T11:01:22  #  RT @garez_ank: #VeBenimYalanım ÖLMÜS  OLAN DEDEM BİRDAHA  ÖLDÜ BUGÜN İŞE GELEMEYECEM
-------------------------------------------
Time: 2016-04-13 13:01:23
-------------------------------------------
{"user_id": 217079960, "screen_name": "natsumikisa7016", "timestamp": "2016-04-13T11:01:17", "hashtags": [], "text": "\u4ed5\u4e8b\u3067\u8fd1\u304f\u306b\u884c\u3063\u3066\u305f\u3093\u3060\u3051\u3069\n\u3059\u3054\u304b\u3063\u305f\uff01\uff01\uff01\uff01\ud83d\udc53\u306e\u58c1\uff01 https://t.co/ZpSEqqZ2Hy", "geo": null, "id": 720205210715459584}

-------------------------------------------
Time: 2016-04-13 13:01:24
-------------------------------------------
{"user_id": 4504597529, "screen_name": "holvoeteline", "timestamp": "2016-04-13T11:01:19", "hashtags": [], "text": "RT @EmmelieVanhool: Ik mis men mooiste sterretje hierboven \ud83d\ude14\ud83d\ude07\u2764\ufe0f", "geo": null, "id": 720205219141955584}

-------------------------------------------
Time: 2016-04-13 13:01:25
-------------------------------------------
{"user_id": 4074317475, "screen_name": "sultan_ssdd", "timestamp": "2016-04-13T11:01:20", "hashtags": ["\u0642\u0631\u0648\u0628_\u063a\u064a\u0645_\u0644\u0644\u062f\u0639\u0645"], "text": "RT @AliKerrouche: \u0625\u0630\u0627 \u0623\u0631\u062f\u062a \u0627\u0644\u062a\u0648\u0642\u0641 \u0639\u0646 \u0627\u0644\u0642\u0644\u0642 \u0648\u0627\u0644\u0628\u062f\u0621 \u0628\u0627\u0644\u062d\u064a\u0627\u0629 .. \u0639\u062f\u062f \u0646\u0639\u0645\u0643 \u0648\u0644\u064a\u0633 \u0645\u062a\u0627\u0639\u0628\u0643..#\u0642\u0631\u0648\u0628_\u063a\u064a\u0645_\u0644\u0644\u062f\u0639\u0645", "geo": null, "id": 720205223319441408}

-------------------------------------------
Time: 2016-04-13 13:01:26
-------------------------------------------
{"user_id": 2763518095, "screen_name": "AmazingInfoTech", "timestamp": "2016-04-13T11:01:21", "hashtags": [], "text": "RT @cloudera: Video Demo: Getting Started with Hive-on-Spark https://t.co/VaDB6kV4fC", "geo": null, "id": 720205227505422336}

-------------------------------------------
Time: 2016-04-13 13:01:27
-------------------------------------------
{"user_id": 399478811, "screen_name": "bukkket_", "timestamp": "2016-04-13T11:01:22", "hashtags": ["VeBenimYalan\u0131m"], "text": "RT @garez_ank: #VeBenimYalan\u0131m \u00d6LM\u00dcS  OLAN DEDEM B\u0130RDAHA  \u00d6LD\u00dc BUG\u00dcN \u0130\u015eE GELEMEYECEM", "geo": null, "id": 720205231691313152}

-------------------------------------------
Time: 2016-04-13 13:01:28
-------------------------------------------

-------------------------------------------
Time: 2016-04-13 13:01:29
-------------------------------------------

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-3-f03c9caed65e> in <module>()
      4 time.sleep(4)
      5 ssc.stop(stopSparkContext=False, stopGraceFully=True)
----> 6 time.sleep(100)
      7 ssc = StreamingContext(sc, 1)
      8 process_rdd_queue(twitter_stream)

KeyboardInterrupt: 

In [10]:


In [ ]:


In [ ]:


In [ ]: