In [2]:
import pyspark.sql.functions as F
import datetime as dt
import ast
import boto3
import json
import re

In [3]:
DATA_LOCATION = "s3://net-mozaws-prod-us-west-2-pipeline-analysis/taar-api-logs-daily/"

In [4]:
# Parse the TAAR application logs from s3 source.
taar_logs = sqlContext\
    .read.format("com.databricks.spark.csv")\
    .option("header", "true")\
    .option("inferschema", "true")\
    .option("mode", "DROPMALFORMED")\
    .load(DATA_LOCATION)

In [5]:
# Display log file schema.
print(taar_logs.schema)
# Display one exampel row of log data.
print("\n" + str(taar_logs.take(1)))


StructType(List(StructField(timestamp,StringType,true),StructField(severity,IntegerType,true),StructField(type,StringType,true),StructField(fields,StringType,true),StructField(date,StringType,true)))

[Row(timestamp=u'2018-03-30 00:00:23.000', severity=6, type=u'taar.recommenders.ensemble_recommender', fields=u"{message=client_id: [00000000-0000-0000-0000-000000000000], ensemble_weight: [{'similarity': 0.09216174, 'collaborative': 2.16759527, 'legacy': 0.05516607, 'locale': 2.09866473}], guids: [['uBlock0@raymondhill.net', '{73a6fe31-595d-460b-a920-fcc0f8843232}', 'firefox@ghostery.com', 'firefoxdav@icloud.com', 'ich@maltegoetz.de', 'idsafe@norton.com', 'nortonsafeweb@symantec.com', '{d04b0b40-3dab-4f0b-97a6-04ec3eddbfb0}', 'artur.dubovoy@gmail.com', '{a0d7ccb3-214d-498b-b4aa-0e8fda9a7bf7}']], recommender=null, client_id=null, lang=null, limit=null, num_recommendations=null, maximum_similarity=null}", date=u'2018-03-29')]

In [6]:
# Convert text timestamp to actual timestamp object.
time_format = "yyyy-MM-dd HH:mm:ss.SSS"
taar_logs_timestamps = taar_logs.withColumn("parsed_time", F.to_timestamp("timestamp", time_format)
    .cast("double")
    .cast("timestamp")).drop("timestamp")

print(taar_logs_timestamps.schema)
print("\n")
print(taar_logs_timestamps.take(1))


StructType(List(StructField(severity,IntegerType,true),StructField(type,StringType,true),StructField(fields,StringType,true),StructField(date,StringType,true),StructField(parsed_time,TimestampType,true)))


[Row(severity=6, type=u'taar.recommenders.ensemble_recommender', fields=u"{message=client_id: [00000000-0000-0000-0000-000000000000], ensemble_weight: [{'similarity': 0.09216174, 'collaborative': 2.16759527, 'legacy': 0.05516607, 'locale': 2.09866473}], guids: [['uBlock0@raymondhill.net', '{73a6fe31-595d-460b-a920-fcc0f8843232}', 'firefox@ghostery.com', 'firefoxdav@icloud.com', 'ich@maltegoetz.de', 'idsafe@norton.com', 'nortonsafeweb@symantec.com', '{d04b0b40-3dab-4f0b-97a6-04ec3eddbfb0}', 'artur.dubovoy@gmail.com', '{a0d7ccb3-214d-498b-b4aa-0e8fda9a7bf7}']], recommender=null, client_id=null, lang=null, limit=null, num_recommendations=null, maximum_similarity=null}", date=u'2018-03-29', parsed_time=datetime.datetime(2018, 3, 30, 0, 0, 23))]

In [7]:
# Define a utility for writing results of this analysis to an accessible s3 bucket.
def write_to_s3(bucket_name, filename, data, aws_access_key_id=None, aws_secret_access_key=None):
    """ write list as CSV to s3
    params: bucket_name, str, name of bucket
    filename, str, name of file (prefix + file name)
    return: nothing
    """
    s3 = boto3.Session(aws_access_key_id=aws_access_key_id,
                       aws_secret_access_key=aws_secret_access_key).resource('s3')
    obj = s3.Object(bucket_name, filename)
    obj.put(Body=json.dumps(data, ensure_ascii=False).encode('utf8'))

In [8]:
def is_log_type_recommendation(r):
    return "taar.recommenders." in r["type"]
    
