Since this is a streaming application that runs for a long time, we will use Python's logging module to record what it is doing.
In [1]:
import logging  # Python's standard logging module
# Basic format for log records
logFormat = "%(asctime)s - [%(levelname)s] (%(funcName)s:%(lineno)d) %(message)s"
# Logs will be stored in tweepytrends.log
logging.basicConfig(filename='tweepytrends.log', level=logging.INFO,
                    format=logFormat, datefmt="%Y-%m-%d %H:%M:%S")
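To see what a record produced by this format looks like without opening the log file, the sketch below attaches the same format strings to a separate in-memory logger. The logger name `formatDemo` and the helper names are hypothetical and exist only for this illustration.

```python
import io
import logging

# Same format strings as in the cell above
LOG_FORMAT = "%(asctime)s - [%(levelname)s] (%(funcName)s:%(lineno)d) %(message)s"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def makeDemoLogger(stream):
    """Return a logger named 'formatDemo' (hypothetical) that writes to `stream`."""
    logger = logging.getLogger("formatDemo")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep demo records out of the root logger's file
    logger.handlers = []      # avoid duplicate handlers if the cell is re-run
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT))
    logger.addHandler(handler)
    return logger


def demo():
    """Log one record into a string buffer and return the formatted line."""
    buffer = io.StringIO()
    makeDemoLogger(buffer).info("stream started")
    return buffer.getvalue()


# Prints a line of the form:
# <date> <time> - [INFO] (demo:<line number>) stream started
print(demo())
```

The `funcName` and `lineno` fields point at the place where the logging call was made, which is what makes this format handy when debugging a long-running stream.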
Create an app on the Twitter developer site. Copy the consumer key, consumer secret, access token and access token secret; they will be used in our code.
The authorization is done using OAuth, an open protocol that allows secure authorization in a simple and standard way from web, mobile and desktop applications.
We will use Tweepy, a Python module. Tweepy is open source, hosted on GitHub, and lets Python communicate with the Twitter platform and use its API. Tweepy supports OAuth authentication, which is handled by the tweepy.AuthHandler class; the code below uses its OAuth subclass, tweepy.OAuthHandler.
In [2]:
import tweepy  # Twitter API client
import socket  # will be used to create sockets
import json  # parse the JSON messages sent by Twitter
try:
    from http.client import IncompleteRead  # Python 3
except ImportError:
    from httplib import IncompleteRead  # Python 2
In [3]:
# Keep these tokens secret: anyone who has them gets full access
# to your Twitter account
consumerKey = "#"
consumerSecret = "#"
accessToken = "#-#"
accessTokenSecret = "#"
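One way to keep the tokens out of the notebook itself is to read them from environment variables. The sketch below is only an illustration of that idea: the variable names and the `loadCredentials` helper are assumptions, not something Twitter or Tweepy prescribes.

```python
import os

# Hypothetical variable names; use whatever names you export in your shell
REQUIRED_VARS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET",
)


def loadCredentials(env=None):
    """Return the four tokens as a tuple, in the order of REQUIRED_VARS.

    `env` defaults to os.environ; passing a plain dict makes testing easy.
    Raises KeyError naming every variable that is missing or empty.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return tuple(env[name] for name in REQUIRED_VARS)
```

With the variables exported, the four assignments above could become `consumerKey, consumerSecret, accessToken, accessTokenSecret = loadCredentials()`.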
After the authentication step below, we will have full access to the Twitter APIs.
In [4]:
# Perform the authentication and authorization; after this step
# we will have full access to the Twitter APIs
def connectToTwitter():
    """Connect to Twitter and return the (api, auth) pair."""
    try:
        auth = tweepy.OAuthHandler(consumerKey, consumerSecret)
        auth.set_access_token(accessToken, accessTokenSecret)
        api = tweepy.API(auth)
        logging.info("Successfully logged in to twitter.")
        return api, auth
    except Exception as e:
        logging.error("Something went wrong in oauth, please check your tokens.")
        logging.error(e)
        raise  # re-raise so the caller does not try to unpack None
The Twitter streaming API is used to download Twitter messages in real time. We use the streaming API instead of the REST API because the REST API requires the client to pull data from Twitter, whereas the streaming API pushes messages over a persistent session. This lets the streaming API deliver more data in real time than could be fetched through the REST API.
In Tweepy, an instance of tweepy.Stream establishes a streaming session and routes messages to a StreamListener instance. The on_data method of the stream listener receives all messages and calls functions according to the message type.
But the handler methods that the default StreamListener calls are only stubs, so we need to implement the functionality by subclassing StreamListener. Here we override on_data itself and forward the text of each tweet.
Using the streaming API has three steps: create a class that inherits from StreamListener, use that class to create a Stream object, and connect to the Twitter API using the Stream.
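The routing idea described above can be sketched without Tweepy at all. The class below is a simplified, hypothetical stand-in (it is not Tweepy's source and `ToyListener` is not a Tweepy name): `on_data` parses each raw message and hands it to a handler chosen by the keys the message contains.

```python
import json


class ToyListener(object):
    """Hypothetical stand-in for a stream listener; not part of Tweepy."""

    def __init__(self):
        self.seen = []  # (kind, payload) pairs, recorded for inspection

    def on_data(self, raw_data):
        """Parse one raw message and route it by type."""
        message = json.loads(raw_data)
        if "text" in message:
            return self.on_status(message)
        if "delete" in message:
            return self.on_delete(message["delete"])
        if "limit" in message:
            return self.on_limit(message["limit"])
        return True  # unknown message type: ignore it and keep the stream open

    def on_status(self, status):
        self.seen.append(("status", status["text"]))
        return True

    def on_delete(self, notice):
        self.seen.append(("delete", notice))
        return True

    def on_limit(self, notice):
        self.seen.append(("limit", notice))
        return True


listener = ToyListener()
listener.on_data('{"text": "hello"}')
listener.on_data('{"limit": {"track": 5}}')
print(listener.seen)
```

A subclass would normally override only the handlers it cares about; overriding `on_data`, as this notebook does, bypasses the routing and sees every message.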
In [5]:
# Tweet listener class which subclasses tweepy.StreamListener
class TweetListner(tweepy.StreamListener):
    """Twitter stream listener."""

    def __init__(self, csocket):
        super(TweetListner, self).__init__()
        self.clientSocket = csocket

    def on_data(self, raw_data):
        """Called when raw data is received from the connection.

        Return False to stop the stream and close the connection.
        """
        try:
            msg = json.loads(raw_data)
            # Not every message is a tweet (e.g. delete or limit notices),
            # so forward the text only when it is present
            if 'text' in msg:
                self.clientSocket.sendall(msg['text'].encode('utf-8'))
            return True
        except Exception as e:
            logging.error("An unhandled exception has occurred, check your data processing")
            logging.error(e)
            raise

    def on_error(self, status_code):
        """Called when a non-200 status code is returned."""
        logging.error("A non-200 status code was returned: {}".format(status_code))
        return True

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        logging.error("An unhandled exception has occurred")
        raise exception
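The listener writes the tweet text to the socket as one continuous byte stream, so a line-oriented reader on the other end cannot tell where one tweet stops and the next begins. The sketch below shows one possible framing, assuming the consumer reads newline-delimited text; `frameTweet` is a hypothetical helper, not part of the original code.

```python
import json


def frameTweet(raw_data):
    """Return the tweet text as one UTF-8 encoded line, or None.

    None is returned for messages that are not valid JSON or carry no text.
    Whitespace inside the tweet (including newlines) is collapsed to single
    spaces so that each tweet occupies exactly one line.
    """
    try:
        msg = json.loads(raw_data)
    except ValueError:
        return None
    text = msg.get("text") if isinstance(msg, dict) else None
    if not text:
        return None
    oneLine = " ".join(text.split())
    return (oneLine + "\n").encode("utf-8")
```

Inside `on_data`, the payload could then be sent only when `frameTweet(raw_data)` is not None.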
In [6]:
import errno  # error codes such as EADDRINUSE

# Creating a proxy socket
def createProxySocket(host, port):
    """Returns a socket which can be used to connect to Spark.

    Blocks until a client connects.
    """
    try:
        s = socket.socket()  # initialize socket instance
        s.bind((host, port))  # bind to the given host and port
        s.listen(5)  # enable the server to accept connections
        logging.info("Listening on the port {}".format(port))
        cSocket, address = s.accept()  # wait for a connection
        logging.info("Received request from: {}".format(address))
        return cSocket
    except socket.error as e:
        if e.errno != errno.EADDRINUSE:
            raise  # not an "address in use" error, so do not hide it
        logging.error("The given host:port {}:{} is already in use"
                      .format(host, port))
        logging.info("Trying on port: {}".format(port + 1))
        return createProxySocket(host, port + 1)
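The port fallback above recurses without a limit. A bounded, iterative variant might look like the sketch below; `bindFirstFreePort` and its `maxTries` parameter are hypothetical names, and the function only binds and listens, leaving `accept()` to the caller.

```python
import errno
import socket


def bindFirstFreePort(host, port, maxTries=10):
    """Return (listeningSocket, boundPort) for the first free port.

    Tries port, port + 1, ... for at most maxTries candidates and raises
    RuntimeError if every one of them is already in use.
    """
    for candidate in range(port, port + maxTries):
        s = socket.socket()
        try:
            s.bind((host, candidate))
            s.listen(5)
            return s, candidate
        except socket.error as e:
            s.close()  # release the failed socket before trying the next port
            if e.errno != errno.EADDRINUSE:
                raise  # a different failure: do not mask it
    raise RuntimeError("No free port in range {}-{}".format(port, port + maxTries - 1))
```

Returning the port that was actually bound also tells the consumer where to connect, which the recursive version only records in the log.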
In [7]:
if __name__ == "__main__":
    try:
        api, auth = connectToTwitter()  # connect to Twitter
        host = "localhost"
        port = 8878
        cSocket = createProxySocket(host, port)  # create a socket
        while True:
            try:
                # Connect/reconnect the stream
                tweetStream = tweepy.Stream(auth, TweetListner(cSocket))  # stream the Twitter data
                # DON'T run this approach async or you'll just create a ton of streams!
                tweetStream.filter(track=["Iphone", "iphone"])  # filter on the tracked keywords
            except IncompleteRead:
                # Oh well, reconnect and keep trucking
                continue
            except KeyboardInterrupt:
                # Or however you want to exit this loop
                tweetStream.disconnect()
                break
            except Exception as e:
                logging.error("Unhandled exception has occurred inside while")
                logging.error(e)
                continue
    except KeyboardInterrupt:  # keyboard interrupt received
        logging.error("KeyboardInterrupt was hit")
    except Exception as e:
        logging.error("Unhandled exception has occurred")
        logging.error(e)
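The loop above reconnects immediately after every failure, which can turn a persistent error into a tight retry loop. A common remedy is to wait a little longer after each failed attempt. The sketch below illustrates that with exponential backoff; the helper names, the delay values and the attempt limit are all assumptions chosen for the example.

```python
import time


def backoffDelays(initial=1.0, factor=2.0, maximum=60.0):
    """Yield an endless series of delays: initial, initial * factor, ...

    Each delay is capped at `maximum` seconds.
    """
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, maximum)


def runWithBackoff(connect, maxAttempts=5, sleep=time.sleep):
    """Call connect() until it succeeds, sleeping longer after each failure.

    Re-raises the last exception once maxAttempts calls have failed.
    KeyboardInterrupt is not caught, so Ctrl-C still stops the program.
    """
    delays = backoffDelays()
    for attempt in range(1, maxAttempts + 1):
        try:
            return connect()
        except Exception:
            if attempt == maxAttempts:
                raise
            sleep(next(delays))
```

In this notebook, `connect` could be a small function that creates the `tweepy.Stream` and calls `filter`; the `sleep` parameter exists so the waiting can be replaced in tests.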