Twitter provides two types of APIs to access its data:
Reasons you might want to use the streaming API:
The Twitter streaming API delivers data through a streaming HTTP response. This is very similar to downloading a file, where you read a number of bytes, store them to disk, and repeat until the end of the file. The only difference is that this stream is endless. The only things that could stop this stream are:
This means that this process will be using the thread that it was launched from until it is stopped. In production, you should always start this in a different thread or process to make sure your software doesn't freeze until you stop the stream.
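To make the threading advice concrete, here is a minimal sketch that uses only the standard library. The `fake_stream` generator, `consume`, and `collect_in_background` are hypothetical stand-ins for an endless streaming response and its reader; they are not part of tweepy. The sketch only shows the pattern of running a blocking read loop in a worker thread and stopping it from the outside.

```python
import itertools
import threading
import time


def fake_stream():
    """Hypothetical stand-in for an endless streaming HTTP response."""
    for n in itertools.count(1):
        time.sleep(0.001)  # pretend to wait for the next chunk of data
        yield "tweet %d" % n


def consume(stream, stop_event, sink):
    """Read items until asked to stop; this loop blocks the thread it runs in."""
    for item in stream:
        if stop_event.is_set():
            break
        sink.append(item)


def collect_in_background(duration=0.2):
    """Run the blocking consumer in a worker thread and stop it after a while."""
    received = []
    stop_event = threading.Event()
    worker = threading.Thread(
        target=consume,
        args=(fake_stream(), stop_event, received),
        daemon=True,
    )
    worker.start()            # the calling thread stays free
    time.sleep(duration)      # ... other work would happen here ...
    stop_event.set()          # ask the consumer to stop
    worker.join(timeout=1.0)  # wait for it to finish
    return received, worker.is_alive()
```

Calling `collect_in_background()` returns the items gathered so far and whether the worker is still running. In a notebook, the same idea keeps the kernel responsive while tweets are collected.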
You will need four credentials from the Twitter developer site to start using the streaming API: an API key, an API secret, an access token, and an access token secret. First, let's import some libraries for working with the Twitter API, data analysis, data storage, and so on.
In [1]:
import numpy as np
import pandas as pd
import tweepy
import matplotlib.pyplot as plt
import pymongo
import ipywidgets as wgt
from IPython.display import display
from sklearn.feature_extraction.text import CountVectorizer
import re
from datetime import datetime
%matplotlib inline
In [2]:
api_key = "YOUR_API_KEY" # <---- Add your API Key
api_secret = "YOUR_API_SECRET" # <---- Add your API Secret
access_token = "YOUR_ACCESS_TOKEN" # <---- Add your access token
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET" # <---- Add your access token secret
auth = tweepy.OAuthHandler(api_key, api_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)
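Credentials typed directly into a notebook are easy to leak when the notebook is shared. One possible alternative, sketched below, is to read the four values from environment variables. The variable names in `REQUIRED` and the `load_credentials` helper are assumptions made for this illustration; neither Twitter nor tweepy prescribes them.

```python
import os

# Hypothetical environment variable names for the four credentials
REQUIRED = (
    "TWITTER_API_KEY",
    "TWITTER_API_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET",
)


def load_credentials(env=None):
    """Return the four credentials as a dict, or fail loudly if any is missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise KeyError("missing credentials: %s" % ", ".join(missing))
    return {name: env[name] for name in REQUIRED}
```

The returned values could then be passed to `tweepy.OAuthHandler` and `set_access_token` in place of the literal strings.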
In [3]:
col = pymongo.MongoClient()["tweets"]["StreamingTutorial"]
col.count()
Out[3]:
We need a listener that extends the tweepy.StreamListener class. There are a number of methods that you can override to tell the listener what to do. Some of the important methods are:
on_status(self, status): This will pass a status ("tweet") object when a tweet is received
on_data(self, raw_data): Called when any data is received; the raw data will be passed
on_error(self, status_code): Called when you get a response with a code other than 200 (OK)
In [4]:
class MyStreamListener(tweepy.StreamListener):

    counter = 0

    def __init__(self, max_tweets=1000, *args, **kwargs):
        self.max_tweets = max_tweets
        self.counter = 0
        super().__init__(*args, **kwargs)

    def on_connect(self):
        # Reset the counter and start the clock when the stream connects
        self.counter = 0
        self.start_time = datetime.now()

    def on_status(self, status):
        # Increment counter
        self.counter += 1
        # Store tweet to MongoDB
        col.insert_one(status._json)
        # Update the display on every tweet (raise the modulus to update less often)
        if self.counter % 1 == 0:
            value = int(100.00 * self.counter / self.max_tweets)
            mining_time = datetime.now() - self.start_time
            progress_bar.value = value
            html_value = """<span class="label label-primary">Tweets/Sec: %.1f</span>""" % (self.counter / max([1,mining_time.seconds]))
            html_value += """ <span class="label label-success">Progress: %.1f%%</span>""" % (self.counter / self.max_tweets * 100.0)
            html_value += """ <span class="label label-info">ETA: %.1f Sec</span>""" % ((self.max_tweets - self.counter) / (self.counter / max([1,mining_time.seconds])))
            wgt_status.value = html_value
            #print("%s/%s" % (self.counter, self.max_tweets))
            if self.counter >= self.max_tweets:
                myStream.disconnect()
                print("Finished")
                print("Total Mining Time: %s" % (mining_time))
                # Guard against division by zero when mining took under a second
                print("Tweets/Sec: %.1f" % (self.max_tweets / max([1,mining_time.seconds])))
                progress_bar.value = 0

myStreamListener = MyStreamListener(max_tweets=100)
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)
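The rate, progress, and ETA figures shown by the listener are simple arithmetic, and it can help to see them in isolation. The sketch below pulls them into a standalone helper; `progress_stats` is a hypothetical name introduced here, and it uses `total_seconds()` rather than the `.seconds` attribute used above, which is a small design choice so that runs longer than a day are still measured correctly.

```python
from datetime import timedelta


def progress_stats(counter, max_tweets, elapsed):
    """Return (tweets_per_sec, percent_done, eta_seconds) for a running stream."""
    # Clamp elapsed time to at least one second to avoid dividing by zero
    seconds = max(1.0, elapsed.total_seconds())
    rate = counter / seconds
    percent = 100.0 * counter / max_tweets
    # Remaining tweets divided by the current rate gives the estimated time left
    eta = (max_tweets - counter) / rate if rate > 0 else float("inf")
    return rate, percent, eta


rate, percent, eta = progress_stats(25, 100, timedelta(seconds=10))
print("Tweets/Sec: %.1f, Progress: %.1f%%, ETA: %.1f Sec" % (rate, percent, eta))
# prints: Tweets/Sec: 2.5, Progress: 25.0%, ETA: 30.0 Sec
```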
There are two methods to connect to a stream:
filter(follow=None, track=None, async=False, locations=None, stall_warnings=False, languages=None, encoding='utf8', filter_level=None)
firehose(count=None, async=False)
Firehose captures everything. You should make sure that your connection is fast enough to handle the stream and that your storage can keep up with tweets arriving at the same rate. We cannot really use firehose for this tutorial, so we'll be using filter.
You have to specify one of two things to filter:
follow: A list of user IDs to follow. This will stream all their tweets, their retweets, and other users' retweets of their tweets. This doesn't include mentions or manual retweets where the user doesn't press the retweet button.
track: A string or list of strings to be used for filtering. If you use multiple words separated by spaces, they are combined with an AND operator. If you use multiple words in a string separated by commas, or pass a list of words, they are combined with an OR operator.
Note: track is case insensitive.
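The AND/OR rules for track can be illustrated locally. The function below is a simplified approximation written for this tutorial: `matches_track` is a hypothetical helper, and Twitter's real matching is performed on the server and considers more than the tweet text, so treat this only as a way to reason about which keyword combinations you are asking for.

```python
import re


def matches_track(text, track):
    """Approximate the AND/OR rules of `track` on a single piece of text."""
    # A comma-separated string and a list both mean "any of these phrases" (OR)
    phrases = track.split(",") if isinstance(track, str) else list(track)
    # Lowercase everything so that matching is case insensitive
    tokens = set(re.findall(r"[#@]?\w+", text.lower()))
    for phrase in phrases:
        terms = phrase.lower().split()
        # Every space-separated term in a phrase must be present (AND)
        if terms and all(term in tokens for term in terms):
            return True
    return False


print(matches_track("Big news today", "Big Data"))       # False: "data" is missing
print(matches_track("Big news today", "Big Data,news"))  # True: "news" matches
```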
I want to collect all tweets that contain any of these words:
This could be done with a string or a list. It is easier to do it with a list, which keeps your code clear to read.
In [5]:
keywords = ["Jupyter",
"Python",
"Data Mining",
"Machine Learning",
"Data Science",
"Big Data",
"DataMining",
"MachineLearning",
"DataScience",
"BigData",
"IoT",
"#R",
]
# Visualize a progress bar to track progress
progress_bar = wgt.IntProgress(value=0)
display(progress_bar)
wgt_status = wgt.HTML(value="""<span class="label label-primary">Tweets/Sec: 0.0</span>""")
display(wgt_status)
# Start a filter with an error counter of 20
for error_counter in range(20):
    try:
        myStream.filter(track=keywords)
        print("Tweets collected: %s" % myStream.listener.counter)
        print("Total tweets in collection: %s" % col.count())
        break
    except Exception as e:
        # Catch stream errors and retry; KeyboardInterrupt still stops the loop
        print("ERROR# %s: %s" % (error_counter + 1, e))
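The retry loop above follows a general pattern: try an action, count failures, and stop after a fixed number of errors. A minimal sketch of that pattern, independent of tweepy, might look like this. Both `run_with_retries` and the `flaky` action are hypothetical names used only for illustration.

```python
def run_with_retries(action, max_errors=20):
    """Call `action` until it succeeds or `max_errors` failures have occurred."""
    errors = []
    for attempt in range(1, max_errors + 1):
        try:
            return action(), errors
        except Exception as exc:  # does not swallow KeyboardInterrupt
            errors.append("ERROR# %s: %s" % (attempt, exc))
    raise RuntimeError("gave up after %d errors" % max_errors)


calls = {"n": 0}


def flaky():
    """Hypothetical action that fails twice before succeeding."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("stream dropped")
    return "done"


result, errors = run_with_retries(flaky)
print(result, len(errors))  # prints: done 2
```

Unlike the loop in the cell above, this version raises once the error budget is exhausted, so a caller cannot mistake a run that never succeeded for a finished one.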
In [6]:
col.find_one()
Out[6]:
In [29]:
dataset = [{"created_at": item["created_at"],
            "text": item["text"],
            "user": "@%s" % item["user"]["screen_name"],
            "source": item["source"],
            } for item in col.find()]
dataset = pd.DataFrame(dataset)
dataset
Out[29]:
In [30]:
cv = CountVectorizer()
count_matrix = cv.fit_transform(dataset.text)
word_count = pd.DataFrame(cv.get_feature_names(), columns=["word"])
word_count["count"] = count_matrix.sum(axis=0).tolist()[0]
word_count = word_count.sort_values("count", ascending=False).reset_index(drop=True)
word_count[:50]
Out[30]:
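To see what the word count is doing, the same idea can be sketched without scikit-learn. This assumes CountVectorizer is used with its default settings, which lowercase the text and keep only tokens of two or more word characters. The `count_words` helper and the sample tweets are made up for this illustration.

```python
import re
from collections import Counter

# Tokens of two or more word characters, mirroring CountVectorizer's default
TOKEN_PATTERN = re.compile(r"(?u)\b\w\w+\b")


def count_words(texts):
    """Count lowercase word tokens across an iterable of strings."""
    counts = Counter()
    for text in texts:
        counts.update(TOKEN_PATTERN.findall(text.lower()))
    return counts


sample = ["Python is great for data mining", "I love Python and big data"]
counts = count_words(sample)
print(counts["python"], counts["data"], counts["mining"])  # prints: 2 2 1
```

Note that the single-character word "I" is dropped, which is also why one-letter terms do not show up in the table above.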
In [37]:
def get_source_name(x):
    value = re.findall(pattern="<[^>]+>([^<]+)</a>", string=x)
    if len(value) > 0:
        return value[0]
    else:
        return ""
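The source field of a tweet is an HTML anchor tag, and the regular expression above captures the text between the opening tag and the closing `</a>`. A short, self-contained sketch of the same extraction is shown below. The `extract_source_name` name and the compiled `SOURCE_PATTERN` are introduced here for illustration, and the sample string is an example of what a source value typically looks like.

```python
import re

# Same pattern as get_source_name: capture the text inside <a ...>...</a>
SOURCE_PATTERN = re.compile(r"<[^>]+>([^<]+)</a>")


def extract_source_name(source_html):
    """Return the visible client name, or an empty string if nothing matches."""
    match = SOURCE_PATTERN.search(source_html)
    return match.group(1) if match else ""


samples = [
    '<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>',
    "",
]
print([extract_source_name(s) for s in samples])
# prints: ['Twitter for iPhone', '']
```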
In [38]:
# Use item assignment so that pandas actually creates the new column
dataset["source_name"] = dataset.source.apply(get_source_name)
source_counts = dataset.source_name.value_counts().sort_values()[-10:]
bottom = [index for index, item in enumerate(source_counts.index)]
plt.barh(bottom, width=source_counts, color="orange", linewidth=0)
y_labels = ["%s %.1f%%" % (item, 100.0*source_counts[item]/len(dataset)) for index,item in enumerate(source_counts.index)]
plt.yticks(np.array(bottom)+0.4, y_labels)
source_counts
Out[38]: