Tweepy streamer

Language based analysis:

- Find counts of tweets in different languages.
- Get top 7 languages in terms of volume.
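
As a minimal sketch of the analysis itself, assuming the language codes have already been collected into a plain Python list (the downstream streaming job is not shown in this section), the counts and the top languages could be computed with collections.Counter. The function name topLanguages and the sample data are hypothetical.

```python
from collections import Counter

def topLanguages(langs, n=7):
    """Return the n most frequent language codes with their counts."""
    return Counter(langs).most_common(n)

# Hypothetical sample of language codes taken from received tweets
sample = ["en", "es", "en", "ja", "en", "es", "pt"]
print(topLanguages(sample, n=2))  # prints [('en', 3), ('es', 2)]
```

With the default n=7 the same call returns the top 7 languages by volume.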

Since this is a streaming application, we will use Python's logging module to record what happens while it runs.


In [1]:
import logging # python logging module

# basic format for logging
logFormat = "%(asctime)s - [%(levelname)s] (%(funcName)s:%(lineno)d) %(message)s"

# logs will be stored in tweepylang.log
logging.basicConfig(filename='tweepylang.log', level=logging.INFO, 
                   format=logFormat, datefmt="%Y-%m-%d %H:%M:%S")

Authentication and Authorisation

Create an app on the Twitter developer site. Copy the consumer keys and access tokens; they are used in the code below.

Authorization is done using OAuth, an open protocol that allows secure authorization in a simple, standard way from web, mobile and desktop applications.

We will use Tweepy, an open-source Python module hosted on GitHub that lets Python communicate with the Twitter platform and use its API. Tweepy supports OAuth authentication, which is handled by the tweepy.OAuthHandler class.


In [2]:
import tweepy  # importing all the modules required
import socket  # will be used to create sockets
import json    # manipulate json

try:
    from http.client import IncompleteRead  # Python 3
except ImportError:
    from httplib import IncompleteRead      # Python 2

In [3]:
# Keep these tokens secret: anyone who has them gets full access
# to your Twitter account

consumerKey = "#"
consumerSecret = "#"

accessToken = "#-#"
accessTokenSecret = "#"

After the next step, we will have full access to the Twitter APIs.


In [4]:
# Performing the authentication and authorization, post this step 
# we will have full access to twitter api's
def connectToTwitter():
    """Connect to twitter."""
    try:
        auth = tweepy.OAuthHandler(consumerKey, consumerSecret)
        auth.set_access_token(accessToken, accessTokenSecret)

        api = tweepy.API(auth)
        logging.info("Successfully logged in to twitter.")
        return api, auth
    except Exception as e:
        logging.error("Something went wrong in oauth, please check your tokens.")
        logging.error(e)
        raise  # re-raise so the caller does not try to unpack None

Streaming with tweepy

The Twitter Streaming API is used to download Twitter messages in real time. We use the Streaming API instead of the REST API because the REST API pulls data from Twitter on request, whereas the Streaming API pushes messages over a persistent session. This allows the Streaming API to deliver more data in real time than could be fetched using the REST API.

In Tweepy, an instance of tweepy.Stream establishes a streaming session and routes messages to a StreamListener instance. The on_data method of a stream listener receives all messages and calls functions according to the message type.

The default StreamListener routes most message types to appropriately named handler methods, but those handlers are only stubs, so we need to implement the functionality by subclassing StreamListener. In this notebook we override on_data directly.

Using the streaming api has three steps.

  1. Create a class inheriting from StreamListener
  2. Using that class create a Stream object
  3. Connect to the Twitter API using the Stream.

In [5]:
# Tweet listener class which subclasses from tweepy.StreamListener
class TweetListner(tweepy.StreamListener):
    """Twitter stream listener"""
    
    def __init__(self, csocket):
        super(TweetListner, self).__init__()  # let tweepy set up the base listener
        self.clientSocket = csocket
        
    def dataProcessing(self, data):
        """Process the data, before sending to spark streaming
        """
        sendData = {}  # data that is sent to spark streamer
        user = data.get("user", {})
        # "or" also covers fields that are present but null
        sendData["name"] = user.get("name") or "undefined"
        sendData["lang"] = user.get("lang") or "undefined"
        payload = json.dumps(sendData)
        # append new line character, so that spark recognizes it;
        # encode once so that bytes are sent on both Python 2 and 3
        self.clientSocket.sendall((payload + "\n").encode("utf-8"))
        logging.debug(payload)
    
    def on_data(self, raw_data):
        """ Called when raw data is received from connection.
            return False to stop stream and close connection.
        """
        try:
            data = json.loads(raw_data)
            self.dataProcessing(data)
            return True
        except Exception as e:
            logging.error("An unhandled exception has occurred, check your data processing")
            logging.error(e)
            raise  # re-raise with the original traceback
    
    def on_error(self, status_code):
        """Called when a non-200 status code is returned"""
        logging.error("A non-200 status code is returned: {}".format(status_code))
        return True

In [6]:
# Creating a proxy socket
import errno  # symbolic error codes such as EADDRINUSE

def createProxySocket(host, port):
    """ Returns a client socket which can be used to send data
        to spark. Blocks until a client connects. If the port is
        already in use, the next port is tried.
    """
    s = socket.socket()   # initialize socket instance
    try:
        s.bind((host, port))  # bind to the given host and port
        s.listen(5)           # enable the server to accept connections
        logging.info("Listening on the port {}".format(port))
        cSocket, address = s.accept()  # block until a client connects
        logging.info("Received Request from: {}".format(address))
        return cSocket
    except socket.error as e:
        s.close()  # release the socket that failed
        if e.errno == errno.EADDRINUSE:  # address in use
            logging.error("The given host:port {}:{} is already in use"
                          .format(host, port))
            logging.info("Trying on port: {}".format(port + 1))
            return createProxySocket(host, port + 1)
        raise  # any other socket error is not handled here
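
To check the proxy socket without Spark, a small client that reads the newline-delimited JSON can help. The sketch below is an assumption about how such a reader might look, not part of the original notebook: readJsonLines is a hypothetical helper, it targets Python 3, and a local socket pair stands in for the connection that createProxySocket would return.

```python
import json
import socket

def readJsonLines(sock, bufferSize=4096):
    """Yield one decoded JSON object per newline-terminated line."""
    pending = b""
    while True:
        chunk = sock.recv(bufferSize)
        if not chunk:          # the sender closed the connection
            break
        pending += chunk
        while b"\n" in pending:
            line, pending = pending.split(b"\n", 1)
            if line.strip():
                yield json.loads(line.decode("utf-8"))

# Local demonstration: a connected socket pair instead of Spark
sender, receiver = socket.socketpair()
sender.sendall(b'{"name": "a", "lang": "en"}\n{"name": "b", "lang": "es"}\n')
sender.close()
records = list(readJsonLines(receiver))
receiver.close()
print([r["lang"] for r in records])  # prints ['en', 'es']
```

Buffering until a newline arrives matters because a single recv call may return half a record or several records at once.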

Drawbacks of twitter streaming API

The major drawback of the Streaming API is that Twitter’s Streaming API provides only a sample of tweets that are occurring. The actual percentage of total tweets users receive with Twitter’s Streaming API varies heavily based on the criteria users request and the current traffic. Studies have estimated that using Twitter’s Streaming API users can expect to receive anywhere from 1% of the tweets to over 40% of tweets in near real-time. The reason that you do not receive all of the tweets from the Twitter Streaming API is simply because Twitter doesn’t have the current infrastructure to support it, and they don’t want to; hence, the Twitter Firehose. Ref

So we will use a workaround: filter the stream on a set of keywords, for example the top trending topics. The code below tracks a fixed list of keywords.
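
A rough sketch of turning trending topics into a keyword list follows. It assumes the response has the shape returned by Twitter's v1.1 trends/place endpoint (a list with one object that holds a "trends" list); in Tweepy 3.x that endpoint is, to my knowledge, exposed as API.trends_place, with 1 as the worldwide WOEID. The helper trendNames and the sample response are hypothetical.

```python
def trendNames(trendsResponse, limit=10):
    """Extract up to `limit` trend names from a trends/place style response."""
    if not trendsResponse:
        return []
    trends = trendsResponse[0].get("trends", [])
    return [t["name"] for t in trends[:limit] if t.get("name")]

# Hypothetical response, trimmed to the fields used here.
# With a live connection it would come from something like
# api.trends_place(1) (assumed Tweepy 3.x call, not verified here).
sampleResponse = [{"trends": [{"name": "#iPhoneX"}, {"name": "Python"}],
                   "locations": [{"name": "Worldwide", "woeid": 1}]}]
print(trendNames(sampleResponse))  # prints ['#iPhoneX', 'Python']
```

The resulting list could then be passed as the track argument of the stream filter.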

Problem with retweet count

  • Maybe you're looking in the wrong place for the value. The Streaming API is in real time. When tweets are created and streamed, their retweet_count is always zero. The only time you'll see a non-zero retweet_count in the Streaming API is for when you're streamed a tweet that represents a retweet. Those tweets have a child node called "retweeted_status" that contains the original tweet that was retweeted embedded within it. The retweet_count value attached to that node represents, roughly, the number of times that original tweet has been retweeted as of some time near when you were streamed the tweet. Retweets themselves are currently not retweetable, so should not have a non-zero retweet_count. Source: here
  • This is normal and expected when you are using the streaming API endpoint: you receive the tweets as they are posted live on the Twitter platform, so by the time you receive a tweet no other user has had a chance to retweet it, and retweet_count will always be 0. If you want to find out the retweet_count, you have to refetch that tweet some time later using the REST API; retweet_count will then contain the number of retweets that happened up to that point in time. Source: here
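
The first point can be illustrated with a small helper that reads the count from the embedded original tweet. This is only a sketch on plain dictionaries: originalRetweetCount and the trimmed sample messages are hypothetical, while the field names retweeted_status and retweet_count are the ones quoted above.

```python
def originalRetweetCount(tweet):
    """Return the retweet count of the original tweet, or 0 if the
    streamed message is not a retweet."""
    original = tweet.get("retweeted_status")
    if not original:
        return 0
    return original.get("retweet_count", 0)

# Hypothetical, heavily trimmed messages
freshTweet = {"text": "hello", "retweet_count": 0}
retweet = {"text": "RT hello", "retweet_count": 0,
           "retweeted_status": {"text": "hello", "retweet_count": 12}}
print(originalRetweetCount(freshTweet), originalRetweetCount(retweet))  # prints 0 12
```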

In [7]:
if __name__ == "__main__":
    try:
        api, auth = connectToTwitter()  # connecting to twitter
        # Global information is available by using 1 as the WOEID
        # woeid = getWOEIDForTrendsAvailable(api, "Worldwide")  # get the woeid of the worldwide
        
        host = "localhost"
        port = 8500
        cSocket = createProxySocket(host, port)  # Creating a socket
        
        while True:
            try:
                # Connect/reconnect the stream
                tweetStream = tweepy.Stream(auth, TweetListner(cSocket))  # Stream the twitter data
                # DON'T run this approach async or you'll just create a ton of streams!
                # Filter on the tracked keywords
                tweetStream.filter(track=["iphone", "iPhone", "iphoneX", "iphonex"])
            except IncompleteRead:
                # Oh well, reconnect and keep trucking
                continue
            except KeyboardInterrupt:
                # Or however you want to exit this loop
                tweetStream.disconnect()
                break
            except Exception as e:
                logging.error("Unhandled exception has occurred")
                logging.error(e)
                continue
                       
    except KeyboardInterrupt:                   # Keyboard interrupt called
        logging.error("KeyboardInterrupt was hit")
    except Exception as e:
        logging.error("Unhandled exception has occurred")
        logging.error(e)