In [9]:
import datetime as dt
import pandas as pd
import ujson as json

from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client

%pylab inline


Populating the interactive namespace from numpy and matplotlib

Let's collect some data that can occur in multiple pings per client per day. We'll need to aggregate by client+day, then dump the data.


In [10]:
update_channel = "nightly"
now = dt.datetime.now()
start = now - dt.timedelta(3)
end = now - dt.timedelta(1)

pings = get_pings(sc, app="Fennec", channel=update_channel,
                  submission_date=(start.strftime("%Y%m%d"), end.strftime("%Y%m%d")),
                  build_id=("20100101000000", "99999999999999"),
                  fraction=1)

subset = get_pings_properties(pings, ["meta/clientId",
                                      "meta/documentId",
                                      "meta/submissionDate",
                                      "payload/UIMeasurements"])

Take the set of pings, make sure we have actual clientIds and remove duplicate pings.


In [11]:
def dedupe_pings(rdd):
    return rdd.filter(lambda p: p["meta/clientId"] is not None)\
              .map(lambda p: (p["meta/clientId"] + p["meta/documentId"], p))\
              .reduceByKey(lambda x, y: x)\
              .map(lambda x: x[1])

subset = dedupe_pings(subset)
print subset.first()


{'meta/submissionDate': u'20160221', 'payload/UIMeasurements': {}, 'meta/clientId': u'1811cff2-165d-48e3-b9e5-07c8514ed231', 'meta/documentId': u'a2d66214-1a51-45c9-9da3-f9fbb7d03155'}

We're going to dump each event from the pings. Do a little empty data sanitization so we don't get NoneType errors during the dump. We create a JSON array of active experiments as part of the dump.


In [12]:
def safe_str(obj):
    """ return the byte string representation of obj """
    if obj is None:
        return unicode("")
    return unicode(obj)

def transform(ping):    
    output = []

    clientId = ping["meta/clientId"] # Should not be None since we filter those out
    submissionDate = ping["meta/submissionDate"] # Added via the ingestion process so should not be None

    events = ping["payload/UIMeasurements"]
    if events:
        for event in events:
            if event["type"] == "event":
                # Force all fields to strings
                timestamp = safe_str(event["timestamp"])
                action = safe_str(event["action"])
                method = safe_str(event["method"])

                # The extras is an optional field
                extras = unicode("")
                if "extras" in event and event["extras"] is not None:
                    extras = safe_str(event["extras"])

                sessions = {}
                experiments = []
                for session in event["sessions"]:
                    if "experiment.1:" in session:
                        experiments.append(safe_str(session[13:]))
                    elif "firstrun.1:" in session:
                        sessions[unicode("firstrun")] = 1
                    elif "awesomescreen.1:" in session:
                        sessions[unicode("awesomescreen")] = 1
                    elif "reader.1:" in session:
                        sessions[unicode("reader")] = 1

                output.append([clientId, submissionDate, timestamp, action, method, extras, json.dumps(sessions.keys())], json.dumps(experiments)])

    return output

rawEvents = subset.flatMap(transform)
print "Raw count: " + str(rawEvents.count())
print rawEvents.first()


Raw count: 171166
[u'cf5d926c-5b22-487f-bc44-c9cd7b9b903a', u'20160222', u'787307114', u'loadurl.1', u'intent', u'tabqueue-now', '[]']

The data can have duplicate events, due to a bug in the data collection that was fixed (bug 1246973). We still need to de-dupe the events. Because pings can be archived on device and submitted on later days, we can't assume dupes only happen on the same submission day. We don't use submission date when de-duping.


In [13]:
def dedupe_events(rdd):
    return rdd.map(lambda p: (p[0] + p[2] + p[3] + p[4], p))\
              .reduceByKey(lambda x, y: x)\
              .map(lambda x: x[1])

uniqueEvents = dedupe_events(rawEvents)
print "Unique count: " + str(uniqueEvents.count())
print uniqueEvents.first()


Unique count: 87211
[u'7aa6a32f-a70c-4082-9b9c-59acee278ae4', u'20160223', u'9596560', u'show.1', u'system', u'application-background', '[]']

Output the set of events


In [14]:
grouped = pd.DataFrame(uniqueEvents.collect(), columns=["clientid", "submissiondate", "timestamp", "action", "method", "extras", "sessions", "experiments"])
!mkdir -p ./output
grouped.to_csv("./output/fennec-events-" + update_channel + "-" + end.strftime("%Y%m%d") + ".csv", index=False, encoding="utf-8")


s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/mfinkle/android_events"
s3_output += "/v1/channel=" + update_channel + "/end_date=" + end.strftime("%Y%m%d") 
grouped = sqlContext.createDataFrame(transformed, ["clientid", "submissiondate", "timestamp", "action", "method", "extras", "sessions", "experiments"])
grouped.saveAsParquetFile(s3_output)