Overall Firefox Engagement Ratio

Compute the Engagement Ratio for the overall Firefox population as described in Bug 1240849. The resulting data is shown on the Firefox Dashboard, and the more granular MAU and DAU values can be viewed via the Diagnostic Data Viewer.

The actual Daily Active Users (DAU) and Monthly Active Users (MAU) computations are defined in standards.py in the python_moztelemetry repo.


In [1]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from datetime import datetime as _datetime, timedelta, date
import boto3
import botocore
import csv
import os.path

bucket = "telemetry-parquet"
prefix = "main_summary/v3"
%time dataset = sqlContext.read.load("s3://{}/{}".format(bucket, prefix), "parquet")


CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 21.9 s

How many cores are we running on?


In [2]:
sc.defaultParallelism


Out[2]:
640

And what do the underlying records look like?


In [3]:
dataset.printSchema()


root
 |-- document_id: string (nullable = false)
 |-- client_id: string (nullable = true)
 |-- sample_id: integer (nullable = true)
 |-- channel: string (nullable = true)
 |-- normalized_channel: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- os: string (nullable = true)
 |-- os_version: string (nullable = true)
 |-- os_service_pack_major: string (nullable = true)
 |-- os_service_pack_minor: string (nullable = true)
 |-- profile_creation_date: integer (nullable = true)
 |-- subsession_start_date: string (nullable = true)
 |-- subsession_length: integer (nullable = true)
 |-- distribution_id: string (nullable = true)
 |-- submission_date: string (nullable = false)
 |-- sync_configured: boolean (nullable = true)
 |-- sync_count_desktop: integer (nullable = true)
 |-- sync_count_mobile: integer (nullable = true)
 |-- app_build_id: string (nullable = true)
 |-- app_display_version: string (nullable = true)
 |-- app_name: string (nullable = true)
 |-- app_version: string (nullable = true)
 |-- timestamp: long (nullable = false)
 |-- env_build_id: string (nullable = true)
 |-- env_build_version: string (nullable = true)
 |-- env_build_arch: string (nullable = true)
 |-- e10s_enabled: boolean (nullable = true)
 |-- e10s_cohort: string (nullable = true)
 |-- locale: string (nullable = true)
 |-- active_experiment_id: string (nullable = true)
 |-- active_experiment_branch: string (nullable = true)
 |-- reason: string (nullable = true)
 |-- timezone_offset: integer (nullable = true)
 |-- plugin_hangs: integer (nullable = true)
 |-- aborts_plugin: integer (nullable = true)
 |-- aborts_content: integer (nullable = true)
 |-- aborts_gmplugin: integer (nullable = true)
 |-- crashes_detected_plugin: integer (nullable = true)
 |-- crashes_detected_content: integer (nullable = true)
 |-- crashes_detected_gmplugin: integer (nullable = true)
 |-- crash_submit_attempt_main: integer (nullable = true)
 |-- crash_submit_attempt_content: integer (nullable = true)
 |-- crash_submit_attempt_plugin: integer (nullable = true)
 |-- crash_submit_success_main: integer (nullable = true)
 |-- crash_submit_success_content: integer (nullable = true)
 |-- crash_submit_success_plugin: integer (nullable = true)
 |-- active_addons_count: integer (nullable = true)
 |-- flash_version: string (nullable = true)
 |-- vendor: string (nullable = true)
 |-- is_default_browser: boolean (nullable = true)
 |-- default_search_engine_data_name: string (nullable = true)
 |-- loop_activity_open_panel: integer (nullable = true)
 |-- loop_activity_open_conversation: integer (nullable = true)
 |-- loop_activity_room_open: integer (nullable = true)
 |-- loop_activity_room_share: integer (nullable = true)
 |-- loop_activity_room_delete: integer (nullable = true)
 |-- devtools_toolbox_opened_count: integer (nullable = true)
 |-- search_counts: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- engine: string (nullable = false)
 |    |    |-- source: string (nullable = false)
 |    |    |-- count: long (nullable = false)
 |-- submission_date_s3: string (nullable = true)

We want to incrementally update the data, re-computing any values that are missing or for which data is still arriving. Define that logic here.


In [4]:
def fmt(the_date, date_format="%Y%m%d"):
    return _datetime.strftime(the_date, date_format)

# Our calculations look for activity date reported within
# a certain time window. If that window has passed, we do
# not need to re-compute data for that period.
def should_be_updated(record,
        target_col="day",
        generated_col="generated_on",
        date_format="%Y%m%d"):
    target = _datetime.strptime(record[target_col], date_format)
    generated = _datetime.strptime(record[generated_col], date_format)
    
    # Don't regenerate data that was already updated today.
    today = fmt(_datetime.utcnow(), date_format)
    if record[generated_col] >= today:
        return False
    
    diff = generated - target
    return diff.days <= 10


from moztelemetry.standards import filter_date_range, count_distinct_clientids

# Similar to the version in standards.py, but uses subsession_start_date
# instead of activityTimestamp
def dau(dataframe, target_day, future_days=10, date_format="%Y%m%d"):
    """Compute Daily Active Users (DAU) from the Executive Summary dataset.
    See https://bugzilla.mozilla.org/show_bug.cgi?id=1240849
    """
    target_day_date = _datetime.strptime(target_day, date_format)
    min_activity = _datetime.strftime(target_day_date, "%Y-%m-%d")
    max_activity = _datetime.strftime(target_day_date + timedelta(1), "%Y-%m-%d")
    act_col = dataframe.subsession_start_date

    min_submission = target_day
    max_submission_date = target_day_date + timedelta(future_days)
    max_submission = _datetime.strftime(max_submission_date, date_format)
    sub_col = dataframe.submission_date_s3

    filtered = filter_date_range(dataframe, act_col, min_activity, max_activity,
        sub_col, min_submission, max_submission)
    return count_distinct_clientids(filtered)

