Parsing Tweets

This notebook parses tweets from the Twitter API with PySpark and extracts the image URLs from those tweets that contain media. It then posts the images to an S3 bucket, via the boto S3 API, for further processing.

Requirements

  • PySpark
  • Anaconda Python
  • AWS Boto Python module
  • DIGITS Webserver running with a model for classification
  • Numpy
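
For reference, below is a minimal sketch of the tweet fields this notebook relies on: the activity-format id, the nested media_url entries, and the location bounding box. The values (and the intermediate "twitter_entities" key) are placeholders; the lookup functions defined later search the dictionary recursively, so only the "media", "media_url", and "location"/"geo"/"coordinates" names matter.

In [ ]:
# a made-up tweet record illustrating the fields used below
sample_tweet = {
    "id": "tag:search.twitter.com,2005:123456789012345678",
    "twitter_entities": {
        "media": [
            {"media_url": "http://pbs.twimg.com/media/example.jpg"}
        ]
    },
    "location": {
        "geo": {
            "coordinates": [[[-122.5, 37.7], [-122.5, 37.8],
                             [-122.4, 37.8], [-122.4, 37.7]]]
        }
    }
}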

In [ ]:
import json
import io
import zipfile
import shutil
import urllib
import pycurl
import boto
import numpy as np
from io import BytesIO
from StringIO import StringIO

In [ ]:
# Parameters that need to be set 

aws_access_key_id = 'my_access_key_id'
aws_secret_access_key = 'mysecret_access_key'
s3_bucket_name = 'my_bucket_name'
deep_learning_digits_server_ip = 'digits_ip'

# We start with some tweets that exist on HDFS. You can
# collect your own tweets using the twitter API and 
# use them as a starting point.

# Read in json text files of tweets
tweetsRdd = sc.textFile("combined/*")

In [ ]:
# Utility functions
#print the json in a pretty way
def pretty(d, indent=0):
   for key, value in d.iteritems():
      print '\t' * indent + str(key)
      if isinstance(value, dict):
         pretty(value, indent+1)
      #else:
      #   print '\t' * (indent+1) + str(value)
# pair the tweet ID with each media URL found in the tweet
def getMediaUrl(obj):
    items = []
    for item in getNestedKeyValue(obj, ["media"]):
        items.append([obj['id'],getNestedKeyValue(item, ["media_url"])])
    return items

#find an item in the dictionary
def _finditem(obj, key):
    if key in obj: return obj[key]
    for k, v in obj.items():
        if isinstance(v,dict):
            item = _finditem(v, key)
            if item is not None:
                return item    

#return the value reached by following a sequence of nested keys, or "" if any key is missing
def getNestedKeyValue(obj, keys):
    for num in range(len(keys)):
        if isinstance(obj, type({})):
            obj = _finditem(obj, keys[num])
        if obj is None:
            return ""
    return obj
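
A quick illustration of how getNestedKeyValue recursively walks a dictionary (the sample data here is made up):

In [ ]:
# getNestedKeyValue searches recursively, so intermediate keys can be skipped
sample = {"a": {"b": {"media_url": "http://example.com/img.jpg"}}}
print getNestedKeyValue(sample, ["b", "media_url"])  # http://example.com/img.jpg
print getNestedKeyValue(sample, ["no_such_key"])     # "" when a key is missing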

In [ ]:
# compute the center of a tweet's bounding box, if it has one
def getCoordinates(obj):
    x = getNestedKeyValue(obj, ["location", "geo", "coordinates"])
    if x == "" or x is None:
        return ""

    # x[0] is the ring of bounding-box corners as [longitude, latitude] pairs;
    # average opposite corners to get the center point
    x = x[0]
    return [[(x[0][0] + x[2][0])/2, (x[0][1] + x[1][1])/2]]
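
For example, for a rectangular bounding box whose corners are [longitude, latitude] pairs (made-up values), the function returns the box center:

In [ ]:
box_tweet = {"location": {"geo": {"coordinates":
    [[[-122.5, 37.7], [-122.5, 37.8], [-122.4, 37.8], [-122.4, 37.7]]]}}}
print getCoordinates(box_tweet)  # [[-122.45, 37.75]]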

In [ ]:
def posturlS3(tweetid_imgurlpair, aws_access_key_id=None, aws_secret_access_key=None):
    bucketid = s3_bucket_name
    tweetid = tweetid_imgurlpair[0]
    imgurl = tweetid_imgurlpair[1]

    # name the S3 key after the numeric tweet ID, keeping the image extension
    ext = "." + imgurl.split('.')[-1]
    iduniq = tweetid.split(":")[-1]

    # download the image into an in-memory buffer
    buffer = BytesIO()
    c = pycurl.Curl()
    c.setopt(c.URL, imgurl)
    c.setopt(c.WRITEDATA, buffer)
    c.perform()
    responsecode = c.getinfo(c.RESPONSE_CODE)
    c.close()

    # upload the buffered image bytes to the S3 bucket
    s3c = boto.connect_s3(aws_access_key_id, aws_secret_access_key)
    bucket = s3c.lookup(bucketid)
    key = bucket.key_class(bucket, iduniq+ext)
    key.set_contents_from_string(buffer.getvalue())
    return [responsecode]
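
A single tweet-ID/URL pair can be tested outside of Spark before launching the full job; the ID and image URL below are placeholders:

In [ ]:
# hypothetical inputs for a quick smoke test of posturlS3
pair = ["tag:search.twitter.com,2005:123456789012345678",
        "http://pbs.twimg.com/media/example.jpg"]
print posturlS3(pair, aws_access_key_id, aws_secret_access_key)  # e.g. [200]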

In [ ]:
#function to query the GPU server for the classification category, parse the JSON, and add the tweet ID
def getClassificationJson(tweetid, job_id='20150706-203919-1597', serverip=deep_learning_digits_server_ip):
    c = pycurl.Curl()
    # NOTE: the original image URL string was truncated in this notebook; it is assumed
    # here to point at the S3 object written by posturlS3 above (the bucket URL pattern
    # and the .jpg extension are assumptions)
    imageurl = "https://%s.s3.amazonaws.com/%s.jpg" % (s3_bucket_name, tweetid)
    digitsurl = "http://%s/models/images/classification/classify_one.json" % (serverip)
    buf = StringIO()
    c.setopt(c.URL, digitsurl)
    c.setopt(c.POSTFIELDS, \
             urllib.urlencode((('job_id',job_id),('image_url',imageurl))))
    c.setopt(c.WRITEDATA, buf)
    c.setopt(c.POST, 1)
    c.perform()
    print "%s?%s" % (digitsurl,urllib.urlencode((('job_id',job_id),('image_url',imageurl))))
    print c.getinfo(c.RESPONSE_CODE)
    if c.getinfo(c.RESPONSE_CODE) == 200:
        jdict = json.loads(buf.getvalue())
        jdict['tweet_id'] = tweetid
        return [json.dumps(jdict)]
    else:
        return []
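
The DIGITS classification endpoint reports its predictions as a list of [category, confidence] pairs; below is a minimal sketch of pulling the top category out of one of the JSON strings produced above, assuming that response shape:

In [ ]:
# assumes the DIGITS response shape {"predictions": [[category, confidence], ...]}
def topCategory(jsonstr):
    jdict = json.loads(jsonstr)
    preds = jdict.get('predictions', [])
    if not preds:
        return None
    # predictions are assumed to be sorted by confidence, first pair is the best guess
    return preds[0]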

In [ ]:
#function to query the GPU server with the hard-hat model, parse the JSON, and add the tweet ID
def getHardhatClassificationJson(tweetid):
    hardhatmodel_job_id = '20151030-223956-5b04'
    return getClassificationJson(tweetid, job_id=hardhatmodel_job_id)

Tweet Parsing


In [ ]:
#parse the json objects in the tweets
jsondict = tweetsRdd.map(lambda x: json.loads(x))

In [ ]:
#extract the numeric tweet IDs
tweetIDMapRDD = jsondict.map(lambda x:  x['id'].split(":")[-1])

In [ ]:
# get the list of image urls from the twitter posts
mediaRdd = jsondict.flatMap(lambda x: getMediaUrl(x))

# post the images to an S3 bucket for ease of access
responses = mediaRdd.flatMap(lambda x: posturlS3(x, aws_access_key_id, aws_secret_access_key)).collect()

In [ ]:
responsearray = np.array(responses)

In [ ]:
# print diagnostics about the http responses for gathering the twitter images

print "Number of correct responses: ", sum(responsearray==200), "\n", "Number of redirects (307):",\
    sum(responsearray==307),"\nNumber of forbidden (403): ", sum(responsearray==403),\
    "\nNumber of page not found (404): ", sum(responsearray==404),\
    "\nNumber of service unavailable (503): ", sum(responsearray==503)

In [ ]:
classifyJsonHardhatRDD = tweetIDMapRDD.flatMap(lambda x: getHardhatClassificationJson(x))

In [ ]:
classifyJsonRDD = tweetIDMapRDD.flatMap(lambda x: getClassificationJson(x))

In [ ]:
# saveAsTextFile writes a directory of part files, despite the .txt name
classifyJsonRDD.saveAsTextFile("tweetPredictions.txt")

In [ ]:
classifyJsonHardhatRDD.saveAsTextFile("tweetPredictionsHardHat.txt")
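
The saved predictions can be read back into an RDD of dictionaries for downstream analysis; a short sketch:

In [ ]:
# reload the classification results written above
predictionsRDD = sc.textFile("tweetPredictions.txt").map(lambda x: json.loads(x))
print predictionsRDD.first()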