In [ ]:
import json
import urllib2
import urllib
import math
import datetime as dt
from pyspark.sql.types import *
import boto3
In [ ]:
date_format = "%Y-%m-%d"
target_date = None
DEBUG = False
def safe_str(obj):
    """ Return the unicode string representation of obj """
    if obj is None:
        return unicode("")
    return unicode(obj)

def safe_long(obj):
    """ Return the long representation of obj, or None """
    if obj is None:
        return None
    return long(obj)

def safe_int(obj):
    """ Return the int representation of obj, or None """
    if obj is None:
        return None
    return int(obj)

def get_token_from_s3():
    """ Get the Socorro API token from data already saved to S3 """
    data_bucket = "net-mozaws-prod-us-west-2-pipeline-analysis"
    s3path = "ddurst/oom"
    filename = "oom.tkn"
    key = "{}/{}".format(s3path, filename)
    s3 = boto3.resource('s3')
    obj = s3.Object(data_bucket, key)
    try:
        token = obj.get()['Body'].read()
    except Exception:
        token = None
    return token
Because the crash-stats SuperSearch API paginates its results, we need to make a sequence of near-identical requests, stepping the result offset forward one page at a time.
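For example, with a page size of 500 the offsets work out as follows (a toy illustration of the paging arithmetic used below; the `total` value is made up):
In [ ]:
# Toy illustration of the paging arithmetic used below (not part of the pipeline).
# Pretend the first response reported total = 1234 matching crashes.
page_size = 500
total = 1234
pages = int(math.ceil(total / float(page_size)))        # 3 pages
offsets = [page * page_size for page in range(pages)]   # [0, 500, 1000]
print pages, offsets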
In [ ]:
def get_API_data(url, params, hdrs):
    """ Return the full API response, decoded from JSON """
    querystring = urllib.urlencode(params, doseq=True)
    full_url = url + '?' + querystring
    req = urllib2.Request(url=full_url, headers=hdrs)
    response = urllib2.urlopen(req)
    return json.loads(response.read())
In [ ]:
def write_to_s3(lists, start_date, debug):
    """ Build a DataFrame from the collected rows and write it to S3 as Parquet """
    s3_output = "s3://net-mozaws-prod-us-west-2-pipeline-analysis/"
    s3_output += "ddurst/crash_stats_oom/v1/submission=" + start_date
    schema = StructType([
        StructField("uuid", StringType(), False),
        StructField("date", TimestampType(), False),
        StructField("signature", StringType(), False),
        StructField("platform", StringType(), True),
        StructField("contains_memory_report", BooleanType(), True),
        StructField("oom_allocation_size", LongType(), True),
        StructField("system_memory_use_percentage", IntegerType(), True),
        StructField("total_virtual_memory", LongType(), True),
        StructField("available_virtual_memory", LongType(), True),
        StructField("total_page_file", LongType(), True),
        StructField("available_page_file", LongType(), True),
        StructField("total_physical_memory", LongType(), True),
        StructField("available_physical_memory", LongType(), True),
        StructField("largest_free_vm_block", StringType(), True),
        StructField("largest_free_vm_block_int", LongType(), True),
        StructField("tiny_block_size", LongType(), True),
        StructField("write_combine_size", LongType(), True),
        StructField("shutdown_progress", StringType(), True),
        StructField("ipc_channel_error", StringType(), True),
        StructField("user_comments", StringType(), True),
    ])
    grouped = sqlContext.createDataFrame(lists, schema)
    if debug:
        grouped.printSchema()
    else:
        grouped.coalesce(1).write.parquet(s3_output)
This is the main function: it sets up the request data and loops through the required number of result pages for the specified date (note: it's assumed this would be run once per previous day via cron or the like; a hypothetical backfill sketch follows the cell below).
In [ ]:
def get_crashstats_by_day(format, debug, specify_date=None):
    """ Get total crashstats data for a specified date.
        Will use the previous day if not specified. """
    if specify_date:
        start_date = specify_date
    else:
        start_date = (dt.datetime.now() -
                      dt.timedelta(days=1)).strftime(format)
    stop_date = (dt.datetime.strptime(start_date, format) +
                 dt.timedelta(days=1)).strftime(format)
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json; charset=UTF-8'
    }
    # Try to get the token, but if that fails proceed without it
    token = get_token_from_s3()
    if token:
        headers['Auth-Token'] = token
    target = 'https://crash-stats.mozilla.com/api/SuperSearch/'
    per_page_default = 500
    param_data = {}
    param_data['release_channel'] = "beta"
    param_data['process_type'] = "content"
    param_data['_results_number'] = str(per_page_default)
    param_data['date'] = [">=" + start_date, "<" + stop_date]
    param_data['_columns'] = ["date",
                              "uuid",
                              "signature",
                              "oom_allocation_size",
                              "platform",
                              "contains_memory_report",
                              "system_memory_use_percentage",
                              "total_virtual_memory",
                              "available_virtual_memory",
                              "total_page_file",
                              "available_page_file",
                              "total_physical_memory",
                              "available_physical_memory",
                              "largest_free_vm_block",
                              "tiny_block_size",
                              "write_combine_size",
                              "shutdown_progress",
                              "ipc_channel_error",
                              "user_comments"]
    reqs = 0
    pages = 1
    offset = 0
    all_results = []
    # Access each page by offset and append results to the all_results list
    while (reqs < pages):
        offset = reqs * per_page_default
        param_data['_results_offset'] = str(offset)
        data = get_API_data(target, param_data, headers)
        # Determine the number of pages (only on the first page request)
        if (reqs == 0):
            total_results = data["total"]
            total_pages = int(math.ceil(total_results / float(per_page_default)))
            if (total_pages > pages):
                pages = total_pages
        # Collect the 'hits' into row lists
        for obj in data["hits"]:
            tmp = []
            tmp.append(safe_str(obj['uuid']))
            # Convert date
            # Note: if the timestamp has no nonzero microseconds, the '.%f' part is
            # omitted entirely rather than rendered as '.000000'.
            try:
                tmp.append(dt.datetime.strptime(obj['date'], "%Y-%m-%dT%H:%M:%S.%f+00:00"))
            except ValueError:
                tmp.append(dt.datetime.strptime(obj['date'], "%Y-%m-%dT%H:%M:%S+00:00"))
            tmp.append(safe_str(obj['signature']))
            tmp.append(safe_str(obj['platform']))
            tmp.append(bool(obj['contains_memory_report']))
            tmp.append(safe_long(obj['oom_allocation_size']))
            tmp.append(safe_int(obj['system_memory_use_percentage']))
            tmp.append(safe_long(obj['total_virtual_memory']))
            tmp.append(safe_long(obj['available_virtual_memory']))
            tmp.append(safe_long(obj['total_page_file']))
            tmp.append(safe_long(obj['available_page_file']))
            tmp.append(safe_long(obj['total_physical_memory']))
            tmp.append(safe_long(obj['available_physical_memory']))
            tmp.append(safe_str(obj['largest_free_vm_block']))
            # Add a field for the non-hex largest_free_vm_block value
            if obj['largest_free_vm_block'] is not None:
                tmp.append(int(obj['largest_free_vm_block'], 0))
            else:
                tmp.append(None)
            tmp.append(safe_long(obj['tiny_block_size']))
            tmp.append(safe_long(obj['write_combine_size']))
            tmp.append(safe_str(obj['shutdown_progress']))
            tmp.append(safe_str(obj['ipc_channel_error']))
            # Handle possible newlines in user_comments
            if obj['user_comments'] is not None:
                tmp.append(obj['user_comments'].replace("\r\n", "|").replace("\r", "|").replace("\n", "|"))
            else:
                tmp.append(None)
            all_results.append(tmp)
        reqs += 1
    write_to_s3(all_results, start_date, debug)
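Since get_crashstats_by_day defaults to the previous day, a scheduled daily run needs no arguments; backfilling missed days just means passing each date explicitly. A minimal sketch, with purely illustrative dates:
In [ ]:
# Hypothetical backfill: pull each missed day explicitly (dates are illustrative).
for day in ["2017-06-01", "2017-06-02", "2017-06-03"]:
    get_crashstats_by_day(date_format, DEBUG, day)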
In [ ]:
# Handy when re-running this manually: check how many pages a day will need.
def count_crashstats_by_day(format, debug, specify_date=None):
    """ Return the number of result pages for a specified date.
        Will use the previous day if not specified. """
    if specify_date:
        start_date = specify_date
    else:
        start_date = (dt.datetime.now() -
                      dt.timedelta(days=1)).strftime(format)
    stop_date = (dt.datetime.strptime(start_date, format) +
                 dt.timedelta(days=1)).strftime(format)
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json; charset=UTF-8'
    }
    # Try to get the token, but if that fails proceed without it
    token = get_token_from_s3()
    if token:
        headers['Auth-Token'] = token
    target = 'https://crash-stats.mozilla.com/api/SuperSearch/'
    per_page_default = 500
    param_data = {}
    param_data['release_channel'] = "beta"
    param_data['process_type'] = "content"
    param_data['_results_number'] = str(per_page_default)
    param_data['date'] = [">=" + start_date, "<" + stop_date]
    param_data['_columns'] = ["date",
                              "uuid",
                              "signature",
                              "oom_allocation_size",
                              "platform",
                              "contains_memory_report",
                              "system_memory_use_percentage",
                              "total_virtual_memory",
                              "available_virtual_memory",
                              "total_page_file",
                              "available_page_file",
                              "total_physical_memory",
                              "available_physical_memory",
                              "largest_free_vm_block",
                              "tiny_block_size",
                              "write_combine_size",
                              "shutdown_progress",
                              "ipc_channel_error",
                              "user_comments"]
    # Only the first page is needed to read the total and compute the page count
    param_data['_results_offset'] = "0"
    data = get_API_data(target, param_data, headers)
    total_results = data["total"]
    total_pages = int(math.ceil(total_results / float(per_page_default)))
    return total_pages
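For example, to see how many pages a given day would require before kicking off the full pull (the date here is illustrative):
In [ ]:
# Hypothetical check: how many 500-result pages an illustrative date would need.
count_crashstats_by_day(date_format, DEBUG, "2017-06-01")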
Run:
In [ ]:
get_crashstats_by_day(date_format, DEBUG, target_date)
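Optionally, the day's Parquet output can be read back as a quick sanity check; the path below mirrors the one built in write_to_s3, with a hypothetical submission date:
In [ ]:
# Hypothetical sanity check: read back one day's partition and count the rows.
check_path = ("s3://net-mozaws-prod-us-west-2-pipeline-analysis/"
              "ddurst/crash_stats_oom/v1/submission=2017-06-01")
sqlContext.read.parquet(check_path).count()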
In [ ]: