In [1]:
from moztelemetry import Dataset
from pyspark.sql import Row
from pyspark.sql.types import BooleanType, LongType
import pandas as pd
import pyspark.sql.functions as F
import datetime as dt
sc.setLogLevel("INFO")
Define util funcs
In [2]:
def utc2date(seconds):
    """
    Takes Unix time in seconds and returns a YYYYMMDD string representation.
    """
    utc = dt.datetime(1970, 1, 1)
    try:
        return (utc + dt.timedelta(seconds=seconds)).strftime('%Y%m%d')
    except (TypeError, OverflowError):
        return None

def shield_data(x):
    """
    Grabs the data reported by the shield add-on.
    """
    return x.get("payload", {}).get("data", {}).get("attributes", {})

def _cast(col, f):
    """
    Casts a string column value with f, returning None for nulls or failed casts.
    """
    if col != 'null':
        try:
            return f(col)
        except (TypeError, ValueError):
            pass
    return None

_bool = lambda x: x == 'true'
castLong = F.udf(lambda x: _cast(x, long), LongType())
castBool = F.udf(lambda x: _cast(x, _bool), BooleanType())

def collapse_fields(x):
    """
    Collapses nested field names and returns a flattened object
    as a PySpark Row to prepare for DataFrame conversion.
    """
    if x is None:
        x = {}
    data = x.get("payload", {}).get("data", {}).get("attributes", {})
    addons = x.get("environment", {}).get("addons", {}).get("activeAddons", {})
    result = Row(
        client_id=x.get("clientId"),
        locale=x.get("environment", {}).get("settings", {}).get("locale"),
        branch=x.get("payload", {}).get("branch"),
        addon_id=data.get("addon_id"),
        clicked_button=data.get("clickedButton"),
        creation_date=x.get("creationDate"),
        ping_type=data.get("pingType"),
        saw_popup=data.get("sawPopup"),
        src=data.get("srcURI"),
        start_time_utc=data.get("startTime"),
        dwell_time=data.get("aboutAddonsActiveTabSeconds"),
        discopane_loaded=data.get("discoPaneLoaded"),
        submission_date_s3=x.get("meta", {}).get("submissionDate"),
        current_addons=[i for i in addons
                        if not addons[i].get('isSystem', True)
                        and not addons[i].get('foreignInstall', True)]
    )
    return result
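As a quick sanity check, the helpers can be exercised on a minimal hand-built ping; the values below are hypothetical and only illustrate the expected flattening and casting behavior.
In [ ]:
# optional sanity check on the helpers, using a minimal hand-built ping (hypothetical values)
sample_ping = {
    "clientId": "example-client",
    "creationDate": "2018-03-12T00:00:00.000Z",
    "meta": {"submissionDate": "20180312"},
    "payload": {
        "branch": "example-branch",
        "data": {"attributes": {"pingType": "init", "sawPopup": "false", "startTime": "1520812800"}},
    },
    "environment": {"settings": {"locale": "en-US"}, "addons": {"activeAddons": {}}},
}
print collapse_fields(sample_ping)                    # nested fields pulled up into a flat Row
print utc2date(1520812800)                            # '20180312'
print _cast("1520812800", long), _cast("null", long)  # 1520812800 None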
Define the study's start and end dates as strings
In [5]:
START_DATE_STR = "20180312"
END_DATE_STR = "20180423"
print("study start date: " + START_DATE_STR + "\n" + "study end date: " + END_DATE_STR)
Load raw pings from experiment
In [ ]:
# load all TAAR shield-study-addon pings submitted between the study start and end dates
taarv2_pings = (
    Dataset.from_source("telemetry")
    .where(docType="shield-study-addon")
    .where(submissionDate=lambda x: x >= START_DATE_STR and x <= END_DATE_STR)
    .records(sc)
    .filter(lambda x: x.get("payload", {}).get("study_name") == "TAARExperimentV2")
    .filter(lambda x: x.get("payload", {}).get("addon_version") == "1.0.13")
    .filter(lambda x: x.get("payload", {}).get("testing") == False)
)
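Before flattening, it can be useful to glance at the raw RDD; a sketch of one way to do that (note that counting forces a full pass over the pings):
In [ ]:
# optional: inspect the raw pings before flattening
print "n raw pings:", taarv2_pings.count()
print taarv2_pings.map(lambda x: x.get("payload", {}).get("branch")).countByValue()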
Convert pings to a structured Spark DataFrame
In [ ]:
# sampleRatio=0.001 infers the schema from a 0.1% sample of the rows
taarv2_DF = taarv2_pings.map(collapse_fields).toDF(sampleRatio=0.001)
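A minimal peek at a couple of flattened records, to confirm the Rows look as expected before casting:
In [ ]:
# optional: peek at a couple of flattened records
for r in taarv2_DF.take(2):
    print r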
Cast non-string columns to the appropriate type
In [10]:
bool_cols = [
'discopane_loaded',
'clicked_button',
'saw_popup',
]
long_cols = [
'start_time_utc',
'dwell_time',
]
for b in bool_cols:
    taarv2_DF = taarv2_DF.withColumn(b, castBool(b))

for l in long_cols:
    taarv2_DF = taarv2_DF.withColumn(l, castLong(l))
In [11]:
taarv2_DF.printSchema()
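Beyond the schema, a quick value-level check that the casts produced sensible values rather than nulling everything out; a sketch:
In [ ]:
# optional: spot-check the cast columns
taarv2_DF.groupBy('saw_popup', 'clicked_button').count().show()
taarv2_DF.select(F.min('dwell_time'), F.max('dwell_time')).show()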
Write to S3, partitioning by branch, since most subsequent queries will involve aggregating by this field
In [12]:
S3_PATH = 's3://net-mozaws-prod-us-west-2-pipeline-analysis/taarv2/'
(
taarv2_DF
.repartition(1)
.write
.partitionBy('branch')
.mode("overwrite")
.parquet(S3_PATH)
)
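Because the output is partitioned by branch, downstream readers can restrict a read to a single branch directory and benefit from partition pruning; a sketch, where 'control' is a hypothetical branch name:
In [ ]:
# optional: read a single branch partition ('control' is a hypothetical branch name)
control_df = sqlContext.read.parquet(S3_PATH).where(F.col('branch') == 'control')
print "n control records:", control_df.count()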
In [13]:
# verify
t = sqlContext.read.parquet(S3_PATH)
print "n records:", t.count()
print "n clients:", t.select('client_id').distinct().count()
sd = t.select(F.min("submission_date_s3"),
F.max('submission_date_s3'))
print sd.collect()