In [9]:
import datetime as dt
import pandas as pd
import ujson as json
from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client
%pylab inline
Let's collect some data that can occur in multiple pings per client per day. We'll need to aggregate by client+day, then dump the data.
In [10]:
update_channel = "nightly"

# Submission window, relative to the moment the cell runs:
# from three days ago through one day ago.
now = dt.datetime.now()
start = now - dt.timedelta(days=3)
end = now - dt.timedelta(days=1)
submission_window = (start.strftime("%Y%m%d"), end.strftime("%Y%m%d"))

# Fetch Fennec pings for the channel and window, accepting any build id in
# the given range. NOTE(review): fraction=1 presumably means "no sampling" --
# confirm against the moztelemetry documentation.
pings = get_pings(
    sc,
    app="Fennec",
    channel=update_channel,
    submission_date=submission_window,
    build_id=("20100101000000", "99999999999999"),
    fraction=1,
)

# Keep only the ping properties the later cells read.
wanted_properties = [
    "meta/clientId",
    "meta/documentId",
    "meta/submissionDate",
    "payload/UIMeasurements",
]
subset = get_pings_properties(pings, wanted_properties)
Take the set of pings, make sure we have actual clientIds and remove duplicate pings.
In [11]:
def dedupe_pings(rdd):
    """Drop pings without a client id and keep one ping per (client, document).

    Args:
        rdd: collection of ping dicts that supports ``filter``, ``map`` and
            ``reduceByKey`` (presumably a Spark RDD). Each ping must have the
            keys "meta/clientId" and "meta/documentId".

    Returns:
        A collection of the same kind holding, for every distinct
        (clientId, documentId) pair, one of the pings carrying that pair.
    """
    # Key on a tuple rather than on clientId + documentId: concatenation is
    # ambiguous ("ab" + "c" == "a" + "bc") and raises TypeError when
    # documentId is None. With the tuple key, pings whose documentId is None
    # share the key (clientId, None) and collapse to one per client.
    return (
        rdd.filter(lambda p: p["meta/clientId"] is not None)
        .map(lambda p: ((p["meta/clientId"], p["meta/documentId"]), p))
        .reduceByKey(lambda x, y: x)
        .map(lambda x: x[1])
    )
# Rebind `subset` to the de-duplicated pings; later cells read `subset`.
subset = dedupe_pings(subset)
# Print one ping as a quick sanity check of the record layout.
print subset.first()
We're going to dump each event from the pings. First, sanitize empty values so we don't hit NoneType errors during the dump. The active sessions and the active experiments are each emitted as a JSON array as part of the dump.
In [12]:
def safe_str(obj):
    """Return the unicode text representation of obj.

    Args:
        obj: any value; may be None.

    Returns:
        An empty unicode string when obj is None; the UTF-8 decoding of obj
        when it is a byte string (undecodable bytes are replaced rather than
        raising); otherwise ``unicode(obj)``.
    """
    if obj is None:
        return unicode("")
    if isinstance(obj, bytes):
        # unicode(byte_string) uses the ASCII codec on Python 2 and raises
        # UnicodeDecodeError on non-ASCII bytes, so decode explicitly.
        return obj.decode("utf-8", "replace")
    return unicode(obj)
def transform(ping):
    """Flatten one ping into a list of event rows.

    Args:
        ping: dict with the keys "meta/clientId", "meta/submissionDate" and
            "payload/UIMeasurements" (a list of measurement dicts, or an
            empty/None value).

    Returns:
        A list with one row per measurement whose "type" is "event". Each row
        is [clientId, submissionDate, timestamp, action, method, extras,
        sessions, experiments], where sessions and experiments are JSON array
        strings. The list is empty when the ping has no measurements.

    Raises:
        KeyError: if an event lacks "timestamp", "action", "method" or
            "sessions".
    """
    output = []
    clientId = ping["meta/clientId"]  # Should not be None since we filter those out
    submissionDate = ping["meta/submissionDate"]  # Added via the ingestion process so should not be None
    events = ping["payload/UIMeasurements"]
    if not events:
        return output

    experiment_prefix = "experiment.1:"
    # Session names reported in the "sessions" column, in match priority order.
    tracked_sessions = ("firstrun", "awesomescreen", "reader")

    for event in events:
        if event["type"] != "event":
            continue

        # Force all fields to strings
        timestamp = safe_str(event["timestamp"])
        action = safe_str(event["action"])
        method = safe_str(event["method"])

        # The extras is an optional field; safe_str maps a missing/None value to "".
        extras = safe_str(event.get("extras"))

        sessions = set()
        experiments = []
        for session in event["sessions"]:
            if experiment_prefix in session:
                # The experiment name is whatever follows the prefix.
                # NOTE(review): assumes the prefix sits at the start of the
                # session string -- confirm against the client code.
                experiments.append(safe_str(session[len(experiment_prefix):]))
                continue
            for name in tracked_sessions:
                if name + ".1:" in session:
                    sessions.add(unicode(name))
                    break

        # Sort the session names so the emitted JSON is deterministic.
        output.append([clientId, submissionDate, timestamp, action, method, extras,
                       json.dumps(sorted(sessions)), json.dumps(experiments)])
    return output
# transform returns a list of event rows per ping; flatMap is used so that
# rawEvents holds the individual rows rather than one list per ping.
rawEvents = subset.flatMap(transform)
# Report how many event rows there are before de-duplication, and show one.
print "Raw count: " + str(rawEvents.count())
print rawEvents.first()
The data can have duplicate events, due to a bug in the data collection that was fixed (bug 1246973). We still need to de-dupe the events. Because pings can be archived on device and submitted on later days, we can't assume dupes only happen on the same submission day. We don't use submission date when de-duping.
In [13]:
def dedupe_events(rdd):
    """Keep one event row per (clientId, timestamp, action, method).

    Submission date (row index 1) is deliberately left out of the key, so the
    same event submitted on different days counts as a duplicate.

    Args:
        rdd: collection of event rows that supports ``map`` and
            ``reduceByKey`` (presumably a Spark RDD). Indexes 0, 2, 3 and 4 of
            each row are the client id, timestamp, action and method.

    Returns:
        A collection of the same kind with one row per distinct key.
    """
    # A tuple key keeps the fields separate; plain concatenation would merge
    # distinct events such as action "ab" + method "c" and "a" + "bc".
    return (
        rdd.map(lambda p: ((p[0], p[2], p[3], p[4]), p))
        .reduceByKey(lambda x, y: x)
        .map(lambda x: x[1])
    )
# Drop repeated events (same client, timestamp, action and method).
uniqueEvents = dedupe_events(rawEvents)
# Report how many event rows remain after de-duplication, and show one.
print "Unique count: " + str(uniqueEvents.count())
print uniqueEvents.first()
Output the set of events
In [14]:
grouped = pd.DataFrame(uniqueEvents.collect(), columns=["clientid", "submissiondate", "timestamp", "action", "method", "extras", "sessions", "experiments"])
!mkdir -p ./output
grouped.to_csv("./output/fennec-events-" + update_channel + "-" + end.strftime("%Y%m%d") + ".csv", index=False, encoding="utf-8")
s3_output = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/mfinkle/android_events"
s3_output += "/v1/channel=" + update_channel + "/end_date=" + end.strftime("%Y%m%d")
grouped = sqlContext.createDataFrame(transformed, ["clientid", "submissiondate", "timestamp", "action", "method", "extras", "sessions", "experiments"])
grouped.saveAsParquetFile(s3_output)