Divolte Example: Processing with Spark

This notebook contains some examples for working with data collected by the Divolte Collector. Cells should be executed in the order they are listed in the notebook.

We demonstrate the following 3 things:

  1. Counting the total number of events in the log files.
  2. Displaying an arbitrary event.
  3. Determining the ID of the session with the most events, along with the first 10 events from that session.

For comparison, the same three tasks are also demonstrated in the standalone Python and Scala examples.

Start with the modules that we need for the rest of the notebook:

In [1]:
from datetime import datetime
import json

Configure the location of the logs that will be processed:

In [2]:
HDFS_URI = "/divolte/published/*.avro"

Create the RDD containing all records. (Hadoop files are always read as an RDD of key/value pairs. Avro files contain only keys, however, so we immediately discard the values.)

In [3]:
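events_rdd = sc.newAPIHadoopFile(
    HDFS_URI, 'org.apache.avro.mapreduce.AvroKeyInputFormat',
    'org.apache.avro.mapred.AvroKey', 'org.apache.hadoop.io.NullWritable',
    keyConverter='io.divolte.spark.pyspark.avro.AvroWrapperToJavaConverter').map(lambda kv: kv[0])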

We are going to process the RDD several times, so cache it in cluster memory to avoid reloading the files each time.

In [4]:
events_rdd.cache()
PythonRDD[3] at RDD at PythonRDD.scala:43

Calculate the total number of events.

In [5]:
events_rdd.count()

Display an event.

Get the first event in our dataset (which isn't ordered yet).

In [6]:
# take(1) returns a list holding a single event.
an_event = events_rdd.take(1)
print(json.dumps(an_event, indent=2))

[
  {
    "detectedCorruption": false,
    "userAgentOsVendor": "Apple Computer, Inc.",
    "referer": "http://localhost:8000/",
    "eventType": "pageView",
    "userAgentOsFamily": "OS X",
    "viewportPixelHeight": 918,
    "userAgentName": "Chrome",
    "userAgentString": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.111 Safari/537.36",
    "location": "http://localhost:8000/overview-summary.html",
    "screenPixelHeight": 1027,
    "userAgentVendor": "Google Inc.",
    "firstInSession": true,
    "userAgentDeviceCategory": "Personal computer",
    "userAgentType": "Browser",
    "timestamp": 1415027725428,
    "viewportPixelWidth": 852,
    "screenPixelWidth": 1676,
    "detectedDuplicate": false,
    "userAgentFamily": "Chrome",
    "remoteHost": "0:0:0:0:0:0:0:1",
    "pageViewId": "0:tvxtMmlPrQFT4Hv6aYcvxyiW0jITYAH9",
    "userAgentOsVersion": "10.10.0",
    "sessionId": "0:i21yz0pu:fKpuWbx5~cgW~8rLt4FYbLRWZFxJCDXf",
    "userAgentVersion": "38.0.2125.111",
    "partyId": "0:i21yz0pu:b4fxNa7rNBlk8Vvp4G2H0tgW76u2wrGf"
  }
]

Longest Session

Find the session with the most events.

In [7]:
# Count the events per session, then keep the (id, count) pair with the highest count.
(longest_session_id, longest_session_count) = events_rdd \
        .map(lambda event: (event['sessionId'], 1)) \
        .reduceByKey(lambda x, y: x + y) \
        .reduce(lambda x, y: max(x, y, key=lambda pair: pair[1]))
print("Session with id '%s' has the most events: %d" % (longest_session_id, longest_session_count))

Session with id '0:i21yoxph:7NXOXIW3ohe5V76fEk48owG2cohlu2Fx' has the most events: 15
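
For readers who want to check the logic without a cluster, the same count-then-maximum aggregation can be sketched in plain Python. The records and the `sample_` names below are hypothetical stand-ins, not Divolte data, and `collections.Counter` takes the place of `map` plus `reduceByKey`.

```python
from collections import Counter

# Hypothetical sample records; real Divolte events carry many more fields.
sample_events = [
    {'sessionId': 'session-a', 'location': '/index.html'},
    {'sessionId': 'session-b', 'location': '/index.html'},
    {'sessionId': 'session-a', 'location': '/about.html'},
    {'sessionId': 'session-a', 'location': '/contact.html'},
    {'sessionId': 'session-b', 'location': '/about.html'},
]

# Stand-in for map + reduceByKey: count the events per session ID.
events_per_session = Counter(event['sessionId'] for event in sample_events)

# Stand-in for the final reduce: keep the (id, count) pair with the highest count.
sample_longest_id, sample_longest_count = max(
    events_per_session.items(), key=lambda pair: pair[1])

print("Session with id '%s' has the most events: %d"
      % (sample_longest_id, sample_longest_count))
```

Unlike the Spark version, this sketch holds every record in local memory, so it is only suitable for small samples.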

Next, define a function that we can use to format timestamps.

In [8]:
def timestamp_to_string(ts):
    return datetime.fromtimestamp(ts / 1000.0).strftime('%Y-%m-%d %H:%M:%S')
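
Note that `datetime.fromtimestamp` converts to the local time zone of the machine running the notebook, so the formatted strings differ between machines. If stable output is preferred, a UTC variant could look like the sketch below. The name `timestamp_to_utc_string` is hypothetical and not part of the original notebook.

```python
from datetime import datetime, timedelta

# Start of the Unix epoch as a naive datetime, interpreted as UTC.
EPOCH = datetime(1970, 1, 1)

def timestamp_to_utc_string(ts):
    """Format a timestamp in milliseconds since the epoch as a UTC string."""
    # Adding a timedelta avoids any dependence on the local time zone.
    return (EPOCH + timedelta(milliseconds=ts)).strftime('%Y-%m-%d %H:%M:%S')

# The timestamp of the sample event shown earlier in this notebook.
print(timestamp_to_utc_string(1415027725428))
```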

Finally, for the session with the most events, find the first 10 events.

In [9]:
first_events = events_rdd \
    .filter(lambda event: event['sessionId'] == longest_session_id) \
    .map(lambda event: (event['location'], event['timestamp'])) \
    .takeOrdered(10, lambda event: event[1])
print("\n".join(["%s: %s" % (timestamp_to_string(ts), location) for (location, ts) in first_events]))

2014-11-03 16:12:05: http://localhost:8000/overview-summary.html
2014-11-03 16:12:25: http://localhost:8000/org/apache/commons/lang3/CharEncoding.html
2014-11-03 16:12:37: http://localhost:8000/org/apache/commons/lang3/time/DateFormatUtils.html
2014-11-03 16:12:56: http://localhost:8000/org/apache/commons/lang3/SerializationUtils.html
2014-11-03 16:13:07: http://localhost:8000/org/apache/commons/lang3/builder/StandardToStringStyle.html
2014-11-03 16:13:18: http://localhost:8000/org/apache/commons/lang3/builder/ToStringBuilder.html
2014-11-03 16:13:20: http://localhost:8000/org/apache/commons/lang3/tuple/Triple.html
2014-11-03 16:13:33: http://localhost:8000/org/apache/commons/lang3/text/WordUtils.html
2014-11-03 16:13:39: http://localhost:8000/org/apache/commons/lang3/BooleanUtils.html
2014-11-03 16:13:46: http://localhost:8000/org/apache/commons/lang3/BitField.html
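
The filter, map and `takeOrdered` steps used for this listing can be mimicked locally as a rough sketch. It assumes a small in-memory list of hypothetical records, with `heapq.nsmallest` standing in for `takeOrdered`. All names and values below are illustrative only.

```python
import heapq

# Hypothetical records for two sessions; timestamps are in milliseconds.
sample_events = [
    {'sessionId': 'session-a', 'location': '/c.html', 'timestamp': 3000},
    {'sessionId': 'session-b', 'location': '/x.html', 'timestamp': 500},
    {'sessionId': 'session-a', 'location': '/a.html', 'timestamp': 1000},
    {'sessionId': 'session-a', 'location': '/b.html', 'timestamp': 2000},
]
target_session = 'session-a'

# Stand-in for filter + map: keep one session, reduced to (location, timestamp).
pairs = [(event['location'], event['timestamp'])
         for event in sample_events
         if event['sessionId'] == target_session]

# Stand-in for takeOrdered(2, key): the two earliest events, oldest first.
earliest = heapq.nsmallest(2, pairs, key=lambda pair: pair[1])

for location, ts in earliest:
    print("%d: %s" % (ts, location))
```

As with `takeOrdered`, only the requested number of smallest items is returned, already sorted by the key.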


This notebook is licensed under the terms of the Apache License, Version 2.0.