In [1]:
from moztelemetry import Dataset
from pyspark.sql import Row
from pyspark.sql.types import BooleanType, LongType
import pandas as pd
import pyspark.sql.functions as F
import datetime as dt

sc.setLogLevel("INFO")

Define utility functions


In [2]:
def utc2date(seconds):
    """
    Takes Unix time in seconds and returns a YYYYMMDD string
    """
    utc = dt.datetime(1970, 1, 1)
    try:
        return (utc + dt.timedelta(seconds=seconds)).strftime('%Y%m%d')
    except (TypeError, OverflowError):
        return None

    
def shield_data(x):
    """
    Grabs the data reported by the shield add-on 
    """
    return x.get("payload", {}).get("data", {}).get("attributes", {})


def _cast(col, f):
    """
    Applies f to col, returning None for 'null' or uncastable values
    """
    if col != 'null':
        try:
            return f(col)
        except (TypeError, ValueError):
            pass
    return None

_bool = lambda x: x == 'true'

castLong = F.udf(lambda x: _cast(x, long), LongType())
castBool = F.udf(lambda x: _cast(x, _bool), BooleanType())
    

def collapse_fields(x):
    """
    Collapses nested field names
    and returns a flattened object as a
    PySpark Row to prepare for DataFrame
    conversion
    """
    if x is None:
        x = {}
    data = shield_data(x)
    addons = x.get("environment", {}).get("addons", {}).get("activeAddons", {})
    result = Row(
        client_id=x.get("clientId"),
        locale=x.get("environment", {}).get("settings", {}).get("locale"),
        branch=x.get("payload", {}).get("branch"),
        addon_id=data.get("addon_id"),
        clicked_button=data.get("clickedButton"),
        creation_date=x.get("creationDate"),
        ping_type=data.get("pingType"),
        saw_popup=data.get("sawPopup"),
        src=data.get("srcURI"),
        start_time_utc=data.get("startTime"),
        dwell_time=data.get("aboutAddonsActiveTabSeconds"),
        discopane_loaded=data.get("discoPaneLoaded"),
        submission_date_s3=x.get("meta", {}).get("submissionDate"),
        # keep only user-installed add-on IDs (drop system and foreign installs)
        current_addons=[i for i in addons if
                        not addons[i].get('isSystem', True) and
                        not addons[i].get('foreignInstall', True)]
        )
    return result
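
As a quick check, the cell below runs collapse_fields on a hypothetical, hand-built ping; the field names follow the lookups above, but every value (client id, branch name, add-on ids, etc.) is made up purely for illustration.


In [ ]:
# hypothetical ping used only to illustrate collapse_fields; values are invented
sample_ping = {
    "clientId": "hypothetical-client-id",
    "creationDate": "2018-03-14T00:00:00.000Z",
    "meta": {"submissionDate": "20180314"},
    "environment": {
        "settings": {"locale": "en-US"},
        "addons": {"activeAddons": {
            "some-user-addon@example.com": {"isSystem": False, "foreignInstall": False},
            "some-system-addon@example.com": {"isSystem": True, "foreignInstall": False},
        }},
    },
    "payload": {
        "branch": "hypothetical-branch",
        "data": {"attributes": {
            "addon_id": "some-recommended-addon@example.com",
            "pingType": "init",
            "sawPopup": "false",
            "clickedButton": "false",
            "discoPaneLoaded": "true",
            "srcURI": "about:addons",
            "startTime": "1521000000",
            "aboutAddonsActiveTabSeconds": "12",
        }},
    },
}

# only the non-system, non-foreign add-on should survive in current_addons
collapse_fields(sample_ping)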

Define study dates in string and unix format


In [5]:
START_DATE_STR = "20180312"
END_DATE_STR = "20180423"
print("study start date: " + START_DATE_STR + "\n" + "study end date: " + END_DATE_STR)


study start date: 20180312
study end date: 20180423
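
The Unix-format dates mentioned above are not defined in this cell; a minimal sketch of how they could be derived from the strings (the START_DATE_UNIX / END_DATE_UNIX names are hypothetical), which also exercises the utc2date helper:


In [ ]:
import calendar

# hypothetical names: convert the YYYYMMDD strings to Unix seconds (UTC)
START_DATE_UNIX = calendar.timegm(dt.datetime.strptime(START_DATE_STR, '%Y%m%d').timetuple())
END_DATE_UNIX = calendar.timegm(dt.datetime.strptime(END_DATE_STR, '%Y%m%d').timetuple())

# round-trip back to strings with utc2date defined earlier
print utc2date(START_DATE_UNIX), utc2date(END_DATE_UNIX)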

Load raw pings from experiment


In [ ]:
# load all TAAR shield-study-addon pings submitted between the study start and end dates
taarv2_pings = (
    Dataset.from_source("telemetry")
           .where(docType="shield-study-addon")
           .where(submissionDate=lambda x: x >= START_DATE_STR and x <= END_DATE_STR)
           .records(sc)
           .filter(lambda x: x.get("payload", {}).get("study_name") == "TAARExperimentV2")
           .filter(lambda x: x.get("payload", {}).get("addon_version") == "1.0.13")
           .filter(lambda x: x.get("payload", {}).get("testing") == False)
)
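
Before converting, it can help to peek at a single ping to confirm the filters and the shape of the shield attributes; this optional check was not part of the original run:


In [ ]:
# optional sanity check: inspect the shield attributes of one raw ping
sample = taarv2_pings.take(1)
if sample:
    print shield_data(sample[0])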

Convert pings to a structured Spark DataFrame


In [ ]:
# sampleRatio=0.001 infers the schema from a 0.1% sample of rows
taarv2_DF = taarv2_pings.map(collapse_fields).toDF(sampleRatio=0.001)
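
The boolean and numeric shield attributes arrive as strings (the casting UDFs defined earlier expect 'true'/'false' and numeric strings), so an optional quick look before casting:


In [ ]:
# optional: these columns are still strings until the casts below
taarv2_DF.select('clicked_button', 'saw_popup', 'dwell_time', 'start_time_utc').show(3)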

Cast non-string columns to the appropriate type


In [10]:
bool_cols = [
    'discopane_loaded',
    'clicked_button',
    'saw_popup', 
]

long_cols = [
    'start_time_utc',
    'dwell_time',
]

for b in bool_cols:
    taarv2_DF = taarv2_DF.withColumn(b, castBool(b))
    
for l in long_cols:
    taarv2_DF = taarv2_DF.withColumn(l, castLong(l))
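
A small illustration of the casting UDFs on a throwaway frame (hypothetical values), showing that 'null' and uncastable values become nulls:


In [ ]:
# toy frame with made-up values to illustrate castBool / castLong behaviour
toy = sqlContext.createDataFrame([Row(b='true', l='42'), Row(b='null', l='null')])
toy.select(castBool('b').alias('b'), castLong('l').alias('l')).show()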

In [11]:
taarv2_DF.printSchema()


root
 |-- addon_id: string (nullable = true)
 |-- branch: string (nullable = true)
 |-- clicked_button: boolean (nullable = true)
 |-- client_id: string (nullable = true)
 |-- creation_date: string (nullable = true)
 |-- current_addons: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- discopane_loaded: boolean (nullable = true)
 |-- dwell_time: long (nullable = true)
 |-- locale: string (nullable = true)
 |-- ping_type: string (nullable = true)
 |-- saw_popup: boolean (nullable = true)
 |-- src: string (nullable = true)
 |-- start_time_utc: long (nullable = true)
 |-- submission_date_s3: string (nullable = true)

Write to S3, partitioning by branch, since most subsequent queries will involve aggregating by this field


In [12]:
S3_PATH = 's3://net-mozaws-prod-us-west-2-pipeline-analysis/taarv2/'

(
    taarv2_DF
    .repartition(1)
    .write
    .partitionBy('branch')
    .mode("overwrite")
    .parquet(S3_PATH)
)

In [13]:
# verify
t = sqlContext.read.parquet(S3_PATH)

print "n records:", t.count()
print "n clients:", t.select('client_id').distinct().count()
sd = t.select(F.min("submission_date_s3"), 
              F.max('submission_date_s3'))
print sd.collect()


n records: 8762664
n clients: 3491762
[Row(min(submission_date_s3)=u'20180312', max(submission_date_s3)=u'20180417')]
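
Since downstream queries mostly aggregate by branch (the reason for partitioning above), an illustrative example of that access pattern against the written data:


In [ ]:
# illustrative: per-branch client counts from the partitioned parquet output
t.groupBy('branch').agg(F.countDistinct('client_id').alias('n_clients')).show()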
