In [1]:
import atexit
import glob
import os
import platform
import sys

# Point at the HDP-managed Spark install so the driver-side PySpark
# libraries can be found.
os.environ["SPARK_HOME"] = '/usr/hdp/2.4.2.0-258/spark'
spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

# Make pyspark importable in this kernel.
sys.path.insert(0, os.path.join(spark_home, 'python'))

# Locate the bundled py4j zip by glob instead of hard-coding a version
# (the exact py4j version varies between Spark releases, so a pinned
# 'py4j-0.8.1-src.zip' path can silently fail to exist).
py4j_zips = glob.glob(os.path.join(spark_home, 'python/lib/py4j-*-src.zip'))
if not py4j_zips:
    raise ValueError('no py4j source zip found under %s/python/lib' % spark_home)
sys.path.insert(0, py4j_zips[0])

# Run Spark's interactive bootstrap, which creates `sc` (SparkContext)
# and `sqlContext` in this namespace. `execfile` is Python-2-only;
# exec(compile(...)) behaves the same and also works on Python 3.
shell_py = os.path.join(spark_home, 'python/pyspark/shell.py')
exec(compile(open(shell_py).read(), shell_py, 'exec'))

import py4j
import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.storagelevel import StorageLevel
In [4]:
# Sanity check: `sc` was created by pyspark/shell.py in the bootstrap
# cell; displaying its version confirms the context is live.
sc.version
Out[4]:
In [6]:
# Load one gzipped JSON tweet dump from HDFS (one JSON object per line).
# NOTE(review): hardcoded cluster path — assumes this HDFS file exists.
tweets = sc.textFile("/user/molnar/data/election2012/cache-117000000.json.gz")
# count() is an action: it forces the read and reports how many lines
# (tweets) the file contains.
tweets.count()
Out[6]:
In [5]:
import json
In [16]:
# Parse a single raw tweet to inspect its schema before building the
# extraction pipeline; the 'entities' object is where Twitter puts
# hashtags, urls, user_mentions, etc.
tw = json.loads(tweets.take(1)[0])
tw['entities'].keys()
Out[16]:
In [7]:
def extract_hash(tw):
    """Return the list of hashtag entity dicts from a raw tweet JSON string.

    Parameters
    ----------
    tw : str
        One line of the dump: a JSON-encoded tweet object.

    Returns
    -------
    list or tuple
        The tweet's ``entities.hashtags`` list (each item has a ``text``
        key), or an empty tuple when the line is not valid JSON or lacks
        the expected structure — so flatMap simply skips bad lines.
    """
    try:
        return json.loads(tw)['entities']['hashtags']
    # Narrowed from a bare `except:` (which would also swallow
    # KeyboardInterrupt/SystemExit): json.loads raises ValueError on bad
    # input, the lookups raise KeyError/TypeError on unexpected shapes.
    except (ValueError, KeyError, TypeError):
        return ()
# Flatten every tweet's hashtag entities into one RDD and pair each
# tag's text with 1, ready for a reduceByKey word-count.
hashtags = tweets.flatMap(extract_hash).map(lambda x: (x['text'], 1))
# Peek at the first few (tag, 1) pairs.
hashtags.take(10)
Out[7]:
In [8]:
%%time
# Sum the per-hashtag 1s into total counts. reduceByKey is a lazy
# transformation, so %%time here measures plan construction only —
# the real work happens at the first action (take/collect) below.
topcounts = hashtags.reduceByKey(lambda a, b: a+b)
In [9]:
%%time
topcounts.map(lambda (a,b): (b, a)).sortByKey(0,1).take(10)
Out[9]:
Now let's try all of them — this might take a while.
In [10]:
# Rebuild the same pipeline over EVERY cached dump (note the * glob),
# not just the single file used above.
tweets = sc.textFile("/user/molnar/data/election2012/cache-*.json.gz")
hashtags = tweets.flatMap(extract_hash).map(lambda x: (x['text'], 1))
# Still lazy — nothing executes until the action in the next cell.
topcounts = hashtags.reduceByKey(lambda a, b: a+b)
In [11]:
%%time
topcounts.map(lambda (a,b): (b, a)).sortByKey(0,1).take(20)
Out[11]:
In [ ]:
In [ ]:
In [ ]:
In [ ]: