In [2]:
import pyspark.sql.functions as F
import datetime as dt
import ast
import boto3
import json
import re
In [3]:
DATA_LOCATION = "s3://net-mozaws-prod-us-west-2-pipeline-analysis/taar-api-logs-daily/"
In [4]:
# Parse the TAAR application logs from the s3 source.
taar_logs = sqlContext\
    .read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .option("mode", "DROPMALFORMED")\
    .load(DATA_LOCATION)
In [5]:
# Display log file schema.
print(taar_logs.schema)
# Display one example row of log data.
print("\n" + str(taar_logs.take(1)))
In [6]:
# Convert text timestamp to actual timestamp object.
time_format = "yyyy-MM-dd HH:mm:ss.SSS"
taar_logs_timestamps = taar_logs.withColumn(
    "parsed_time",
    F.to_timestamp("timestamp", time_format).cast("double").cast("timestamp"))\
    .drop("timestamp")
print(taar_logs_timestamps.schema)
print("\n")
print(taar_logs_timestamps.take(1))
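# Quick sanity check of the parse on a one-row frame (a minimal sketch; the
# sample timestamp below is made up, not taken from the real logs):
sample = sqlContext.createDataFrame([("2018-03-15 12:34:56.789",)], ["timestamp"])
sample.select(F.to_timestamp("timestamp", time_format).alias("parsed_time")).show(truncate=False)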
In [7]:
# Define a utility for writing results of this analysis to an accessible s3 bucket.
def write_to_s3(bucket_name, filename, data, aws_access_key_id=None, aws_secret_access_key=None):
    """ write a JSON-serializable object to s3
    params: bucket_name, str, name of bucket
            filename, str, name of file (prefix + file name)
            data, any JSON-serializable object
    return: nothing
    """
    # With no explicit keys, boto3 falls back to its default credential chain.
    s3 = boto3.Session(aws_access_key_id=aws_access_key_id,
                       aws_secret_access_key=aws_secret_access_key).resource('s3')
    obj = s3.Object(bucket_name, filename)
    obj.put(Body=json.dumps(data, ensure_ascii=False).encode('utf8'))
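# For completeness, reading such an object back is symmetric. This is a sketch,
# not part of the original analysis; it assumes the default credential chain:
def read_from_s3(bucket_name, filename):
    obj = boto3.Session().resource('s3').Object(bucket_name, filename)
    return json.loads(obj.get()['Body'].read().decode('utf8'))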
In [8]:
def is_log_type_recommendation(r):
    return "taar.recommenders." in r["type"]

def is_log_type_ensemble(r):
    return "ensemble_recommender" in r["type"]

def valid_uuid_as_field(r):
    # Extract every well-formed UUID appearing in the log line's `fields` column.
    reg_comp = re.compile("[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}")
    return reg_comp.findall(r['fields'])

def manual_dedup(p):
    # Flatten a collected list of UUID lists into one deduplicated list,
    # dropping the all-zeros placeholder client ID.
    zes = "00000000-0000-0000-0000-000000000000"
    a = set()
    for c in p:
        for g in c:
            if g != zes:
                a.add(g)
    uuid_list = list(a)
    return uuid_list
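# Quick local check of the extraction and dedup helpers; the log line below is
# a made-up example, not real TAAR output:
fake_row = {"fields": "message=Error loading client data for 12345678-abcd-4ef0-9876-0123456789ab"}
print(valid_uuid_as_field(fake_row))
# -> ['12345678-abcd-4ef0-9876-0123456789ab']
print(manual_dedup([valid_uuid_as_field(fake_row),
                    ["00000000-0000-0000-0000-000000000000"]]))
# -> ['12345678-abcd-4ef0-9876-0123456789ab']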
In [9]:
# Filter out log data from outside the experiment window:
# 2018-03-12 begin date
# 2018-04-23 end date (matching the filter bound below)
print("lines of log data for TAAR service: " + str(taar_logs_timestamps.count()))
taar_logs_time_filtered = taar_logs_timestamps.where(
    (taar_logs_timestamps.parsed_time > dt.datetime(2018, 3, 12, 0, 0, 0)) &
    (taar_logs_timestamps.parsed_time < dt.datetime(2018, 4, 23, 0, 0, 0)))
print("lines of log data after date filtering to study period: " + str(taar_logs_time_filtered.count()))
In [10]:
# Find clients that had data retrieval failures.
def is_dynamo_interaction(p):
    return 'taar.adapters.dynamo' in p["type"]

def is_client_data_fail(p):
    return "message=Error loading client data for" in p["fields"]

clients_with_lookup_fail = taar_logs_time_filtered.rdd\
    .filter(lambda p: is_dynamo_interaction(p))\
    .filter(lambda p: is_client_data_fail(p))\
    .map(lambda p: valid_uuid_as_field(p))
print("number of failed client lookups: " + str(clients_with_lookup_fail.count()))
unique_output_failed_lookup_clientIDs = clients_with_lookup_fail.toDF().distinct().collect()
print("post deduplication: " + str(len(unique_output_failed_lookup_clientIDs)))
# Write the blacklist of clients whose lookups failed.
write_to_s3("net-mozaws-prod-us-west-2-pipeline-analysis", "failed_dynamo_clients.csv", unique_output_failed_lookup_clientIDs)
In [11]:
def is_linear_recommender(p):
    return 'taar.recommenders.recommendation_manager' in p["type"]

# Find clients successfully served by the linear recommender.
client_ids_linear_serves = taar_logs_time_filtered.rdd\
    .filter(lambda p: not is_dynamo_interaction(p))\
    .filter(lambda p: not is_client_data_fail(p))\
    .filter(lambda p: is_linear_recommender(p))\
    .map(lambda p: valid_uuid_as_field(p))
print("number of linear taar service events: " + str(client_ids_linear_serves.count()))
unique_client_ids_linear_serves = client_ids_linear_serves.collect()
unique_client_ids_linear_serves = manual_dedup(unique_client_ids_linear_serves)
print("unique clients served by linear taar: " + str(len(unique_client_ids_linear_serves)))
write_to_s3("net-mozaws-prod-us-west-2-pipeline-analysis", "clients_served_linear.csv", unique_client_ids_linear_serves)
In [12]:
def is_ensemble_recommender(p):
    return 'recommenders.ensemble_recommender' in p["type"]

def valid_ensemble_uuid(p):
    # The client ID follows "message=client_id: [" in the log line;
    # a canonical UUID is exactly 36 characters long.
    reg_comp = re.compile(r"message=client_id: \[")
    txt = reg_comp.split(p['fields'])
    return txt[1][0:36]

# Find clients successfully served by the ensemble recommender.
client_ids_ensemble_serves = taar_logs_time_filtered.rdd\
    .filter(lambda p: not is_dynamo_interaction(p))\
    .filter(lambda p: not is_client_data_fail(p))\
    .filter(lambda p: is_ensemble_recommender(p))\
    .map(lambda p: valid_ensemble_uuid(p))
print("number of ensemble taar service events: " + str(client_ids_ensemble_serves.count()))
unique_client_ids_ensemble_serves = list(set(client_ids_ensemble_serves.collect()))
print("unique clients served by ensemble taar: " + str(len(unique_client_ids_ensemble_serves)))
write_to_s3("net-mozaws-prod-us-west-2-pipeline-analysis", "clients_served_ensemble.csv", unique_client_ids_ensemble_serves)