In [1]:
import config  # put config.py in the same directory; see the sketch below
import json
import time
import pyodbc
import tweepy
from pylib import utils
from pylib.simplesqlserver import SimpleSQLServer
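
For reference, config.py only needs to define the four OAuth credentials read in cell In [9]; the values below are placeholders:

# config.py -- keep real credentials out of version control
consumer_key = 'YOUR_CONSUMER_KEY'
consumer_secret = 'YOUR_CONSUMER_SECRET'
oauth_token = 'YOUR_ACCESS_TOKEN'
oauth_token_secret = 'YOUR_ACCESS_TOKEN_SECRET'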

In [2]:
_debug_ = False
_online_ = True   # read from the live Twitter stream
_writedb_ = True  # whether to write output to the database
max_tweets = 5000

utils.enable_utf_print()

In [3]:
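# to tweets table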
tweet_fields = {
    'id':'tweet_id',
    'text':'tweet_text',
    'source':'source',
    'retweet_count':'retweet_count',    
    'in_reply_to_status_id':'reply_tweet_id',
    'in_reply_to_user_id':'reply_user_id',
    'favorite_count':'favorite_count',
    'created_at':'created_at',
    'place':'place_json',
    'retweeted':'retweet',
}

In [4]:
# to users table
user_fields = {
    'name':'name',
    'screen_name':'screen_name',
    'id':'user_id',
    'created_at':'created_at',
    'followers_count':'followers_count',
    'friends_count':'friends_count',
    'favourites_count':'favourites_count',
    'statuses_count':'statuses_count',
    'description':'description',
    'time_zone':'timezone',
    'utc_offset':'utc_offset',
    'verified':'verified',
    'lang':'language',
    'location':'location',
}

In [5]:
# to hashtags table
hashtags_fields = {
    'text':'text',
}

In [6]:
# to mentions table
mentions_fields = {
    'id':'target_user_id',
    'screen_name':'screen_name',
    'name':'name',
}
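
In on_data below, each mention row combines these mapped fields with two ids added at capture time; with hypothetical values a finished row looks like:

{'tweet_id': 123456789, 'source_user_id': 42,
 'target_user_id': 99, 'screen_name': 'brock', 'name': 'Brock'}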

In [7]:
def get_fields(source, fields):
    # copy fields from source into a new dict, renaming keys according to
    # the source-key -> destination-key mapping given by fields
    dest = {}
    if not isinstance(source, dict):
        print "source is not a dict: %s" % type(source)

    for field in fields:
        if source.get(field):  # note: missing and falsy values are both skipped
            dest[fields[field]] = source[field]
    return dest
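
As a quick illustration with hypothetical values, get_fields renames the keys it finds in the mapping and drops everything else:

sample = {
    'id': 123456789,
    'text': 'Caught one! #PokemonGO',
    'favorite_count': 2,
    'lang': 'en',  # not in tweet_fields, so dropped
}
print get_fields(sample, tweet_fields)
# {'tweet_id': 123456789, 'tweet_text': 'Caught one! #PokemonGO', 'favorite_count': 2}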

In [8]:
class CustomStreamListener(tweepy.StreamListener):
    counter = 0  # number of tweets collected so far

    def on_data(self, raw_tweet):
        # on_data receives each tweet as a raw JSON string
        try:
            # record the capture time
            capture_date = utils.mysql_time()
            tweet = json.loads(raw_tweet)
            if tweet.get('delete'):
                return True # skip delete notices
            elif tweet.get('lang') != 'en':
                return True # skip non-English tweets
                
            tweet_data = {}
            user_data = {}
            hashtags_data = []
            mentions_data = []
            
            # retrieve and store tweet_data    
            tweet_data.update(get_fields(tweet,tweet_fields))
            
            # check for required fields
            if not tweet_data.get('tweet_id'):
                if _debug_:
                    # save the tweet for debugging
                    utils.dump_json(tweet,'debug/tweet.txt')
                print('No tweet id')
                return True  # discard this tweet 
            
            # fields that need special processing
            if tweet_data.get('created_at'):
                tweet_data['created_at'] = utils.utc_to_mysql_time(tweet_data['created_at'])
            if tweet.get('coordinates'):
                if tweet['coordinates'].get('coordinates'):
                    tweet_data['geo_long'] = tweet['coordinates']['coordinates'][0]
                    tweet_data['geo_lat'] = tweet['coordinates']['coordinates'][1]
            if tweet.get('retweeted_status'):
                tweet_data['retweet'] = 1
            else:
                tweet_data['retweet'] = 0
                    
            # retrieve and store user_data.          
            if tweet.get('user'):
                user_data.update(get_fields(tweet['user'],user_fields))    
                if not user_data.get('user_id'):
                    if _debug_:
                        utils.dump_json(tweet,'debug/tweet%s.txt'%tweet_data.get('tweet_id'))
                    print('No tweet user id')
                    return True  # discard this tweet            
                if user_data.get('created_at'):
                    user_data['created_at']=utils.utc_to_mysql_time(user_data['created_at'])

            # retrieve and store hashtag and user_mention data.
            if tweet.get('entities'): 
                if tweet['entities'].get('hashtags'): 
                    for hashtag in tweet['entities']['hashtags']:
                        hashtag_data = {'tweet_id':tweet_data.get('tweet_id')}
                        hashtag_data.update(get_fields(hashtag, hashtags_fields))
                        hashtags_data.append(hashtag_data)
                if tweet['entities'].get('user_mentions'):    
                    for mention in tweet['entities']['user_mentions']:  # extract every user_mention from the user_mentions list
                        mention_data = {'tweet_id':tweet_data.get('tweet_id'), 'source_user_id':user_data.get('user_id')}
                        mention_data.update(get_fields(mention,mentions_fields))
                        mentions_data.append(mention_data)             

            # enrichment    
            if tweet_data: 
                tweet_data['user_id'] = user_data.get('user_id')
                tweet_data['capture_date']=capture_date
           
            if user_data:
                user_data['capture_date']=capture_date
                user_data['period'] = capture_date[:10]
         
            print "%s %i tweet id:%s"%(capture_date, self.counter, tweet_data.get('tweet_id'))
                        
            # write tweet_data and user_data into database
            for table, data in (('users',user_data),('tweets',tweet_data)):
                if data: 
                    if _writedb_:
                        print "%i %s inserted"%(db.insert(table, data), table)
                    else:
                        print table
                        print data
            
            # batch-write hashtag and mention data into the database
            for table, data in (('hashtags', hashtags_data), ('mentions', mentions_data)):
                if data: 
                    if _writedb_:
                        print "%i %s inserted"%(db.inserts(table, data), table)                        
                    else:
                        print table
                        print data
                        
            # count the number of collected tweets
            self.counter += 1          
            if self.counter >= max_tweets:
                print 'target number of tweets (%s) reached' % self.counter
                return False
        
        except Exception:
            if _debug_:
                # dump the raw payload for inspection; use raw_tweet because
                # tweet is unbound if json.loads itself failed
                utils.dump_json(raw_tweet, 'debug/tweet.txt')
                raise
            else:
                pass # ignore errors and keep streaming
   
    def on_error(self, status_code):
        print "Got an API error with status code %s" % str(status_code)
        # returning True keeps listening; consider returning False on 420
        # (rate limited) so the stream stops instead of aggravating the backoff
        return True

    def on_timeout(self):
        print "Timeout..."
        return True  # keep listening

In [9]:
if __name__ == '__main__':

    # create a listener object
    listener = CustomStreamListener()
    
    # connect to db
    if _writedb_:
        db = SimpleSQLServer(dsn='Twitter', keep_alive=True)

    if _online_:    
        # create an auth object
        auth = tweepy.OAuthHandler(config.consumer_key, config.consumer_secret)
        auth.set_access_token(config.oauth_token, config.oauth_token_secret)
            
        # attach the listener to the Twitter stream; async=True (tweepy 3.x)
        # runs the stream in a background thread so the loop below can monitor it
        stream = tweepy.streaming.Stream(auth, listener)
        stream.filter(track=['#PokemonGO'], async=True)

        while True:
            try:
                if stream.running is False:
                    print 'Stream stopped!'
                    break
                time.sleep(1) # sleep for 1 sec
            except KeyboardInterrupt:
                break
                
        stream.disconnect()
        if _writedb_:
            db.end()  # close the database connection (assumes end is a method)
        print 'Bye!'
    else:
        # offline mode
        print "offline mode" 
        listener.on_data(utils.read_file('debug/tweet_example_entities.txt'))
        exit(0)

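pylib.simplesqlserver is not shown in this notebook. Judging only from the calls above (a dsn/keep_alive constructor, insert returning a row count, inserts for batches, and end to close), a minimal stand-in for offline testing might look like this; the real class presumably wraps pyodbc:

class FakeSQLServer(object):
    # mimics the SimpleSQLServer interface this notebook relies on
    def __init__(self, dsn=None, keep_alive=False):
        self.dsn = dsn

    def insert(self, table, row):
        # pretend to insert a single row dict; report one row affected
        print 'INSERT INTO %s: %s' % (table, row)
        return 1

    def inserts(self, table, rows):
        # batch variant used for hashtags and mentions
        for row in rows:
            self.insert(table, row)
        return len(rows)

    def end(self):
        pass  # nothing to close
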
In [ ]: