Compute the Engagement Ratio for the overall Firefox population as described in Bug 1240849. The resulting data is shown on the Firefox Dashboard.
The actual Daily Active Users (DAU) and Monthly Active Users (MAU) computations are defined in standards.py in the python_moztelemetry repo.
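For reference, the published ratio is derived from those two counts; a minimal sketch of that final step, assuming no additional smoothing (hypothetical helper, not part of this notebook's pipeline):

# Hypothetical illustration only: the Engagement Ratio is derived from the
# DAU and MAU counts computed below, essentially DAU / MAU for a given day
# (the dashboard may additionally smooth DAU over several days).
def engagement_ratio(dau_count, mau_count):
    return float(dau_count) / mau_count if mau_count else 0.0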
In [1]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from datetime import datetime as _datetime, timedelta, date
import boto3
import botocore
import csv
import os.path
bucket = "telemetry-parquet"
prefix = "main_summary/v3"
%time dataset = sqlContext.read.load("s3://{}/{}".format(bucket, prefix), "parquet")
How many cores are we running on?
In [2]:
sc.defaultParallelism
Out[2]:
And what do the underlying records look like?
In [3]:
dataset.printSchema()
We want to incrementally update the data, re-computing any values that are missing or for which data is still arriving. Define that logic here.
In [4]:
def fmt(the_date, date_format="%Y%m%d"):
    return _datetime.strftime(the_date, date_format)

# Our calculations look for activity date reported within
# a certain time window. If that window has passed, we do
# not need to re-compute data for that period.
def should_be_updated(record,
                      target_col="day",
                      generated_col="generated_on",
                      date_format="%Y%m%d"):
    target = _datetime.strptime(record[target_col], date_format)
    generated = _datetime.strptime(record[generated_col], date_format)

    # Don't regenerate data that was already updated today.
    today = fmt(_datetime.utcnow(), date_format)
    if record[generated_col] >= today:
        return False

    diff = generated - target
    return diff.days <= 10
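# Worked example (hypothetical values): a row for day "20160301" that was
# generated on "20160305" is only 4 days past its target day, so it is still
# within the 10-day reporting window and will be recomputed; once more than
# 10 days have elapsed, the row is treated as final. Rows already regenerated
# today are left alone regardless.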
from moztelemetry.standards import filter_date_range, count_distinct_clientids

# Similar to the version in standards.py, but uses subsession_start_date
# instead of activityTimestamp
def dau(dataframe, target_day, future_days=10, date_format="%Y%m%d"):
    """Compute Daily Active Users (DAU) from the Executive Summary dataset.
    See https://bugzilla.mozilla.org/show_bug.cgi?id=1240849
    """
    target_day_date = _datetime.strptime(target_day, date_format)
    min_activity = _datetime.strftime(target_day_date, "%Y-%m-%d")
    max_activity = _datetime.strftime(target_day_date + timedelta(1), "%Y-%m-%d")
    act_col = dataframe.subsession_start_date

    min_submission = target_day
    max_submission_date = target_day_date + timedelta(future_days)
    max_submission = _datetime.strftime(max_submission_date, date_format)
    sub_col = dataframe.submission_date_s3

    filtered = filter_date_range(dataframe, act_col, min_activity, max_activity,
                                 sub_col, min_submission, max_submission)
    return count_distinct_clientids(filtered)
# Similar to the version in standards.py, but uses subsession_start_date
# instead of activityTimestamp
def mau(dataframe, target_day, past_days=28, future_days=10, date_format="%Y%m%d"):
    """Compute Monthly Active Users (MAU) from the Executive Summary dataset.
    See https://bugzilla.mozilla.org/show_bug.cgi?id=1240849
    """
    target_day_date = _datetime.strptime(target_day, date_format)

    # Compute activity over `past_days` days leading up to target_day
    min_activity_date = target_day_date - timedelta(past_days)
    min_activity = _datetime.strftime(min_activity_date, "%Y-%m-%d")
    max_activity = _datetime.strftime(target_day_date + timedelta(1), "%Y-%m-%d")
    act_col = dataframe.subsession_start_date

    min_submission = _datetime.strftime(min_activity_date, date_format)
    max_submission_date = target_day_date + timedelta(future_days)
    max_submission = _datetime.strftime(max_submission_date, date_format)
    sub_col = dataframe.submission_date_s3

    filtered = filter_date_range(dataframe, act_col, min_activity, max_activity,
                                 sub_col, min_submission, max_submission)
    return count_distinct_clientids(filtered)
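# Worked example (hypothetical date): mau(dataframe, "20160315") counts
# distinct clients whose subsession_start_date falls between 2016-02-16 and
# 2016-03-15, looking only at pings submitted up to about 10 days after the
# target day (submission_date_s3 through roughly 20160325).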
# Identify all missing days, or days that have not yet passed
# the "still reporting in" threshold (as of 2016-03-17, that is
# defined as 10 days).
def update_engagement_csv(dataset, old_filename, new_filename,
                          cutoff_days=30, date_format="%Y%m%d"):
    cutoff_date = _datetime.utcnow() - timedelta(cutoff_days)
    cutoff = fmt(cutoff_date, date_format)
    print "Cutoff date: {}".format(cutoff)

    fields = ["day", "dau", "mau", "generated_on"]
    should_write_header = True
    potential_updates = {}

    # Carry over rows we won't touch
    if os.path.exists(old_filename):
        with open(old_filename) as csv_old:
            reader = csv.DictReader(csv_old)
            with open(new_filename, "w") as csv_new:
                writer = csv.DictWriter(csv_new, fields)
                writer.writeheader()
                should_write_header = False
                for row in reader:
                    if row['day'] < cutoff:
                        writer.writerow(row)
                    else:
                        potential_updates[row['day']] = row

    with open(new_filename, "a") as csv_new:
        writer = csv.DictWriter(csv_new, fields)
        if should_write_header:
            writer.writeheader()

        for i in range(cutoff_days, 0, -1):
            target_day = fmt(_datetime.utcnow() - timedelta(i), date_format)
            if target_day in potential_updates and not should_be_updated(potential_updates[target_day]):
                # It's fine as-is.
                writer.writerow(potential_updates[target_day])
            else:
                # Update it.
                print "We should update data for {}".format(target_day)
                record = {"day": target_day, "generated_on": fmt(_datetime.utcnow(), date_format)}
                print "Starting dau {} at {}".format(target_day, _datetime.utcnow())
                record["dau"] = dau(dataset, target_day)
                print "Finished dau {} at {}".format(target_day, _datetime.utcnow())
                print "Starting mau {} at {}".format(target_day, _datetime.utcnow())
                record["mau"] = mau(dataset, target_day)
                print "Finished mau {} at {}".format(target_day, _datetime.utcnow())
                writer.writerow(record)
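The two helpers imported from moztelemetry.standards are not reproduced in this notebook. A minimal sketch of what they do, assuming simple Spark DataFrame column filters (the real implementations in the python_moztelemetry repo may differ, e.g. in how the bounds are treated):

# Sketch only; see standards.py in python_moztelemetry for the real versions.
def filter_date_range(dataframe, activity_col, min_activity, max_activity,
                      submission_col, min_submission, max_submission):
    # Keep rows whose activity date and submission date fall inside the windows.
    return (dataframe
            .filter(activity_col >= min_activity)
            .filter(activity_col < max_activity)
            .filter(submission_col >= min_submission)
            .filter(submission_col <= max_submission))

def count_distinct_clientids(dataframe):
    # Count unique clients among the filtered rows.
    return dataframe.select('clientId').distinct().count()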
In [5]:
from boto3.s3.transfer import S3Transfer
data_bucket = "net-mozaws-prod-us-west-2-pipeline-analysis"
engagement_basename = "engagement_ratio.csv"
new_engagement_basename = "engagement_ratio.{}.csv".format(_datetime.strftime(_datetime.utcnow(), "%Y%m%d"))
s3path = "mreid/maudau"
engagement_key = "{}/{}".format(s3path, engagement_basename)
client = boto3.client('s3', 'us-west-2')
transfer = S3Transfer(client)
try:
    transfer.download_file(data_bucket, engagement_key, engagement_basename)
except botocore.exceptions.ClientError as e:
    # If the file wasn't there, that's ok. Otherwise, abort!
    if e.response['Error']['Code'] != "404":
        raise e
    else:
        print "Did not find an existing file at '{}'".format(engagement_key)
In [6]:
# Restrict the dataset to the columns we need, renaming client_id to
# clientId to match what count_distinct_clientids expects.
dataset = dataset.select(dataset.client_id.alias('clientId'), 'subsession_start_date', 'submission_date_s3')
In [7]:
update_engagement_csv(dataset, engagement_basename, new_engagement_basename)
Now we have an updated dataset on the local filesystem.
Since it is so tiny, we keep a date-stamped backup of each dataset in addition to the "latest" file.
Upload the updated file back to S3, and also push it to the S3 bucket that automatically relays to the dashboard server. This final upload appears in the Firefox Dashboard data directory as engagement_ratio.csv.
In [8]:
## Upload the updated csv file to S3
# Update the day-specific file:
new_s3_name = "{}/{}".format(s3path, new_engagement_basename)
transfer.upload_file(new_engagement_basename, data_bucket, new_s3_name)
# Update the "main" file
transfer.upload_file(new_engagement_basename, data_bucket, engagement_key)
# Update the dashboard file
dash_bucket = "net-mozaws-prod-metrics-data"
dash_s3_name = "firefox-dashboard/{}".format(engagement_basename)
transfer.upload_file(new_engagement_basename, dash_bucket, dash_s3_name,
                     extra_args={'ACL': 'bucket-owner-full-control'})
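As an optional sanity check (not part of the original flow), one could confirm that the dashboard copy landed, for example with the same boto3 client:

# Hypothetical check: confirm the dashboard copy exists and was just written.
response = client.head_object(Bucket=dash_bucket, Key=dash_s3_name)
print "Dashboard file last modified: {}".format(response['LastModified'])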