# Similar to the version in standards.py, but uses subsession_start_date
# instead of activityTimestamp
def mau(dataframe, target_day, past_days=28, future_days=10, date_format="%Y%m%d"):
    """Compute Monthly Active Users (MAU) from the Executive Summary dataset.
    See https://bugzilla.mozilla.org/show_bug.cgi?id=1240849
    """
    target_day_date = _datetime.strptime(target_day, date_format)

    # Compute activity over `past_days` days leading up to target_day
    min_activity_date = target_day_date - timedelta(past_days)
    min_activity = _datetime.strftime(min_activity_date, "%Y-%m-%d")
    max_activity = _datetime.strftime(target_day_date + timedelta(1), "%Y-%m-%d")
    act_col = dataframe.subsession_start_date

    min_submission = _datetime.strftime(min_activity_date, date_format)
    max_submission_date = target_day_date + timedelta(future_days)
    max_submission = _datetime.strftime(max_submission_date, date_format)
    sub_col = dataframe.submission_date_s3

    filtered = filter_date_range(dataframe, act_col, min_activity, max_activity,
        sub_col, min_submission, max_submission)
    return count_distinct_clientids(filtered)

# Identify all missing days, or days that have not yet passed
# the "still reporting in" threshold (as of 2016-03-17, that is
# defined as 10 days).
def update_engagement_csv(dataset, old_filename, new_filename, 
                          cutoff_days=30, date_format="%Y%m%d"):
    cutoff_date = _datetime.utcnow() - timedelta(cutoff_days)
    cutoff = fmt(cutoff_date, date_format)
    print "Cutoff date: {}".format(cutoff)

    fields = ["day", "dau", "mau", "generated_on"]

    should_write_header = True
    potential_updates = {}
    # Carry over rows we won't touch
    if os.path.exists(old_filename):
        with open(old_filename) as csv_old:
            reader = csv.DictReader(csv_old)
            with open(new_filename, "w") as csv_new:
                writer = csv.DictWriter(csv_new, fields)
                writer.writeheader()
                should_write_header = False
                for row in reader:
                    if row['day'] < cutoff:
                        writer.writerow(row)
                    else:
                        potential_updates[row['day']] = row

    with open(new_filename, "a") as csv_new:
        writer = csv.DictWriter(csv_new, fields)
        if should_write_header:
            writer.writeheader()

        for i in range(cutoff_days, 0, -1):
            target_day = fmt(_datetime.utcnow() - timedelta(i), date_format)
            if target_day in potential_updates and not should_be_updated(potential_updates[target_day]):
                # It's fine as-is.
                writer.writerow(potential_updates[target_day])
            else:
                # Update it.
                print "We should update data for {}".format(target_day)
                record = {"day": target_day, "generated_on": fmt(_datetime.utcnow(), date_format)}
                print "Starting dau {} at {}".format(target_day, _datetime.utcnow())
                record["dau"] = dau(dataset, target_day)
                print "Finished dau {} at {}".format(target_day, _datetime.utcnow())
                print "Starting mau {} at {}".format(target_day, _datetime.utcnow())
                record["mau"] = mau(dataset, target_day)
                print "Finished mau {} at {}".format(target_day, _datetime.utcnow())
                writer.writerow(record)


Unable to parse whitelist (/home/hadoop/anaconda2/lib/python2.7/site-packages/moztelemetry/histogram-whitelists.json). Assuming all histograms are acceptable.

Fetch existing data from S3

Attempt to fetch an existing data file from S3. If found, update it incrementally. Otherwise, re-compute the entire dataset.


In [5]:
from boto3.s3.transfer import S3Transfer
data_bucket = "net-mozaws-prod-us-west-2-pipeline-analysis"
engagement_basename = "engagement_ratio.csv"
new_engagement_basename = "engagement_ratio.{}.csv".format(_datetime.strftime(_datetime.utcnow(), "%Y%m%d"))
s3path = "mreid/maudau"
engagement_key = "{}/{}".format(s3path, engagement_basename)

client = boto3.client('s3', 'us-west-2')
transfer = S3Transfer(client)

try:
    transfer.download_file(data_bucket, engagement_key, engagement_basename)
except botocore.exceptions.ClientError as e:
    # If the file wasn't there, that's ok. Otherwise, abort!
    if e.response['Error']['Code'] != "404":
        raise e
    else:
        print "Did not find an existing file at '{}'".format(engagement_key)

In [6]:
# reorganize dataset
dataset = dataset.select(dataset.client_id.alias('clientId'), 'subsession_start_date', 'submission_date_s3')

In [7]:
update_engagement_csv(dataset, engagement_basename, new_engagement_basename)


Cutoff date: 20160411

Update data on S3

Now we have an updated dataset on the local filesystem.

Since it is so tiny, we keep a date-stamped backup of each dataset in addition to the "latest" file.

Upload the updated file back to S3, as well as relaying it to the S3 bucket that automatically relays to the dashboard server. This final upload appears in the Firefox Dashboard data dir as engagement_ratio.csv.


In [8]:
## Upload the updated csv file to S3

# Update the day-specific file:
new_s3_name = "{}/{}".format(s3path, new_engagement_basename)
transfer.upload_file(new_engagement_basename, data_bucket, new_s3_name)

# Update the "main" file
transfer.upload_file(new_engagement_basename, data_bucket, engagement_key)

# Update the dashboard file
dash_bucket = "net-mozaws-prod-metrics-data"
dash_s3_name = "firefox-dashboard/{}".format(engagement_basename)
transfer.upload_file(new_engagement_basename, dash_bucket, dash_s3_name,
                     extra_args={'ACL': 'bucket-owner-full-control'})