Tweepy streamer

Q1 Find influential people on Twitter:

- For simplicity, assume that a person's influence is directly proportional to their follower count.
- Find the top 20 most influential personalities on Twitter across the globe.
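
Under that assumption, ranking reduces to keeping the largest follower counts seen so far. A minimal sketch, assuming each record is a dict with `name` and `followersCount` keys (the field names used later in this notebook); the helper name `topInfluencers` and the sample data are hypothetical:

```python
import heapq

def topInfluencers(records, n=20):
    """Return the n users with the most followers, highest first."""
    latest = {}
    for record in records:
        # A user can tweet many times; keep the largest count seen per name
        name = record["name"]
        latest[name] = max(latest.get(name, 0), record["followersCount"])
    return heapq.nlargest(n, latest.items(), key=lambda item: item[1])

# Invented sample records, for illustration only
sample = [
    {"name": "a", "followersCount": 10},
    {"name": "b", "followersCount": 300},
    {"name": "a", "followersCount": 12},
    {"name": "c", "followersCount": 45},
]
print(topInfluencers(sample, n=2))  # [('b', 300), ('c', 45)]
```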

Since this is a streaming application, we will use Python's logging module to record what happens while it runs.


In [1]:
import logging # python logging module

# basic format for logging
logFormat = "%(asctime)s - [%(levelname)s] (%(funcName)s:%(lineno)d) %(message)s"

# logs will be stored in tweepy.log
logging.basicConfig(filename='tweepy.log', level=logging.INFO, 
                   format=logFormat, datefmt="%Y-%m-%d %H:%M:%S")
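
To see what a line in tweepy.log will look like, the same format string can be applied to an in-memory handler. A small sketch for Python 3; the logger name `demo` and the function `sayHello` are made up for illustration:

```python
import io
import logging

logFormat = "%(asctime)s - [%(levelname)s] (%(funcName)s:%(lineno)d) %(message)s"

# Write formatted records into a string buffer instead of a file
buffer = io.StringIO()
handler = logging.StreamHandler(buffer)
handler.setFormatter(logging.Formatter(logFormat, datefmt="%Y-%m-%d %H:%M:%S"))

demoLogger = logging.getLogger("demo")
demoLogger.setLevel(logging.INFO)
demoLogger.addHandler(handler)
demoLogger.propagate = False  # keep the demo line out of the root logger

def sayHello():
    demoLogger.info("hello")

sayHello()
line = buffer.getvalue()
print(line)  # <timestamp> - [INFO] (sayHello:<line number>) hello
```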

Authentication and Authorisation

Create an app on Twitter's developer site. Copy the consumer key, consumer secret, access token, and access token secret; they will be used in the code below.

Authorization is done using OAuth, an open protocol that allows secure authorization in a simple and standard way from web, mobile, and desktop applications.

We will use Tweepy, a Python module. Tweepy is open source, hosted on GitHub, and enables Python to communicate with the Twitter platform and use its API. Tweepy supports OAuth authentication, which is handled by the tweepy.OAuthHandler class.


In [2]:
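import tweepy  # Twitter API client
import socket  # used to create the proxy socket
import json    # encode/decode JSON

# IncompleteRead lives in http.client on Python 3 and in httplib on Python 2
try:
    from http.client import IncompleteRead
except ImportError:
    from httplib import IncompleteRead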

In [3]:
# Keep these tokens secret: anyone who has them gets full access to your
# Twitter account

consumerKey = "#"
consumerSecret = "#"

accessToken = "#"
accessTokenSecret = "#"
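
One way to avoid pasting the tokens into the notebook is to read them from environment variables. A minimal sketch; the variable names (`TWITTER_CONSUMER_KEY` and so on) and the helper `loadCredentials` are assumptions made for illustration, not something Tweepy or Twitter prescribes:

```python
import os

# Hypothetical environment variable names, in the order the tokens are used
REQUIRED = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_TOKEN_SECRET",
)

def loadCredentials(env=os.environ):
    """Return the four tokens as a tuple, or raise if any is missing."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return tuple(env[name] for name in REQUIRED)
```

With the variables set, the four names used in this notebook could then be filled with `consumerKey, consumerSecret, accessToken, accessTokenSecret = loadCredentials()`.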

After this step, we will have full access to Twitter's APIs.


In [4]:
# Perform the authentication and authorization; after this step
# we will have full access to Twitter's APIs
def connectToTwitter():
    """Connect to Twitter and return the (api, auth) pair."""
    try:
        auth = tweepy.OAuthHandler(consumerKey, consumerSecret)
        auth.set_access_token(accessToken, accessTokenSecret)

        api = tweepy.API(auth)
        logging.info("Successfully logged in to twitter.")
        return api, auth
    except Exception as e:
        logging.error("Something went wrong in OAuth, please check your tokens.")
        logging.error(e)
        raise  # re-raise so the caller does not try to unpack None

Streaming with tweepy

The Twitter streaming API is used to download Twitter messages in real time. We use the streaming API instead of the REST API because the REST API pulls data from Twitter on request, whereas the streaming API pushes messages over a persistent session. This allows the streaming API to deliver more data in real time than could be fetched using the REST API.

In Tweepy, an instance of tweepy.Stream establishes a streaming session and routes messages to a StreamListener instance. The on_data method of a stream listener receives all messages and calls functions according to the message type.

By default those per-message handlers are only stubs, so we need to implement the functionality by subclassing StreamListener. The listener below overrides on_data itself.

Using the streaming API takes three steps.

  1. Create a class inheriting from StreamListener
  2. Using that class create a Stream object
  3. Connect to the Twitter API using the Stream.
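
The routing idea can be illustrated without a network connection. The sketch below is a simplified stand-in written for this notebook, not Tweepy's actual implementation: `MiniListener` and `CollectingListener` are hypothetical classes, and only two message types are handled.

```python
import json

class MiniListener(object):
    """Toy listener: on_data parses a message and routes it by type."""

    def on_data(self, raw_data):
        data = json.loads(raw_data)
        if "delete" in data:
            return self.on_delete(data["delete"])
        if "text" in data:
            return self.on_status(data)
        return True  # ignore message types this sketch does not know

    # Stub handlers; subclasses override the ones they care about
    def on_status(self, status):
        return True

    def on_delete(self, notice):
        return True

class CollectingListener(MiniListener):
    """Subclass that stores the text of every status it receives."""

    def __init__(self):
        self.texts = []

    def on_status(self, status):
        self.texts.append(status["text"])
        return True

listener = CollectingListener()
listener.on_data('{"text": "hello", "user": {"name": "a"}}')
listener.on_data('{"delete": {"status": {"id": 1}}}')
print(listener.texts)  # ['hello']
```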

In [5]:
# Tweet listener class which subclasses tweepy.StreamListener
class TweetListner(tweepy.StreamListener):
    """Twitter stream listener"""
    
    def __init__(self, csocket):
        super(TweetListner, self).__init__()  # let StreamListener set up its own state
        self.clientSocket = csocket
        
    def dataProcessing(self, data):
        """Extract the user's name and follower count and send them
        to Spark Streaming as one JSON object per line.
        """
        user = data.get("user", {})
        sendData = {  # data that is sent to the Spark streamer
            "name": user.get("name", "undefined"),
            "followersCount": user.get("followers_count", 0),
        }
        # Append a newline so that Spark can split the stream into records,
        # and encode to bytes because sockets carry bytes, not text
        payload = json.dumps(sendData) + "\n"
        self.clientSocket.sendall(payload.encode("utf-8"))
        logging.debug(payload)
    
    def on_data(self, raw_data):
        """Called when raw data is received from the connection.
        Return False to stop the stream and close the connection.
        """
        try:
            data = json.loads(raw_data)
            self.dataProcessing(data)
            return True
        except Exception as e:
            logging.error("An unhandled exception has occurred, check your data processing")
            logging.error(e)
            raise  # re-raise with the original traceback
    
    def on_error(self, status_code):
        """Called when a non-200 status code is returned"""
        logging.error("A non-200 status code was returned: {}".format(status_code))
        return True
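
The listener writes one JSON object per line. A small sketch of that framing and of how a receiver could split it again; `encodeRecord` and `decodeStream` are hypothetical helpers, and the byte string stands in for what would arrive on the socket:

```python
import json

def encodeRecord(name, followersCount):
    """Serialize one record as a UTF-8 JSON line."""
    record = {"name": name, "followersCount": followersCount}
    return (json.dumps(record) + "\n").encode("utf-8")

def decodeStream(raw):
    """Split received bytes on newlines and parse each non-empty line."""
    lines = raw.decode("utf-8").splitlines()
    return [json.loads(line) for line in lines if line]

# Two invented records concatenated, as they would appear on the wire
wire = encodeRecord("Ana", 120) + encodeRecord("José", 7)
print(decodeStream(wire))
```

Because json.dumps escapes non-ASCII characters and control characters by default, a record never contains a raw newline, so the newline is a safe record separator.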

In [6]:
# Creating a proxy socket
def createProxySocket(host, port):
    """Listen on host:port and return the socket of the first client
    (Spark) that connects. Falls back to the next port if this one is taken.
    """
    import errno  # error-code constants such as EADDRINUSE

    s = socket.socket()   # initialize socket instance
    try:
        s.bind((host, port))  # bind to the given host and port
        s.listen(5)           # enable the server to accept connections
        logging.info("Listening on the port {}".format(port))
        cSocket, address = s.accept()  # block until a client connects
        logging.info("Received Request from: {}".format(address))
        return cSocket
    except socket.error as e:
        s.close()  # release the failed listening socket
        if e.errno == errno.EADDRINUSE:  # address in use
            logging.error("The given host:port {}:{} is already in use"
                          .format(host, port))
            logging.info("Trying on port: {}".format(port + 1))
            return createProxySocket(host, port + 1)
        raise  # any other socket error is not handled here
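
The recursive retry above can also be written as a loop with an upper bound on the number of attempts. A sketch under that assumption; `bindFirstFreePort` and `maxTries` are hypothetical names, and accepting the client is left to the caller:

```python
import errno
import socket

def bindFirstFreePort(host, startPort, maxTries=10):
    """Return (listeningSocket, port) for the first port that can be bound."""
    for port in range(startPort, startPort + maxTries):
        s = socket.socket()
        try:
            s.bind((host, port))
            s.listen(5)
            return s, port
        except socket.error as e:
            s.close()
            if e.errno != errno.EADDRINUSE:
                raise  # only "address in use" is worth retrying
    raise RuntimeError("No free port in the range tried")

# Occupy one port, then ask for it: the helper should move on to a later port
busy = socket.socket()
busy.bind(("127.0.0.1", 0))  # port 0 lets the OS pick a free port
busy.listen(1)
busyPort = busy.getsockname()[1]

server, port = bindFirstFreePort("127.0.0.1", busyPort)
print(busyPort, port)
server.close()
busy.close()
```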

Drawbacks of twitter streaming API

The major drawback of the Streaming API is that it provides only a sample of the tweets that are occurring. The actual percentage of total tweets that users receive varies heavily based on the criteria they request and the current traffic. Studies have estimated that users of the Streaming API can expect to receive anywhere from 1% to over 40% of tweets in near real time. You do not receive all of the tweets because Twitter does not have the infrastructure to support that through the Streaming API and does not want to; full access is what the Twitter Firehose is for. Ref

So we will use a workaround: get the top trending topics and use them to filter the stream.


In [7]:
def getWOEIDForTrendsAvailable(api, place):
    """Returns the WOEID of the place if trends are available there, else None."""

    # Iterate through the places that have trends
    data = api.trends_available()
    for item in data:
        if item["name"] == place:  # Use place = "Worldwide" to get the WOEID of the world
            return item["woeid"]
    return None  # no trend information for this place

In [8]:
# Get the list of trending topics from twitter
def getTrendingTopics(api, woeid):
    """Get the top trending topics from twitter"""
    data = api.trends_place(woeid)
    listOfTrendingTopic = [trend["name"] for trend in data[0]["trends"]]
    return listOfTrendingTopic
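
For reference, the list comprehension above expects a response shaped like the hand-written sample below: a list whose first element holds a `trends` list of objects with a `name` field. The sample values are invented for illustration and a real response carries more fields; `extractTrendNames` is a hypothetical helper that mirrors the comprehension.

```python
def extractTrendNames(data, limit=None):
    """Pull the trend names out of a trends response, optionally truncated."""
    names = [trend["name"] for trend in data[0]["trends"]]
    return names if limit is None else names[:limit]

# Invented sample with the same nesting as the response used above
sampleResponse = [
    {
        "trends": [
            {"name": "#example"},
            {"name": "Some Topic"},
            {"name": "#another"},
        ]
    }
]

print(extractTrendNames(sampleResponse, limit=2))  # ['#example', 'Some Topic']
```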

In [9]:
if __name__ == "__main__":
    import time  # used to pause between reconnect attempts

    cSocket = None
    try:
        api, auth = connectToTwitter()  # connecting to twitter
        # Global information is available by using 1 as the WOEID
        # woeid = getWOEIDForTrendsAvailable(api, "Worldwide")  # get the WOEID of the world
        woeid = 1
        trendingTopics = getTrendingTopics(api, woeid)[:10] # Pick only top 10 trending topics
        
        host = "localhost"
        port = 8888
        cSocket = createProxySocket(host, port)  # Creating a socket
        
        while True:
            tweetStream = None
            try:
                # Connect/reconnect the stream
                tweetStream = tweepy.Stream(auth, TweetListner(cSocket))  # Stream the twitter data
                # DON'T run this approach async or you'll just create a ton of streams!
                tweetStream.filter(track=trendingTopics)                  # Filter on trending topics
            except IncompleteRead:
                # Oh well, reconnect and keep trucking
                continue
            except KeyboardInterrupt:
                # Or however you want to exit this loop
                if tweetStream is not None:
                    tweetStream.disconnect()
                break
            except Exception as e:
                logging.error("Unhandled exception has occurred")
                logging.error(e)
                time.sleep(5)  # brief pause so a persistent failure does not spin the loop
                continue
                       
    except KeyboardInterrupt:                   # Keyboard interrupt called
        logging.error("KeyboardInterrupt was hit")
    except Exception as e:
        logging.error("Unhandled exception has occurred")
        logging.error(e)
    finally:
        if cSocket is not None:
            cSocket.close()  # release the connection to Spark
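
A reconnect loop like the one above can use exponential backoff so that repeated failures wait progressively longer before the next attempt. A minimal sketch; `backoffDelays` and its parameters are hypothetical, and the numbers are placeholders rather than values recommended by Twitter:

```python
def backoffDelays(base=1.0, factor=2.0, cap=60.0):
    """Yield wait times that grow geometrically up to a cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= factor

delays = backoffDelays(base=1.0, factor=2.0, cap=10.0)
firstFive = [next(delays) for _ in range(5)]
print(firstFive)  # [1.0, 2.0, 4.0, 8.0, 10.0]
```

In the loop, each failed attempt would sleep for `next(delays)` seconds, and a fresh generator would be created once a connection succeeds so that the wait starts small again.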