In [34]:
import json

# Load the captured events file as an RDD of text lines.
# NOTE(review): absolute, user-specific path -- this cell only runs on a
# machine that has this exact file; consider a configurable data directory.
data = sc.textFile("/Users/jhaddad/dev/killranalytics/kafka.txt")

# sc.textFile already yields one element per line, so no extra split on "\n"
# is needed. Blank/whitespace-only lines are dropped before decoding because
# json.loads raises on empty input and would fail the whole job.
parsed = data.filter(lambda line: line.strip()).map(json.loads)

In [35]:
# Pull every element of `sample` back to the driver and print the resulting list.
# NOTE(review): `sample` is not defined in any cell visible in this notebook
# excerpt -- confirm where it comes from before relying on a fresh-kernel run.
print(sample.collect())


[u'{"site_id": "02559c4f-ec20-4579-b2ca-72922a90d0df", "page": "/whatever.js"}', u'{"site_id": "02559c4f-ec20-4579-b2ca-72922a90d0df", "page": "/index.html"}', u'{"site_id": "02559c4f-ec20-4579-b2ca-72922a90d0df", "page": "/index.html"}', u'{"site_id": "02559c4f-ec20-4579-b2ca-72922a90d0df", "page": "/index.html"}', u'{"site_id": "02559c4f-ec20-4579-b2ca-72922a90d0df", "page": "/index.html"}', u'{"site_id": "02559c4f-ec20-4579-b2ca-72922a90d0df", "page": "/archive.php"}', u'{"site_id": "02559c4f-ec20-4579-b2ca-72922a90d0df", "page": "/something.css"}', u'{"site_id": "12559c4f-ec20-4579-b2ca-72922a90d0df", "page": "/archive.php"}', u'{"site_id": "22559c4f-ec20-4579-b2ca-72922a90d0df", "page": "/archive.php"}']

In [38]:
# Disabled earlier variant: total pageviews per site, emitted as one dict per
# site. It calls uuid1(), for which no import is visible in this excerpt.
#summed = parsed.map(lambda event: (event['site_id'], 1)).\
#                reduceByKey(lambda x,y: x + y).\
#                map(lambda x: {"site_id": x[0], "ts": str(uuid1()), "pageviews": x[1]})

# Count pageviews per (site_id, page) pair: emit ((site_id, page), 1) for each
# parsed event, then sum the ones for each key.
pageviews_by_site_page = (
    parsed
    .map(lambda event: ((event['site_id'], event['page']), 1))
    .reduceByKey(lambda x, y: x + y)
)

# collect() brings the results back to the driver so the cell displays the list
# of ((site_id, page), count) tuples instead of just the RDD object's repr.
pageviews_by_site_page.collect()


Out[38]:
[((u'02559c4f-ec20-4579-b2ca-72922a90d0df', u'/index.html'), 4),
 ((u'22559c4f-ec20-4579-b2ca-72922a90d0df', u'/archive.php'), 1),
 ((u'02559c4f-ec20-4579-b2ca-72922a90d0df', u'/archive.php'), 1),
 ((u'02559c4f-ec20-4579-b2ca-72922a90d0df', u'/something.css'), 1),
 ((u'12559c4f-ec20-4579-b2ca-72922a90d0df', u'/archive.php'), 1),
 ((u'02559c4f-ec20-4579-b2ca-72922a90d0df', u'/whatever.js'), 1)]

In [22]:


In [ ]: