In [1]:
# Initialize a local Spark context -- the entry point for all RDD operations below.
from pyspark import SparkContext
sc = SparkContext()
In [2]:
import json
# Each line of the dump file is one tweet serialized as a JSON object;
# parsing happens lazily when an action (e.g. count) is triggered.
rdd = sc.textFile("stratahadoop-BCN-2014.json").map(lambda tweet: json.loads(tweet))
In [3]:
# Total number of tweets in the dataset (forces evaluation of the RDD).
rdd.count()
Out[3]:
In [4]:
import pytz
from datetime import datetime, timedelta

cet = pytz.timezone("CET")
utc = pytz.timezone("UTC")

# BUG FIX: pytz timezones must be attached with .localize(); passing them
# through the ``tzinfo=`` argument of the datetime constructor silently uses
# the zone's first (LMT) offset instead of the correct CET/CEST offset,
# shifting the event window by several minutes.  See the pytz documentation.
EVENT_START = cet.localize(datetime(2014, 11, 19, 9, 0, 0)).astimezone(utc)
EVENT_END = cet.localize(datetime(2014, 11, 21, 19, 0, 0)).astimezone(utc)

def created_during_event(tweet):
    """
    Return True if the tweet was created exactly during the event.

    The tweet's "created_at" field is a Twitter-format UTC timestamp,
    e.g. "Wed Nov 19 10:00:00 +0000 2014".
    """
    created_at = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S +0000 %Y")
    return EVENT_START <= utc.localize(created_at) <= EVENT_END
In [5]:
# Number of tweets posted while the event was running.
rdd.filter(created_during_event).count()
Out[5]:
In [6]:
# Bounding box around the venue: (min_lat, min_lon, max_lat, max_lon).
GEOBOX = (41.4083817701,2.2177593708, 41.4106115707, 2.220836401)

def has_geo(tweet):
    """
    Return True if the tweet carries coordinate information.
    """
    return bool(tweet["coordinates"])

def close_to_venue(tweet):
    """
    Return True if the tweet was created inside the geobox around the venue.
    """
    # GeoJSON stores positions as [longitude, latitude].
    lon, lat = tweet["coordinates"]["coordinates"]
    min_lat, min_lon, max_lat, max_lon = GEOBOX
    return (min_lat <= lat <= max_lat) and (min_lon <= lon <= max_lon)
In [7]:
# Number of tweets that carry coordinates (whole dataset).
rdd.filter(has_geo).count()
Out[7]:
In [8]:
# Geo-tagged tweets posted during the event.
rdd.filter(created_during_event).filter(has_geo).count()
Out[8]:
In [9]:
# Geo-tagged tweets posted during the event from inside the venue geobox.
rdd.filter(created_during_event).filter(has_geo).filter(close_to_venue).count()
Out[9]:
In [10]:
# Cache the event-time subset -- it is reused by several analyses below.
rdd_during_event = rdd.filter(created_during_event).cache()
In [11]:
from operator import add

# Most-used hashtags during the event, as (count, hashtag) pairs.
# FIX: lambda tuple-parameter unpacking (``lambda (a, b):``) was removed in
# Python 3 (PEP 3113); the pair is swapped by indexing instead.
hashtags_during_event = rdd_during_event\
    .flatMap(lambda tweet: [h['text'].lower() for h in tweet['entities']['hashtags']])\
    .map(lambda hashtag: (hashtag, 1))\
    .reduceByKey(add)\
    .map(lambda pair: (pair[1], pair[0]))\
    .sortByKey()
hashtags_during_event.top(25)
Out[11]:
In [12]:
# Most active accounts during the event (retweets included), as
# (count, screen_name) pairs.
# FIX: Python 2-only lambda tuple unpacking replaced with indexing (PEP 3113).
authors_during_event = rdd_during_event\
    .map(lambda tweet: (tweet['user']['screen_name'], 1))\
    .reduceByKey(add)\
    .map(lambda pair: (pair[1], pair[0]))\
    .sortByKey()
authors_during_event.top(25)
Out[12]:
In [14]:
def is_not_retweet(tweet):
    """
    Return True when the tweet is an original message, i.e. neither a
    manual "RT "-prefixed retweet nor a native retweet (which carries a
    "retweeted_status" payload).
    """
    is_manual_rt = tweet['text'].startswith('RT ')
    return (not is_manual_rt) and (not tweet.get('retweeted_status'))
# Most active original-content authors (retweets excluded), as
# (count, screen_name) pairs.
# FIX: Python 2-only lambda tuple unpacking replaced with indexing (PEP 3113).
real_authors_during_event = rdd_during_event\
    .filter(is_not_retweet)\
    .map(lambda tweet: (tweet['user']['screen_name'], 1))\
    .reduceByKey(add)\
    .map(lambda pair: (pair[1], pair[0]))\
    .sortByKey()
real_authors_during_event.top(25)
Out[14]:
In [15]:
def get_bucket(date):
    """
    Map a Twitter "created_at" timestamp to an hourly bucket name.

    The timestamp is parsed as UTC and converted to CET before being
    formatted as "YYYYMMDDHH", so buckets follow local event time.
    """
    parsed = datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y")
    localized = utc.localize(parsed).astimezone(cet)
    return localized.strftime("%Y%m%d%H")
# Tweet volume per hour (CET local time) over the course of the event.
rdd.filter(created_during_event)\
.map(lambda tweet: (get_bucket(tweet["created_at"]), 1))\
.reduceByKey(add)\
.sortByKey(ascending=True)\
.collect()
Out[15]:
In [16]:
# Random sample of 10 prolific users (more than 10 tweets overall),
# returned as (screen_name, profile_image_url) pairs.
# FIX: Python 2-only lambda tuple unpacking replaced with indexing (PEP 3113).
rdd.map(lambda tweet: (tweet['user']['screen_name'], tweet['user']['profile_image_url']))\
    .map(lambda user: (user, 1))\
    .reduceByKey(add)\
    .filter(lambda pair: pair[1] > 10)\
    .map(lambda pair: pair[0])\
    .takeSample(False, 10)
Out[16]:
In [ ]: