Processing Twitter Data


In [1]:
# --- Spark bootstrap: wire a PySpark shell into this notebook kernel ---
import atexit
import os
import platform
import sys

# NOTE(review): hardcoded HDP 2.4.2 install path — environment-specific;
# the get()/raise check below can never fire since we just set the variable.
os.environ["SPARK_HOME"] = '/usr/hdp/2.4.2.0-258/spark'
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# Make PySpark and its bundled py4j bridge importable.
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.1-src.zip'))
# Python 2 execfile: runs pyspark/shell.py in this namespace, which creates
# `sc` (SparkContext) and `sqlContext` (HiveContext) — see the banner output.
execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))


###sys.path.append('/usr/hdp/2.4.2.0-258/spark/python')

import py4j

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.storagelevel import StorageLevel


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.5 (default, Nov 20 2015 02:00:19)
SparkContext available as sc, HiveContext available as sqlContext.

In [4]:
sc.version


Out[4]:
u'1.6.1'

In [6]:
# Load one gzipped file (one JSON tweet per line) from HDFS and count lines.
# NOTE(review): a single .gz file is not splittable, so this likely reads as
# one partition — confirm if performance matters.
tweets = sc.textFile("/user/molnar/data/election2012/cache-117000000.json.gz")
tweets.count()


Out[6]:
1000000

In [5]:
import json

In [16]:
# Inspect one tweet's structure: each line parses to a JSON object with an
# 'entities' field (user_mentions / hashtags / urls — see Out below).
tw = json.loads(tweets.take(1)[0])
#tw[u'user'][u'screen_name']
#json.loads(tw[0])[u'entities'].keys()
#json.loads(tw[0])['entities']['hashtags']
tw['entities'].keys()


Out[16]:
[u'user_mentions', u'hashtags', u'urls']

In [7]:
def extract_hash(tw):
    """Parse one raw tweet line and return its list of hashtag entity dicts.

    Returns an empty tuple for lines that are not valid JSON or that lack
    the 'entities'/'hashtags' structure, so flatMap silently skips them.
    """
    try:
        return json.loads(tw)['entities']['hashtags']
    # Catch only what malformed input can raise (bad JSON -> ValueError,
    # missing key -> KeyError, non-dict top level -> TypeError); a bare
    # `except:` would also swallow KeyboardInterrupt/SystemExit and real bugs.
    except (ValueError, KeyError, TypeError):
        return ()
    
# Flatten every tweet into its hashtag entities, then map each to a
# (hashtag_text, 1) pair — classic word-count shape for reduceByKey.
hashtags = tweets.flatMap(extract_hash).map(lambda x: (x['text'], 1))
hashtags.take(10)


Out[7]:
[(u'HAPPYBIRTHDAYAVALANNA', 1),
 (u'badgers', 1),
 (u'WHUFC', 1),
 (u'OOMF', 1),
 (u'UnFilmQuiMaMarqu\xe9', 1),
 (u'brfc', 1),
 (u'lufc', 1),
 (u'bcfc', 1),
 (u'Panthers', 1),
 (u'jobs', 1)]

In [8]:
%%time
# reduceByKey is a lazy transformation — no Spark job runs here, which is
# why the wall time reported below is only ~59 ms.
topcounts = hashtags.reduceByKey(lambda a, b: a+b)


CPU times: user 8.91 ms, sys: 7.78 ms, total: 16.7 ms
Wall time: 59.1 ms

In [9]:
%%time
# Swap to (count, tag), sort by key descending (ascending=0) into a single
# partition (numPartitions=1), and take the top 10.  This action triggers
# the actual scan of the file, hence the ~1.5 min wall time.
# NOTE: `lambda (a,b): ...` is Python 2 tuple-parameter unpacking (removed in Py3).
topcounts.map(lambda (a,b): (b, a)).sortByKey(0,1).take(10)


CPU times: user 18.1 ms, sys: 7.99 ms, total: 26.1 ms
Wall time: 1min 30s
Out[9]:
[(8854, u'Iran'),
 (6283, u'tcot'),
 (6197, u'ReligiousFreedom'),
 (6196, u'Nadarkhani'),
 (4890, u'Israel'),
 (3647, u'Obama'),
 (2436, u'ImACeleb'),
 (2392, u'EMAVoteOneDirection'),
 (2374, u'Gaza'),
 (2361, u'Pakistan')]

Now let's try all of them — this might take a while.


In [10]:
tweets = sc.textFile("/user/molnar/data/election2012/cache-*.json.gz")
hashtags = tweets.flatMap(extract_hash).map(lambda x: (x['text'], 1))
topcounts = hashtags.reduceByKey(lambda a, b: a+b)

In [11]:
%%time
# Same descending count sort as before, now over the full dataset (top 20);
# the action drives the whole multi-file scan (~24 min wall time).
topcounts.map(lambda (a,b): (b, a)).sortByKey(0,1).take(20)


CPU times: user 75.8 ms, sys: 126 ms, total: 202 ms
Wall time: 23min 48s
Out[11]:
[(1341607, u'tcot'),
 (1199847, u'Obama'),
 (703426, u'Pakistan'),
 (598698, u'Romney'),
 (534551, u'p2'),
 (492539, u'Iran'),
 (397206, u'teaparty'),
 (354935, u'obama'),
 (293350, u'black'),
 (289500, u'Israel'),
 (264352, u'Sandy'),
 (248773, u'news'),
 (196201, u'tlot'),
 (194760, u'RomneyRyan2012'),
 (184762, u'Obama2012'),
 (178192, u'USA'),
 (166043, u'media'),
 (165223, u'GOP'),
 (165206, u'gop'),
 (164156, u'Benghazi')]

In [ ]:


In [ ]:


In [ ]:


In [ ]: