In [1]:
from pyspark import SparkContext
sc = SparkContext()

Import data from the file


In [2]:
import json
rdd = sc.textFile("stratahadoop-BCN-2014.json").map(lambda tweet: json.loads(tweet))

In [3]:
rdd.count()


Out[3]:
8485

filter tweets that were created during the event


In [4]:
import pytz
from datetime import datetime, timedelta

cet = pytz.timezone("CET")
utc = pytz.timezone("UTC")

# Use cet.localize() rather than the tzinfo= constructor argument: pytz
# timezones attached via tzinfo= are not DST-aware and can produce a wrong
# UTC offset (the documented pytz pitfall). For CET in November the offset
# happens to coincide, but localize() is the correct idiom.
EVENT_START = cet.localize(datetime(2014, 11, 19, 9, 0, 0)).astimezone(utc)
EVENT_END = cet.localize(datetime(2014, 11, 21, 19, 0, 0)).astimezone(utc)

def created_during_event(tweet):
    """
    Return True if the tweet was created within the event window.

    The tweet's "created_at" field uses Twitter's fixed UTC format,
    e.g. "Wed Nov 19 10:00:00 +0000 2014"; it is parsed as naive and
    then localized to UTC before comparison against the event bounds.
    """
    created_at = datetime.strptime(tweet["created_at"], "%a %b %d %H:%M:%S +0000 %Y")
    return EVENT_START <= utc.localize(created_at) <= EVENT_END

In [5]:
rdd.filter(created_during_event).count()


Out[5]:
5686

filter tweets that were created near the conference's venue


In [6]:
GEOBOX = (41.4083817701,2.2177593708, 41.4106115707, 2.220836401)

def has_geo(tweet):
    """
    Return True if the tweet carries coordinate information.

    """
    return bool(tweet["coordinates"])

def close_to_venue(tweet):
    """
    Return True if the tweet's coordinates fall inside the GEOBOX
    bounding the venue (boundaries inclusive).

    """
    # GeoJSON order is (longitude, latitude).
    lon, lat = tweet["coordinates"]["coordinates"]
    min_lat, min_lon, max_lat, max_lon = GEOBOX
    return (min_lat <= lat <= max_lat) and (min_lon <= lon <= max_lon)

In [7]:
rdd.filter(has_geo).count()


Out[7]:
277

In [8]:
rdd.filter(created_during_event).filter(has_geo).count()


Out[8]:
271

In [9]:
rdd.filter(created_during_event).filter(has_geo).filter(close_to_venue).count()


Out[9]:
173

Cache the event-window tweets, since they are reused by the following analyses


In [10]:
rdd_during_event = rdd.filter(created_during_event).cache()

hashtags


In [11]:
from operator import add

# Count hashtag usage during the event: flatten each tweet's hashtag
# entities (lower-cased for case-insensitive counting), count per tag,
# then flip to (count, tag) pairs so ordering is by frequency.
# NOTE(review): `lambda (hashtag, cnt): ...` is Python 2-only tuple
# parameter unpacking (removed in Python 3 by PEP 3113).
# NOTE(review): .sortByKey() looks redundant — .top(25) already returns
# the 25 largest elements in descending order; confirm before removing.
hashtags_during_event = rdd_during_event\
    .flatMap(lambda tweet: [h['text'].lower() for h in tweet['entities']['hashtags']])\
    .map(lambda hashtag: (hashtag, 1))\
    .reduceByKey(add)\
    .map(lambda (hashtag, cnt): (cnt, hashtag))\
    .sortByKey()
hashtags_during_event.top(25)


Out[11]:
[(5689, u'stratahadoop'),
 (500, u'bigdata'),
 (187, u'spark'),
 (170, u'hadoop'),
 (112, u'elasticsearch'),
 (103, u'datascience'),
 (82, u'iot'),
 (81, u'strataconf'),
 (76, u'ignite'),
 (71, u'ioth'),
 (70, u'eudatalandscape'),
 (66, u'barcelona'),
 (57, u'sparkcamp'),
 (47, u'data'),
 (36, u'analytics'),
 (30, u'nosql'),
 (25, u'velocityconf'),
 (23, u'stratabarcelona'),
 (23, u'dataviz'),
 (22, u'privacy'),
 (22, u'opendata'),
 (21, u'livestream'),
 (20, u'ibm'),
 (20, u'changetheratio'),
 (20, u'bluemix')]

