This notebook contains some examples for working with data collected by the Divolte Collector. Cells should be executed in the order they are listed in the notebook.
We demonstrate three things: loading Divolte event records from Avro files into an RDD, finding the session with the most events, and listing the first 10 events of that session.
For comparison, these are also demonstrated in the standalone Python and Scala examples.
Start by importing the modules that we need for the rest of the notebook:
In [1]:
from datetime import datetime
import json
Configure the location of the logs that will be processed:
In [2]:
HDFS_URI = "/divolte/published/*.avro"
Create the RDD containing all records. (Hadoop files are always read as an RDD of key/value pairs. Avro files contain only keys, however, so we immediately discard the values.)
In [3]:
events_rdd = sc.newAPIHadoopFile(
    HDFS_URI,
    'org.apache.avro.mapreduce.AvroKeyInputFormat',
    'org.apache.avro.mapred.AvroKey',
    'org.apache.hadoop.io.NullWritable',
    keyConverter='io.divolte.spark.pyspark.avro.AvroWrapperToJavaConverter').map(lambda kv: kv[0])
We are going to process the RDD several times, so cache the original set in cluster memory so that it doesn't have to be loaded each time.
In [4]:
events_rdd.cache()
Out[4]:
In [5]:
events_rdd.count()
Out[5]:
Get the first event in our dataset (note that the dataset is not ordered yet).
In [6]:
an_event = events_rdd.take(1)
print(json.dumps(an_event, indent=2))
Find the session with the most events.
In [7]:
(longest_session_id, longest_session_count) = events_rdd \
    .map(lambda event: (event['sessionId'], 1)) \
    .reduceByKey(lambda x, y: x + y) \
    .reduce(lambda x, y: max(x, y, key=lambda pair: pair[1]))
print("Session with id '%s' has the most events: %d" % (longest_session_id, longest_session_count))
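The `reduceByKey` / `reduce` pattern above can be sketched locally without Spark. This toy version, using hypothetical session IDs in place of real event records, mirrors the same two-step aggregation:

```python
from collections import Counter

# Hypothetical session IDs standing in for event['sessionId'] values.
session_ids = ["s1", "s2", "s1", "s3", "s1", "s2"]

# Counter plays the role of reduceByKey: it sums a 1 for every occurrence.
counts = Counter(session_ids)

# The final reduce keeps the (session, count) pair with the largest count.
longest_session_id, longest_session_count = max(counts.items(), key=lambda pair: pair[1])
print("Session with id '%s' has the most events: %d" % (longest_session_id, longest_session_count))
```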
Next, define a helper function that we can use to format timestamps.
In [8]:
def timestamp_to_string(ts):
    # Divolte timestamps are epoch milliseconds, hence the division by 1000.
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
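As a quick sanity check, the helper can be exercised on a hypothetical epoch-millisecond timestamp. The rendered string depends on the local timezone, so no particular output is shown:

```python
from datetime import datetime

def timestamp_to_string(ts):
    # Divolte timestamps are epoch milliseconds, hence the division by 1000.
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')

# 1400000000000 is a hypothetical timestamp; the result varies with the local timezone.
print(timestamp_to_string(1400000000000))
```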
Finally, for the session with the most events, find its first 10 events.
In [9]:
first_events = events_rdd \
    .filter(lambda event: event['sessionId'] == longest_session_id) \
    .map(lambda event: (event['location'], event['timestamp'])) \
    .takeOrdered(10, lambda event: event[1])
print("\n".join(["%s: %s" % (timestamp_to_string(ts), location) for (location, ts) in first_events]))
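The `takeOrdered` step can be sketched locally as a plain sort: order ascending by the key and take the first 10 elements. This toy version uses hypothetical (location, timestamp) pairs in place of the mapped RDD:

```python
# Hypothetical (location, timestamp-in-ms) pairs standing in for the mapped RDD.
pairs = [("/home", 3000), ("/checkout", 1000), ("/cart", 2000)]

# takeOrdered(10, key) behaves like sorting ascending by the key and taking 10.
first = sorted(pairs, key=lambda pair: pair[1])[:10]
print(first)
```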