def is_log_type_ensemble(r):
    return "ensemble_recommender" in r["type"]

def valid_uuid_as_field(r):
  reg_comp = re.compile("[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}");
  return reg_comp.findall(r['fields'])

def manual_dedup(p):
  zes = "00000000-0000-0000-0000-000000000000"
  a = set()
  for c in p:
    if len(c) == 1:
      if c != zes:
        a |= set(c)
    else:
      for g in c:
        if g != zes:
          a |= set(g)
  uuid_list = list(a)
  return uuid_list

In [9]:
# Filter out log data from outside experiment time
# 2018-03-12 begin date
# 2018-04-18 end date
print("lines of log data for TAAR service: " + str(taar_logs_timestamps.count()))
taar_logs_time_filtered = taar_logs_timestamps.where((taar_logs_timestamps.parsed_time > dt.datetime(2018, 3, 12, 0, 0, 0)) & (taar_logs_timestamps.parsed_time < dt.datetime(2018, 4, 23, 0, 0, 0)))
print("lines of log data after date filtering to study period: " + str(taar_logs_time_filtered.count()))


lines of log data for TAAR service: 903766
lines of log data after date filtering to study period: 807734

In [10]:
# Find clients that had data retrieval failures
def is_dynamo_interaction(p):
  return 'taar.adapters.dynamo' in p["type"]

def is_client_data_fail(p):
  return "message=Error loading client data for" in p["fields"]
  
clients_with_lookup_fail = taar_logs_time_filtered.rdd\
  .filter(lambda p: is_dynamo_interaction(p))\
  .filter(lambda p: is_client_data_fail(p))\
  .map(lambda p: valid_uuid_as_field(p))

print("number of failed client lookups: " + str(clients_with_lookup_fail.count()))

unique_output_failed_lookup_clientIDs = clients_with_lookup_fail.toDF().distinct().collect()
print("post deduplication: " + str(len(unique_output_failed_lookup_clientIDs)))

# write the blacklist
write_to_s3("net-mozaws-prod-us-west-2-pipeline-analysis", "failed_dynamo_clients.csv", unique_output_failed_lookup_clientIDs)


number of failed client lookups: 24470
post deduplication: 21859

In [11]:
def is_linear_recomender(p):
  return 'taar.recommenders.recommendation_manager' in p["type"]

# Find clients successfully served by linear
client_ids_linear_serves = taar_logs_time_filtered.rdd\
  .filter(lambda p: not is_dynamo_interaction(p))\
  .filter(lambda p: not is_client_data_fail(p))\
  .filter(lambda p: is_linear_recomender(p))\
  .map(lambda p: valid_uuid_as_field(p))
  
print("number of linear taar service events: " + str(client_ids_linear_serves.count()))
unique_client_ids_linear_serves = client_ids_linear_serves.collect()

unique_client_ids_linear_serves = manual_dedup(unique_client_ids_linear_serves)
print("unique clients served by linear taar: " + str(len(unique_client_ids_linear_serves)))

write_to_s3("net-mozaws-prod-us-west-2-pipeline-analysis", "clients_served_linear.csv", unique_client_ids_linear_serves)


number of linear taar service events: 471583
unique clients served by linear taar: 175911

In [12]:
def is_ensemble_recommender(p):
  return 'recommenders.ensemble_recommender' in p["type"]

def valid_ensemble_uuid(p):
  reg_comp = re.compile("message=client_id: \\[")
  txt = reg_comp.split(p['fields'])
  return txt[1][0:36]
  
# find clients successfully served by ensemble
client_ids_ensemble_serves = taar_logs_time_filtered.rdd\
  .filter(lambda p: not is_dynamo_interaction(p))\
  .filter(lambda p: not is_client_data_fail(p))\
  .filter(lambda p: is_ensemble_recommender(p))\
  .map(lambda p: valid_ensemble_uuid(p))
    
print("number of ensemble taar service events: " + str(client_ids_ensemble_serves.count()))

unique_client_ids_ensemble_serves = list(set(client_ids_ensemble_serves.collect()))
print("unique clients served by ensemble taar: " + str(len(unique_client_ids_ensemble_serves)))

write_to_s3("net-mozaws-prod-us-west-2-pipeline-analysis", "clients_served_ensemble.csv", unique_client_ids_ensemble_serves)


number of ensemble taar service events: 287211
unique clients served by ensemble taar: 175321