authors


In [12]:
# Rank authors by tweet volume during the event: count tweets per
# screen_name, flip to (count, name) pairs, and take the 25 busiest.
# NOTE(review): `lambda (name, cnt): ...` is Python 2-only syntax.
authors_during_event = rdd_during_event\
    .map(lambda tweet: (tweet['user']['screen_name'], 1))\
    .reduceByKey(add)\
    .map(lambda (name, cnt): (cnt, name))\
    .sortByKey()
authors_during_event.top(25)


Out[12]:
[(609, u'strataconf'),
 (87, u'jbaquerot'),
 (80, u'stevenbeeckman'),
 (77, u'trieloff'),
 (65, u'eudatalandscape'),
 (62, u'duncan3ross'),
 (59, u'kate_ting'),
 (57, u'cgogus'),
 (56, u'andreademarco'),
 (52, u'juantomas'),
 (51, u'kestelyn'),
 (51, u'ClouderaEvents'),
 (50, u'jbenno'),
 (48, u'peeterskris'),
 (44, u'indizen_insight'),
 (43, u'pacoid'),
 (42, u'DaanDebie'),
 (41, u'ch_doig'),
 (40, u'NoSQLDigest'),
 (39, u'DatenAssistance'),
 (38, u'ManfredBo'),
 (36, u'mounialalmas'),
 (35, u'adrianafreitas'),
 (34, u'AureliePols'),
 (33, u'mmaibaum')]

In [14]:
def is_not_retweet(tweet):
    """
    Return True for original tweets, False for retweets.

    A tweet counts as a retweet when its text starts with "RT " (manual
    retweet) or it carries a 'retweeted_status' payload (native retweet).
    """
    is_manual_retweet = tweet['text'].startswith('RT ')
    return not (is_manual_retweet or tweet.get('retweeted_status'))

# Same author ranking as the previous cell, but with retweets filtered
# out so the counts reflect original content only.
# NOTE(review): `lambda (name, cnt): ...` is Python 2-only syntax.
real_authors_during_event = rdd_during_event\
    .filter(is_not_retweet)\
    .map(lambda tweet: (tweet['user']['screen_name'], 1))\
    .reduceByKey(add)\
    .map(lambda (name, cnt): (cnt, name))\
    .sortByKey()
real_authors_during_event.top(25)


Out[14]:
[(153, u'strataconf'),
 (57, u'trieloff'),
 (48, u'kestelyn'),
 (48, u'eudatalandscape'),
 (47, u'kate_ting'),
 (47, u'jbenno'),
 (47, u'duncan3ross'),
 (44, u'indizen_insight'),
 (42, u'jbaquerot'),
 (38, u'stevenbeeckman'),
 (38, u'peeterskris'),
 (31, u'DaanDebie'),
 (25, u'ClouderaEvents'),
 (21, u'mmaibaum'),
 (21, u'miguelmalvarez'),
 (21, u'adrianafreitas'),
 (21, u'BMCControlM'),
 (19, u'BartAelterman'),
 (17, u'zahedab'),
 (17, u'padolphs'),
 (17, u'CapdevilaPujol'),
 (16, u'natalinobusa'),
 (16, u'kimknilsson'),
 (16, u'boudicca'),
 (16, u'PeterSpeyer')]

histogram


In [15]:
def get_bucket(date):
    """
    Map a Twitter created_at string to an hourly bucket name.

    The timestamp (always UTC in Twitter's API format) is converted to
    CET and rendered as "YYYYMMDDHH".
    """
    parsed_utc = utc.localize(
        datetime.strptime(date, "%a %b %d %H:%M:%S +0000 %Y"))
    local_time = parsed_utc.astimezone(cet)
    return local_time.strftime("%Y%m%d%H")

