In [1]:
import config  # put config.py (with your Twitter API keys) in the same directory
import pyodbc
import json
import time
import tweepy
from pylib import utils
from pylib.simplesqlserver import SimpleSQLServer
In [2]:
_debug_ = False
_online_ = True    # read from online sources (the live stream) rather than a saved sample
_writedb_ = True   # whether to write output to the database
max_tweets = 5000
utils.enable_utf_print()
In [3]:
# to tweets table
tweet_fields = {
    'id':'tweet_id',
    'text':'tweet_text',
    'source':'source',
    'retweet_count':'retweet_count',
    'in_reply_to_status_id':'reply_tweet_id',
    'in_reply_to_user_id':'reply_user_id',
    'favorite_count':'favorite_count',
    'created_at':'created_at',
    'place':'place_json',
    'retweeted':'retweet',
}
In [4]:
# to users table
user_fields = {
    'name':'name',
    'screen_name':'screen_name',
    'id':'user_id',
    'created_at':'created_at',
    'followers_count':'followers_count',
    'friends_count':'friends_count',
    'favourites_count':'favourites_count',
    'statuses_count':'statuses_count',
    'description':'description',
    'time_zone':'timezone',
    'utc_offset':'utc_offset',
    'verified':'verified',
    'lang':'language',
    'location':'location',
}
In [5]:
# to hashtags table
hashtags_fields = {
    'text':'text',
}
In [6]:
# to mentions table
mentions_fields = {
    'id':'target_user_id',
    'screen_name':'screen_name',
    'name':'name',
}
In [7]:
def get_fields(source, fields):
    # Copy fields from source into a new dict, renaming keys according to the mapping in fields.
    dest = {}
    if not isinstance(source, dict):
        print "source is not a dict: " + str(type(source))
    for field in fields:
        if source.get(field):
            dest[fields[field]] = source[field]
    return dest
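A quick sanity check of get_fields on a hand-made fragment (the sample values below are made up for illustration, not taken from a real tweet; this assumes the cells above have been run):
sample = {'id': 123, 'text': 'hello', 'retweet_count': 2, 'lang': 'en'}
print get_fields(sample, tweet_fields)
# {'tweet_id': 123, 'tweet_text': 'hello', 'retweet_count': 2}
# keys not listed in tweet_fields (e.g. 'lang') are dropped, and falsy
# values are skipped because of the source.get(field) check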
In [8]:
class CustomStreamListener(tweepy.StreamListener):
    counter = 0

    def on_data(self, raw_tweet):
        # on_data receives each tweet as a raw JSON string
        try:
            # record the time of the event
            capture_date = utils.mysql_time()
            tweet = json.loads(raw_tweet)
            if tweet.get('delete'):
                return True   # skip delete notices
            elif tweet.get('lang') != 'en':
                return True   # skip non-English tweets
            tweet_data = {}
            user_data = {}
            hashtags_data = []
            mentions_data = []
            # retrieve and store tweet_data
            tweet_data.update(get_fields(tweet, tweet_fields))
            # check for required fields
            if not tweet_data.get('tweet_id'):
                if _debug_:
                    # save the tweet for debugging
                    utils.dump_json(tweet, 'debug/tweet.txt')
                print('No tweet id')
                return True   # discard this tweet
            # fields that need special processing
            if tweet_data.get('created_at'):
                tweet_data['created_at'] = utils.utc_to_mysql_time(tweet_data['created_at'])
            if tweet.get('coordinates'):
                if tweet['coordinates'].get('coordinates'):
                    tweet_data['geo_long'] = tweet['coordinates']['coordinates'][0]
                    tweet_data['geo_lat'] = tweet['coordinates']['coordinates'][1]
            if tweet.get('retweeted_status'):
                tweet_data['retweet'] = 1
            else:
                tweet_data['retweet'] = 0
            # retrieve and store user_data
            if tweet.get('user'):
                user_data.update(get_fields(tweet['user'], user_fields))
            if not user_data.get('user_id'):
                if _debug_:
                    utils.dump_json(tweet, 'debug/tweet%s.txt' % tweet_data.get('tweet_id'))
                print('No tweet user id')
                return True   # discard this tweet
            if user_data.get('created_at'):
                user_data['created_at'] = utils.utc_to_mysql_time(user_data['created_at'])
            # retrieve and store hashtag and user_mention data
            if tweet.get('entities'):
                if tweet['entities'].get('hashtags'):
                    for hashtag in tweet['entities']['hashtags']:
                        hashtag_data = {'tweet_id': tweet_data.get('tweet_id')}
                        hashtag_data.update(get_fields(hashtag, hashtags_fields))
                        hashtags_data.append(hashtag_data)
                if tweet['entities'].get('user_mentions'):
                    # extract every user_mention from the user_mentions list
                    for mention in tweet['entities']['user_mentions']:
                        mention_data = {'tweet_id': tweet_data.get('tweet_id'),
                                        'source_user_id': user_data.get('user_id')}
                        mention_data.update(get_fields(mention, mentions_fields))
                        mentions_data.append(mention_data)
            # enrichment
            if tweet_data:
                tweet_data['user_id'] = user_data.get('user_id')
                tweet_data['capture_date'] = capture_date
            if user_data:
                user_data['capture_date'] = capture_date
                user_data['period'] = capture_date[:10]
            print "%s %i tweet id: %s" % (capture_date, self.counter, tweet_data.get('tweet_id'))
            # write tweet_data and user_data into the database row by row
            for table, data in (('users', user_data), ('tweets', tweet_data)):
                if data:
                    if _writedb_:
                        print "%i %s inserted" % (db.insert(table, data), table)
                    else:
                        print table
                        print data
            # write hashtag and user_mention data into the database in one batch per table
            for table, data in (('hashtags', hashtags_data), ('mentions', mentions_data)):
                if data:
                    if _writedb_:
                        print "%i %s inserted" % (db.inserts(table, data), table)
                    else:
                        print table
                        print data
            # count the number of collected tweets
            self.counter += 1
            if self.counter >= max_tweets:
                print 'target number of tweets = %s has been reached' % self.counter
                return False
        except Exception, e:
            if _debug_:
                # write the tweet to local disk for debugging
                utils.dump_json(tweet, 'debug/tweet.txt')
                raise
            else:
                pass   # ignore errors and keep streaming

    def on_error(self, status_code):
        print "Got an API error with status code %s" % str(status_code)
        return True   # keep listening

    def on_timeout(self):
        print "Timeout..."
        return True   # keep listening
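SimpleSQLServer comes from the author's own pylib and is not shown here. Purely as a hedged sketch of what an insert helper in the spirit of db.insert(table, data) might do on top of pyodbc (the function name insert_row and the exact SQL are assumptions, not the real implementation):
def insert_row(conn, table, data):
    # build a parameterized INSERT from a {column: value} dict and return the affected row count
    cols = sorted(data.keys())
    sql = "INSERT INTO %s (%s) VALUES (%s)" % (table, ', '.join(cols), ', '.join('?' * len(cols)))
    cursor = conn.cursor()
    cursor.execute(sql, [data[c] for c in cols])
    conn.commit()
    return cursor.rowcount

# usage sketch: insert_row(pyodbc.connect('DSN=Twitter'), 'tweets', tweet_data)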
In [9]:
if __name__ == '__main__':
    # create a listener object
    listener = CustomStreamListener()
    # connect to the database
    if _writedb_:
        db = SimpleSQLServer(dsn='Twitter', keep_alive=True)
    if _online_:
        # create an auth object
        auth = tweepy.OAuthHandler(config.consumer_key, config.consumer_secret)
        auth.set_access_token(config.oauth_token, config.oauth_token_secret)
        # attach the listener object to the Twitter stream
        stream = tweepy.streaming.Stream(auth, listener)
        # async=True runs the stream in a background thread so the loop below can monitor it
        stream.filter(track=['#PokemonGO'], async=True)
        while True:
            try:
                if stream.running is False:
                    print 'Stream stopped!'
                    break
                time.sleep(1)   # sleep for 1 sec
            except KeyboardInterrupt:
                break
        stream.disconnect()
        if _writedb_:
            db.end()
        print 'Bye!'
    else:
        # offline mode: replay a saved tweet through the listener
        print "offline mode"
        listener.on_data(utils.read_file('debug/tweet_example_entities.txt'))
    exit(0)
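pylib.utils is likewise the author's own module. Judging only from how it is used above (mysql_time() for the capture timestamp, utc_to_mysql_time() for Twitter's created_at strings, and capture_date[:10] as a date), rough stand-ins might look like the following; the format strings are assumptions:
from datetime import datetime

def mysql_time():
    # current local time as 'YYYY-MM-DD HH:MM:SS', so [:10] yields the date used as 'period'
    return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

def utc_to_mysql_time(created_at):
    # Twitter timestamps look like 'Sun Jul 17 09:30:00 +0000 2016'
    return datetime.strptime(created_at, '%a %b %d %H:%M:%S +0000 %Y').strftime('%Y-%m-%d %H:%M:%S')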
In [ ]: