Top trending words in Twitter

- Find the word that occurs most often across the whole tweet corpus.
- Find the top 10 trending topics.
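
As a rough illustration of the counting goal stated above, the sketch below tallies words across a handful of tweets with `collections.Counter`. The sample tweets, the `top_words` helper and the tokenizing regular expression are assumptions made for this example; a real pipeline would read tweets from the stream and would likely also filter out common stop words.

```python
import re
from collections import Counter

# Hypothetical sample tweets, used only to illustrate the counting step
tweets = [
    "New iPhone camera is great",
    "iphone battery is great too",
    "Waiting for the new iPhone",
]

def top_words(texts, n=10):
    """Return the n most frequent lowercase words across all texts."""
    counts = Counter()
    for text in texts:
        # Lowercase first so "iPhone" and "iphone" count as the same word
        counts.update(re.findall(r"[a-z0-9']+", text.lower()))
    return counts.most_common(n)

print(top_words(tweets, 3))
```

Calling `top_words(tweets)` with the default `n=10` corresponds to the "top 10" objective.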

Since this is a streaming application, we will use Python's logging module to record what happens while it runs.


In [1]:
import logging # python logging module

# basic format for logging
logFormat = "%(asctime)s - [%(levelname)s] (%(funcName)s:%(lineno)d) %(message)s"

# logs will be stored in tweepytrends.log
logging.basicConfig(filename='tweepytrends.log', level=logging.INFO, 
                   format=logFormat, datefmt="%Y-%m-%d %H:%M:%S")
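
To see what a record looks like with this format, the following sketch attaches the same format string to an in-memory handler instead of a file. The logger name `format_demo` and the `start_stream` function are hypothetical and exist only for this demonstration.

```python
import io
import logging

log_format = "%(asctime)s - [%(levelname)s] (%(funcName)s:%(lineno)d) %(message)s"

# Write to an in-memory buffer instead of a file so the record can be inspected
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter(log_format, datefmt="%Y-%m-%d %H:%M:%S"))

demo_logger = logging.getLogger("format_demo")  # hypothetical logger name
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False  # keep the demo record out of the root logger

def start_stream():
    # funcName and lineno in the record refer to this call site
    demo_logger.info("Listening on the port %d", 8878)

start_stream()
record_line = buffer.getvalue().strip()
print(record_line)
```

The printed line starts with a timestamp, followed by the level in brackets, the calling function with its line number, and the message.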

Authentication and Authorisation

Create an app on Twitter. Copy its consumer key and secret and its access token and secret, which are used in the code below.

Authorization is done using OAuth, an open protocol that allows secure authorization in a simple, standard way from web, mobile and desktop applications.

We will use Tweepy, a Python module. Tweepy is open source, hosted on GitHub, and lets Python communicate with the Twitter platform and use its API. Tweepy supports OAuth authentication, which is handled here by the tweepy.OAuthHandler class.


In [2]:
import tweepy  # importing all the modules required
import socket  # will be used to create sockets
import json    # manipulate json

try:
    from http.client import IncompleteRead  # Python 3
except ImportError:
    from httplib import IncompleteRead      # Python 2

In [3]:
# Keep these tokens secret: anyone who has them gets full access
# to your Twitter account

consumerKey = "#"
consumerSecret = "#"

accessToken = "#-#"
accessTokenSecret = "#"
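
One way to keep the tokens out of the notebook itself is to read them from environment variables. This is a minimal sketch under that assumption; the variable names and the `load_credentials` helper are hypothetical and not part of Tweepy.

```python
import os

# Hypothetical variable names; choose whatever fits your deployment
REQUIRED_VARS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET",
)

def load_credentials(env=os.environ):
    """Return the four credentials as a dict, failing early if any is missing."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

With the variables exported in the shell, `creds = load_credentials()` followed by `consumerKey = creds["TWITTER_CONSUMER_KEY"]` (and likewise for the other three) would replace the hard-coded placeholders.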

After this step, we will have full access to the Twitter APIs.


In [4]:
# Perform the authentication and authorization; after this step
# we will have full access to the Twitter APIs
def connectToTwitter():
    """Connect to twitter."""
    try:
        auth = tweepy.OAuthHandler(consumerKey, consumerSecret)
        auth.set_access_token(accessToken, accessTokenSecret)

        api = tweepy.API(auth)
        logging.info("Successfully logged in to twitter.")
        return api, auth
    except Exception as e:
        logging.error("Something went wrong in OAuth, please check your tokens.")
        logging.error(e)
        raise  # re-raise so the caller does not try to unpack None

Streaming with tweepy

The Twitter streaming API is used to download Twitter messages in real time. We use it instead of the REST API because the REST API requires the client to pull data from Twitter on each request, whereas the streaming API pushes messages over a persistent session. This allows the streaming API to deliver more data in real time than could be fetched using the REST API.

In Tweepy, an instance of tweepy.Stream establishes a streaming session and routes messages to a StreamListener instance. The on_data method of a stream listener receives all messages and calls functions according to the message type.

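However, the methods that on_data routes to, such as on_status, are only stubs, so we need to implement the functionality ourselves by subclassing StreamListener. The listener in this notebook overrides on_data directly.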
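
As a rough illustration of the routing described above, the toy class below parses each raw message and hands it to a method chosen by message type. It is not Tweepy's implementation: `MiniListener` and its `seen` list are made up for this sketch, and only three message shapes (tweets, delete notices and limit notices) are recognized.

```python
import json

class MiniListener(object):
    """Toy listener that routes parsed messages by type (not Tweepy's code)."""

    def __init__(self):
        self.seen = []  # records what each handler received

    def on_data(self, raw_data):
        msg = json.loads(raw_data)
        if "delete" in msg:
            return self.on_delete(msg["delete"])
        if "limit" in msg:
            return self.on_limit(msg["limit"])
        if "text" in msg:
            return self.on_status(msg)
        return True  # ignore message types this sketch does not know

    def on_status(self, status):
        self.seen.append(("status", status["text"]))
        return True

    def on_delete(self, notice):
        self.seen.append(("delete", notice))
        return True

    def on_limit(self, notice):
        self.seen.append(("limit", notice))
        return True

listener = MiniListener()
listener.on_data('{"text": "hello iphone"}')
listener.on_data('{"limit": {"track": 5}}')
print(listener.seen)
```

A subclass would normally override only the handlers it cares about, or override `on_data` itself when it wants the raw JSON.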

Using the streaming API takes three steps.

  1. Create a class inheriting from StreamListener.
  2. Using that class, create a Stream object.
  3. Connect to the Twitter API using the Stream.

In [5]:
# Tweet listener class which subclasses tweepy.StreamListener
class TweetListner(tweepy.StreamListener):
    """Twitter stream listener"""
    
    def __init__(self, csocket):
        super(TweetListner, self).__init__()  # let the base class set itself up
        self.clientSocket = csocket
    
    def on_data(self, raw_data):
        """ Called when raw data is received from connection.
            return False to stop stream and close connection.
        """
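        try:
            msg = json.loads(raw_data)
            # Not every message is a tweet (e.g. limit notices), so only
            # forward messages that actually carry text
            if 'text' in msg:
                self.clientSocket.send(msg['text'].encode('utf-8'))
            return True
        except Exception as e:
            logging.error("An unhandled exception has occurred, check your data processing")
            logging.error(e)
            raise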
    
    def on_error(self, status_code):
        """Called when a non-200 status code is returned"""
        logging.error("A non-200 status code was returned: {}".format(status_code))
        return True

    def on_exception(self, exception):
        """Called when an unhandled exception occurs."""
        logging.error("An unhandled exception has occurred")
        raise exception
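
The parse-and-forward step in `on_data` can be checked without a network connection or a Twitter account. The sketch below mirrors that logic in a standalone function and feeds it a fake socket; `FakeSocket` and `forward_tweet` are hypothetical names used only here, and the guard for messages without a `text` field is an addition made for the sketch.

```python
import json

class FakeSocket(object):
    """Hypothetical stand-in for a connected socket; records what is sent."""

    def __init__(self):
        self.sent = []

    def send(self, data):
        self.sent.append(data)
        return len(data)

def forward_tweet(raw_data, sock):
    """Parse one raw JSON message and forward its text as UTF-8 bytes.

    Returns True if something was sent, False if the message had no text.
    """
    msg = json.loads(raw_data)
    if "text" not in msg:  # e.g. a limit or delete notice
        return False
    sock.send(msg["text"].encode("utf-8"))
    return True

sock = FakeSocket()
forward_tweet(json.dumps({"text": "new iphone"}), sock)
print(sock.sent)
```

Because the fake object only needs a `send` method, the same approach works for exercising a `TweetListner` instance directly.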

In [6]:
# Creating a proxy socket
def createProxySocket(host, port):
    """ Returns a socket which can be used to connect
        to spark.
    """
    try:
        s = socket.socket()   # initialize socket instance
        s.bind((host, port))  # bind to the given host and port 
        s.listen(5)           # Enable a server to accept connections.
        logging.info("Listening on the port {}".format(port))
        cSocket, address = s.accept()  # waiting for a connection
        logging.info("Received Request from: {}".format(address))
        return cSocket
    except socket.error as e:
        if e.errno == socket.errno.EADDRINUSE:  # Address in use
            logging.error("The given host:port {}:{} is already in use"
                          .format(host, port))
            logging.info("Trying on port: {}".format(port + 1))
            return createProxySocket(host, port + 1)
        raise  # any other socket error is unexpected, so propagate it
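
The port fallback above recurses without a limit. As an alternative, the sketch below tries a bounded number of consecutive ports in a loop and closes each socket that fails to bind. `bind_with_fallback` and its `max_tries` parameter are hypothetical; the function returns the listening socket and the chosen port rather than an accepted client connection.

```python
import errno
import socket

def bind_with_fallback(host, port, max_tries=5):
    """Bind a listening socket, trying up to max_tries consecutive ports."""
    for candidate in range(port, port + max_tries):
        s = socket.socket()
        try:
            s.bind((host, candidate))
            s.listen(5)
            return s, candidate
        except socket.error as e:
            s.close()  # release the failed socket before trying the next port
            if e.errno != errno.EADDRINUSE:
                raise  # only "address in use" justifies moving to the next port
    raise RuntimeError(
        "No free port in range {}-{}".format(port, port + max_tries - 1))
```

A caller would then run `server, port = bind_with_fallback("localhost", 8878)` and call `server.accept()` to wait for the client.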

In [7]:
if __name__ == "__main__":
    try:
        api, auth = connectToTwitter()  # connecting to twitter
        
        host = "localhost"
        port = 8878
        cSocket = createProxySocket(host, port)  # Creating a socket
        
        while True:
            try:
                # Connect/reconnect the stream
                tweetStream = tweepy.Stream(auth, TweetListner(cSocket))  # Stream the twitter data
                # DON'T run this approach async or you'll just create a ton of streams!
                tweetStream.filter(track=["Iphone", "iphone"])  # Keep only tweets matching the tracked keywords
            except IncompleteRead:
                # Oh well, reconnect and keep trucking
                continue
            except KeyboardInterrupt:
                # Or however you want to exit this loop
                tweetStream.disconnect()
                break
            except Exception as e:
                logging.error("Unhandled exception has occurred inside while")
                logging.error(e)
                continue
                       
    except KeyboardInterrupt:                   # Keyboard interrupt called
        logging.error("KeyboardInterrupt was hit")
    except Exception as e:
        logging.error("Unhandled exception has occurred")
        logging.error(e)
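
The loop above reconnects immediately after every failure, which can hammer the API if the problem persists. A common alternative is to wait longer after each consecutive failure. The helper below is a minimal sketch of that idea; `run_with_backoff`, its parameters and the injected `sleep` function are assumptions for illustration and not part of Tweepy.

```python
import time

def run_with_backoff(connect, max_retries=5, base_delay=1.0, sleep=time.sleep):
    """Call connect() until it returns, doubling the wait after each failure.

    The wait before retry number k (starting at 0) is base_delay * 2**k.
    The last failure is re-raised once max_retries attempts are used up.
    """
    for attempt in range(max_retries):
        try:
            return connect()
        except KeyboardInterrupt:
            raise  # let the user stop the loop
        except Exception:
            if attempt == max_retries - 1:
                raise  # give up after the last attempt
            sleep(base_delay * 2 ** attempt)
```

In this notebook, `connect` could be a small function that builds the `tweepy.Stream` and calls `filter`, so that each retry starts from a fresh stream.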