# Hourly tweet-volume histogram over the event window: bucket each tweet
# by its CET hour, count per bucket, and return buckets chronologically.
# NOTE(review): rdd_during_event (cached in an earlier cell) could be
# reused here instead of re-filtering rdd from scratch.
rdd.filter(created_during_event)\
    .map(lambda tweet: (get_bucket(tweet["created_at"]), 1))\
    .reduceByKey(add)\
    .sortByKey(ascending=True)\
    .collect()


Out[15]:
[('2014111909', 89),
 ('2014111910', 73),
 ('2014111911', 82),
 ('2014111912', 106),
 ('2014111913', 39),
 ('2014111914', 78),
 ('2014111915', 79),
 ('2014111916', 80),
 ('2014111917', 92),
 ('2014111918', 136),
 ('2014111919', 28),
 ('2014111920', 21),
 ('2014111921', 29),
 ('2014111922', 16),
 ('2014111923', 21),
 ('2014112000', 8),
 ('2014112001', 14),
 ('2014112002', 5),
 ('2014112003', 1),
 ('2014112004', 4),
 ('2014112006', 8),
 ('2014112007', 11),
 ('2014112008', 52),
 ('2014112009', 274),
 ('2014112010', 594),
 ('2014112011', 309),
 ('2014112012', 259),
 ('2014112013', 136),
 ('2014112014', 283),
 ('2014112015', 140),
 ('2014112016', 195),
 ('2014112017', 215),
 ('2014112018', 114),
 ('2014112019', 67),
 ('2014112020', 45),
 ('2014112021', 39),
 ('2014112022', 49),
 ('2014112023', 29),
 ('2014112100', 34),
 ('2014112101', 12),
 ('2014112102', 13),
 ('2014112103', 8),
 ('2014112104', 4),
 ('2014112105', 19),
 ('2014112106', 11),
 ('2014112107', 9),
 ('2014112108', 54),
 ('2014112109', 204),
 ('2014112110', 432),
 ('2014112111', 252),
 ('2014112112', 91),
 ('2014112113', 96),
 ('2014112114', 142),
 ('2014112115', 112),
 ('2014112116', 147),
 ('2014112117', 150),
 ('2014112118', 76)]

authors profile pictures


In [16]:
# Sample profile pictures of active authors: count tweets per
# (screen_name, profile_image_url) pair and randomly pick 10 users with
# more than 10 tweets.
# NOTE(review): `lambda (user, count): ...` is Python 2-only syntax.
# NOTE(review): takeSample(False, 10) is unseeded, so this output is not
# reproducible across runs; pass a seed for determinism.
rdd.map(lambda tweet: (tweet['user']['screen_name'], tweet['user']['profile_image_url']))\
    .map(lambda user: (user, 1))\
    .reduceByKey(add)\
    .filter(lambda (user, count): count > 10)\
    .map(lambda (user, count): user)\
    .takeSample(False, 10)


Out[16]:
[(u'voukka',
  u'http://pbs.twimg.com/profile_images/535453618116517888/gId7m6kz_normal.jpeg'),
 (u'jpalmaer',
  u'http://pbs.twimg.com/profile_images/521514481/johanpalmaer_normal.jpg'),
 (u'semanticfire',
  u'http://pbs.twimg.com/profile_images/1490549910/bart_normal.jpg'),
 (u'JMateosGarcia',
  u'http://pbs.twimg.com/profile_images/501752501599625218/zFsUIvJ3_normal.png'),
 (u'MapR_EMEA',
  u'http://pbs.twimg.com/profile_images/432031537257664512/-0rh4XDS_normal.png'),
 (u'timoreilly',
  u'http://pbs.twimg.com/profile_images/2823681988/f4f6f2bed8ab4d5a48dea4b9ea85d5f1_normal.jpeg'),
 (u'DaniScherer',
  u'http://pbs.twimg.com/profile_images/520402903295467521/8yCjgSU7_normal.jpeg'),
 (u'BartAelterman',
  u'http://pbs.twimg.com/profile_images/1450705749/Bart2_normal.jpg'),
 (u'Datarella',
  u'http://pbs.twimg.com/profile_images/378800000539612812/55cc6003e421ebeb5f143a89f2197c8a_normal.jpeg'),
 (u'zillioninfotech',
  u'http://pbs.twimg.com/profile_images/527137027242733569/wpAhvC_W_normal.png')]

In [ ]: