In [1]:
from sqlalchemy import create_engine
import tweepy
import pandas as pd
In [2]:
import tweet_functions as tf
from credentials import *
In [3]:
db_connection_string = "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8mb4&use_unicode=True".format(DB_USER, DB_PASS, DB_HOST, DB_PORT, DB_SCHEMA)
In [4]:
db_engine = create_engine(db_connection_string, encoding="utf8")
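Before kicking off a batch it can be worth confirming that the engine can actually reach the schema; a one-line check like this (not part of the original notebook) is enough:

# quick connectivity check; raises immediately if the host, credentials, or schema are wrong
pd.read_sql('select 1', db_engine)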
In [5]:
auth = tweepy.OAuthHandler(TWITTER_CONSUMER_TOKEN, TWITTER_CONSUMER_SECRET)
auth.set_access_token(TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)
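The search loop further down simply gives up whenever a TweepError is raised, which includes rate-limit errors. If that matters, tweepy 3.x (as used here) can be asked to wait out rate limits instead, and verify_credentials() gives a quick sanity check. A minimal variant of the cell above, assuming the same credentials module:

# optional: sleep through Twitter rate limits instead of raising inside the search loop
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

# confirm the credentials actually work before starting a batch
me = api.verify_credentials()
print("Authenticated as:", me.screen_name)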
In [6]:
# log the start of the batch
connection = db_engine.raw_connection()
try:
    cursor = connection.cursor()
    cursor.callproc('start_batch')
    results = list(cursor.fetchall())
    cursor.close()
    connection.commit()
finally:
    connection.close()
batch_id = results[0][0]
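The raw_connection / callproc / commit pattern in the cell above is repeated twice more below (for the staging merges and for log_batch_term). A small helper along these lines, not part of the original notebook, could wrap it:

def call_procedure(engine, proc_name, args=None, fetch=False):
    # call a stored procedure on a raw DBAPI connection, commit,
    # and optionally return the fetched rows
    connection = engine.raw_connection()
    try:
        cursor = connection.cursor()
        cursor.callproc(proc_name, args or [])
        rows = list(cursor.fetchall()) if fetch else None
        cursor.close()
        connection.commit()
        return rows
    finally:
        connection.close()

# the cell above would then reduce to:
# batch_id = call_procedure(db_engine, 'start_batch', fetch=True)[0][0]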
In [7]:
term_df = pd.read_sql('call get_terms()', db_engine)
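The loop below only relies on three columns from get_terms(); a quick structural check (a sketch, assuming those are the column names the procedure returns) catches schema drift early:

# the term loop below depends on exactly these columns from get_terms()
expected_columns = {'term_id', 'term', 'max_id'}
missing = expected_columns - set(term_df.columns)
if missing:
    raise ValueError("get_terms() result is missing columns: {}".format(missing))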
In [8]:
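# silence pandas' SettingWithCopyWarning for the .loc assignments on sliced frames below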
pd.options.mode.chained_assignment = None
max_tweets_per_term = 100
In [9]:
for index, term_row in term_df.iterrows():
    term_id = term_row['term_id']
    term = term_row['term']
    term_max_id = term_row['max_id']
    searched_tweets = []
    last_id = -1
    while len(searched_tweets) < max_tweets_per_term:
        count = max_tweets_per_term - len(searched_tweets)
        try:
            new_tweets = api.search(q=term, count=count, max_id=str(last_id - 1), since_id=term_max_id)
            if not new_tweets:
                break
            searched_tweets.extend(new_tweets)
            last_id = new_tweets[-1].id
        except tweepy.TweepError as e:
            # depending on the error (e.g. a rate limit) one might retry or wait;
            # to keep things simple, give up on this term on any error
            break
    number_collected = len(searched_tweets)
    if number_collected > 0:
        # there were tweets, so format and upload them to the DB
        # first grab the new max id so it can be logged for this term
        new_max_id = str(searched_tweets[0].id)
        # turn the collection of Status objects into a dataframe
        tweets, headers = tf.tweets_to_list(searched_tweets)
        tweets_df = pd.DataFrame(tweets, columns=headers)
        # make a smaller dataframe of just the text, then remove any URLs from it
        tweet_text_df = tweets_df[['tweet_id', 'tweet_text']]
        tweet_text_df.loc[:, 'tweet_text'] = tweet_text_df['tweet_text'].apply(tf.remove_urls)
        # could also remove other things here... maybe @name tags?
        # apply the tokenizer function to get a tweet_id <-> word table, then stem those words
        words_df = tweet_text_df.groupby('tweet_id', group_keys=False).apply(tf.pivot_words)
        words_df.loc[:, 'word'] = words_df['word'].apply(tf.stemmer.stem)
        # going back to the Status objects, make a dataframe of entities
        entities, headers = tf.tweets_to_entity_list(searched_tweets)
        entities_df = pd.DataFrame(entities, columns=headers)
        # add the term ID to the tweets dataframe
        tweets_df['term_id'] = term_id
        try:
            connection = db_engine.raw_connection()
            cursor = connection.cursor()
            try:
                cursor.callproc('truncate_staging')
            except Exception as e:
                print("Error truncating staging:", e)
            try:
                tweets_df.to_sql('stage_tweet', db_engine, if_exists='append', index=False)
                cursor.callproc('merge_stage_tweet', [str(batch_id)])
            except Exception as e:
                print("Error loading/merging tweets:", e)
            try:
                words_df.to_sql('stage_word', db_engine, if_exists='append', index=False)
                cursor.callproc('merge_stage_word')
            except Exception as e:
                print("Error loading/merging words:", e)
            try:
                entities_df.to_sql('stage_entity', db_engine, if_exists='append', index=False)
                cursor.callproc('merge_stage_entity')
            except Exception as e:
                print("Error loading/merging entities:", e)
            cursor.close()
            connection.commit()
        finally:
            connection.close()
    else:
        # no tweets, so skip the staging work and keep the max_id the same as before
        new_max_id = term_max_id
    # tweets or not, log the term batch before moving on to the next term
    connection = db_engine.raw_connection()
    try:
        cursor = connection.cursor()
        cursor.callproc('log_batch_term', [str(batch_id), str(term_id), new_max_id, str(number_collected)])
        cursor.close()
        connection.commit()
    finally:
        connection.close()