In [ ]:
import datetime as dt
import urllib2
import ujson as json
from os import environ
%pylab inline
Get the time when this job was started (for debugging purposes).
In [ ]:
starttime = dt.datetime.now()
starttime
Declare the channel to look at.
In [ ]:
channel_to_process = "release"
In [ ]:
sc.defaultParallelism
In [ ]:
today_env_str = environ.get("date", None)
# Uncomment the next two lines and adjust |today| as necessary to run manually
#today = dt.date.today()
#today_env_str = today.strftime("%Y%m%d")
# Comment out the next two lines to run manually
assert (today_env_str is not None), "The date environment parameter is missing."
today = dt.datetime.strptime(today_env_str, "%Y%m%d").date()
# Find the date of last Wednesday to get the proper 7 day range
last_wednesday = today
current_weekday = today.weekday()
if current_weekday < 2:
    last_wednesday -= dt.timedelta(days=5) + dt.timedelta(days=current_weekday)
if current_weekday > 2:
    last_wednesday -= dt.timedelta(days=current_weekday) - dt.timedelta(days=2)
min_range = last_wednesday - dt.timedelta(days=17)
report_date_str = last_wednesday.strftime("%Y%m%d")
min_range_str = min_range.strftime("%Y%m%d")
[last_wednesday, min_range_str, report_date_str]
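As a quick sanity check of the weekday arithmetic above (a hypothetical verification cell, not part of the job itself), the same logic can be wrapped in a helper and exercised against dates whose weekday is known:
In [ ]:
# Hypothetical sanity check of the last-Wednesday arithmetic, using the same
# logic as above. Not part of the job itself.
def last_wednesday_for(day):
    wednesday = day
    weekday = day.weekday()
    if weekday < 2:
        wednesday -= dt.timedelta(days=5) + dt.timedelta(days=weekday)
    if weekday > 2:
        wednesday -= dt.timedelta(days=weekday) - dt.timedelta(days=2)
    return wednesday

assert last_wednesday_for(dt.date(2016, 5, 9)) == dt.date(2016, 5, 4)    # Monday -> previous week's Wednesday
assert last_wednesday_for(dt.date(2016, 5, 11)) == dt.date(2016, 5, 11)  # Wednesday -> the same day
assert last_wednesday_for(dt.date(2016, 5, 13)) == dt.date(2016, 5, 11)  # Friday -> the same week's Wednesday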
The longitudinal dataset can be accessed as a Spark DataFrame, which is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python.
In [ ]:
sql_str = "SELECT * FROM longitudinal_v" + today_env_str
frame = sqlContext.sql(sql_str)
sql_str
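Optionally, the frame's schema can be inspected to confirm that the columns referenced below are present in this version of the dataset (printSchema() is a standard Spark DataFrame method; its output here is large):
In [ ]:
# Optional inspection: confirm that normalized_channel, subsession_start_date,
# build.version, settings.update.enabled, etc. exist in this dataset version.
frame.printSchema()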
Restrict the dataframe to the desired channel.
In [ ]:
channel_subset = frame.filter(frame.normalized_channel == channel_to_process)
Restrict the dataframe to the columns needed for this analysis.
In [ ]:
data_subset = channel_subset.select("subsession_start_date",
                                    "subsession_length",
                                    "update_check_code_notify",
                                    "update_check_no_update_notify",
                                    "build.version",
                                    "settings.update.enabled")
%time data_subset.count()
Restrict the data to the desired 7-day range by dropping pings whose first subsession entry started before min_range (at least 17 days before the creation date of the longitudinal dataset).
In [ ]:
def start_date_filter(d):
    try:
        date = dt.datetime.strptime(d.subsession_start_date[0][:10], "%Y-%m-%d").date()
        return min_range <= date
    except (ValueError, TypeError):
        return False
date_filtered = data_subset.rdd.filter(start_date_filter).cache()
%time date_filtered.count()
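As a hypothetical spot check of the filter (the real inputs are Spark Row objects; a namedtuple stands in for the one field the filter touches):
In [ ]:
# Hypothetical spot check of start_date_filter with stand-in rows; attribute
# access works the same way on real Spark Row objects.
from collections import namedtuple
FakeDateRow = namedtuple("FakeDateRow", ["subsession_start_date"])

recent = last_wednesday.strftime("%Y-%m-%dT00:00:00.0-07:00")
too_old = (min_range - dt.timedelta(days=1)).strftime("%Y-%m-%dT00:00:00.0-07:00")
assert start_date_filter(FakeDateRow([recent]))            # in range
assert not start_date_filter(FakeDateRow([too_old]))       # before min_range
assert not start_date_filter(FakeDateRow(["not-a-date"]))  # ValueError swallowed
assert not start_date_filter(FakeDateRow([None]))          # TypeError swallowed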
Analyze the data to determine the number of users on a current version of Firefox vs. a version that's out of date. A "user on a current version" is defined as someone running either the version found in the version.txt file on hg.mozilla.org, or one of the two versions just prior to it. Versions prior to Firefox 42 are ignored, since unified telemetry was not turned on by default in earlier versions.
In [ ]:
latest_version = urllib2.urlopen("http://hg.mozilla.org/releases/mozilla-" + channel_to_process + "/raw-file/tip/browser/config/version.txt").read()
latest_version = int(latest_version.split(".")[0])
def status_mapper(d):
    try:
        if d.version is None or d.version[0] is None:
            return ("none-version", d)
        curr_version = int(d.version[0].split(".")[0])
        if curr_version < 42:
            return ("ignore-version-too-low", d)
        if curr_version < latest_version - 2:
            # Check whether the user ran this particular orphaned version of Firefox
            # for at least 2 hours within the last 12 weeks. An orphaned user is
            # running a version of Firefox that's at least 3 versions behind the
            # current version, which means an update has been available for at least
            # 12 weeks. The 2-hour threshold gives most systems a chance to perform
            # an update check, download the update, and restart Firefox after the
            # update has been downloaded.
            seconds = 0
            curr_version = d.version[0]
            index = 0
            twelve_weeks_ago = last_wednesday - dt.timedelta(weeks=12)
            while seconds < 7200 and index < len(d.version) and d.version[index] == curr_version:
                try:
                    date = dt.datetime.strptime(d.subsession_start_date[index][:10], "%Y-%m-%d").date()
                    if date < twelve_weeks_ago:
                        return ("out-of-date-not-run-long-enough", d)
                    seconds += d.subsession_length[index]
                    index += 1
                except (ValueError, TypeError):
                    index += 1
            if seconds >= 7200:
                return ("out-of-date", d)
            return ("out-of-date-not-run-long-enough", d)
        return ("up-to-date", d)
    except ValueError:
        return ("value-error", d)
statuses = date_filtered.map(status_mapper).cache()
up_to_date_results = statuses.countByKey()
up_to_date_json_results = json.dumps(up_to_date_results, ensure_ascii=False)
up_to_date_json_results
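A few hypothetical spot checks of status_mapper, again with stand-in rows carrying only the fields the mapper reads (version strings are built relative to whatever version.txt reported above; this assumes latest_version is at least 45 so the orphaned version is not below 42):
In [ ]:
# Hypothetical spot checks of status_mapper with stand-in rows.
from collections import namedtuple
FakeVersionRow = namedtuple("FakeVersionRow",
                            ["version", "subsession_start_date", "subsession_length"])

recent = last_wednesday.strftime("%Y-%m-%dT00:00:00.0-07:00")
latest = str(latest_version) + ".0"
orphan = str(latest_version - 3) + ".0"  # assumes latest_version >= 45
# On the latest version: up-to-date.
assert status_mapper(FakeVersionRow([latest], [recent], [3600]))[0] == "up-to-date"
# Pre-42 versions are excluded because unified telemetry was off by default.
assert status_mapper(FakeVersionRow(["41.0"], [recent], [3600]))[0] == "ignore-version-too-low"
# At least 3 versions behind with >= 2 hours of recent usage: orphaned.
assert status_mapper(FakeVersionRow([orphan, orphan], [recent, recent],
                                    [3600, 3600]))[0] == "out-of-date"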
For people who are out-of-date, determine how many of them have updates disabled:
In [ ]:
out_of_date_statuses = statuses.filter(lambda p: p[0] == "out-of-date")
def update_disabled_mapper(d):
    status, ping = d
    if ping is None or ping.enabled is None or ping.enabled[0] is None:
        return ("none-update-enabled", ping)
    if ping.enabled[0]:
        return ("update-enabled", ping)
    return ("update-disabled", ping)
update_enabled_disabled_statuses = out_of_date_statuses.map(update_disabled_mapper)
update_enabled_disabled_results = update_enabled_disabled_statuses.countByKey()
update_enabled_disabled_json_results = json.dumps(update_enabled_disabled_results, ensure_ascii=False)
update_enabled_disabled_json_results
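And a hypothetical spot check of the enabled/disabled split (the mapper keys on the first entry of the ping's settings.update.enabled history):
In [ ]:
# Hypothetical spot check of update_disabled_mapper with stand-in rows.
from collections import namedtuple
FakeEnabledRow = namedtuple("FakeEnabledRow", ["enabled"])

assert update_disabled_mapper(("out-of-date", FakeEnabledRow([True])))[0] == "update-enabled"
assert update_disabled_mapper(("out-of-date", FakeEnabledRow([False])))[0] == "update-disabled"
assert update_disabled_mapper(("out-of-date", FakeEnabledRow(None)))[0] == "none-update-enabled"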
Focus on orphaned users who have updates enabled.
In [ ]:
update_enabled_statuses = update_enabled_disabled_statuses.filter(lambda p: p[0] == "update-enabled").cache()
For people who are out-of-date and have updates enabled, determine the distribution across Firefox versions.
In [ ]:
def version_mapper(d):
    status, ping = d
    return (ping.version[0], ping)
orphaned_by_versions = update_enabled_statuses.map(version_mapper)
orphaned_by_versions_results = orphaned_by_versions.countByKey()
orphaned_by_versions_json_results = json.dumps(orphaned_by_versions_results, ensure_ascii=False)
orphaned_by_versions_json_results
For people who are out-of-date and have updates enabled, determine what the update check returns.
In [ ]:
def update_check_code_notify_mapper(d):
    status, ping = d
    if ping is None or ping.update_check_code_notify is None:
        return -1
    # Report the index of the first non-zero bucket, i.e. the first update check
    # code that was recorded.
    for check_code in ping.update_check_code_notify:
        for counter, value in enumerate(check_code):
            if value != 0:
                return counter
    if ping.update_check_no_update_notify is not None and ping.update_check_no_update_notify[0] > 0:
        return 0
    return -1
update_check_code_notify_statuses = update_enabled_statuses.map(update_check_code_notify_mapper)
update_check_code_notify_results = update_check_code_notify_statuses.countByValue()
update_check_code_notify_json_results = json.dumps(update_check_code_notify_results, ensure_ascii=False)
update_check_code_notify_json_results
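A hypothetical spot check of the check-code mapper: the value reported per ping is the index of the first non-zero bucket, 0 when only "no update available" results were recorded, or -1 when nothing was recorded:
In [ ]:
# Hypothetical spot checks of update_check_code_notify_mapper with stand-in rows.
from collections import namedtuple
FakeCheckRow = namedtuple("FakeCheckRow",
                          ["update_check_code_notify", "update_check_no_update_notify"])

# First non-zero bucket is at index 2 -> update check code 2.
assert update_check_code_notify_mapper(("out-of-date", FakeCheckRow([[0, 0, 3, 0]], None))) == 2
# All buckets zero, but "no update available" results were recorded -> 0.
assert update_check_code_notify_mapper(("out-of-date", FakeCheckRow([[0, 0]], [5]))) == 0
# No update check histogram recorded at all -> -1.
assert update_check_code_notify_mapper(("out-of-date", FakeCheckRow(None, None))) == -1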
Write results to JSON.
In [ ]:
latest_version_object = {"latest-version": latest_version}
up_to_date_object = {"up-to-date": up_to_date_results}
update_enabled_disabled_object = {"update-enabled-disabled": update_enabled_disabled_results}
update_check_code_notify_object = {"update-check-code-notify": update_check_code_notify_results}
orphaned_by_versions_object = {"orphaned-by-versions": orphaned_by_versions_results}
final_results = [up_to_date_object, update_enabled_disabled_object, update_check_code_notify_object, latest_version_object, orphaned_by_versions_object]
final_results_json = json.dumps(final_results, ensure_ascii=False)
final_results_json
Finally, store the output in the local directory to be uploaded automatically once the job completes. The file will be stored at:
https://analysis-output.telemetry.mozilla.org/SPARKJOBNAME/data/FILENAME
In [ ]:
filename = "./output/" + report_date_str + ".json"
with open(filename, 'w') as f:
    f.write(final_results_json)
filename
Get the time when this job ended (for debugging purposes):
In [ ]:
endtime = dt.datetime.now()
endtime
In [ ]:
difference = endtime - starttime
difference
In [ ]: