Fetching OOM crash data from crash-stats and writing it to S3 (intended to run daily)


In [2]:
import json
import urllib2
import urllib
import math
import datetime as dt
from pyspark.sql.types import *

In [2]:
# Date pattern used for the SuperSearch "date" filter and S3 partition name.
date_format = "%Y-%m-%d"
# Set to a "%Y-%m-%d" string to backfill a specific day; None means yesterday.
target_date = None
# When True, only print the DataFrame schema instead of writing parquet to S3.
DEBUG = False


def safe_str(obj):
    """ Return the unicode representation of obj (empty unicode string
    when obj is None). """
    return unicode("") if obj is None else unicode(obj)


def safe_long(obj):
    """ Return obj coerced to long, or None when obj is None. """
    return None if obj is None else long(obj)


def safe_int(obj):
    """ Return obj coerced to int, or None when obj is None. """
    return None if obj is None else int(obj)

Because the crash-stats SuperSearch API paginates its results, we need to issue multiple sequential requests — one per page of results.


In [4]:
def get_API_data(url, params, hdrs):
    """ Fetch one page of API results and return the decoded JSON body.

    url -- base API endpoint (no query string)
    params -- dict of query parameters; list values are expanded (doseq=True)
    hdrs -- dict of HTTP headers sent with the request
    """
    querystring = urllib.urlencode(params, doseq=True)
    full_url = url + '?' + querystring
    req = urllib2.Request(url=full_url, headers=hdrs)
    response = urllib2.urlopen(req)
    try:
        # Read and decode before returning so the connection is always released.
        return json.loads(response.read())
    finally:
        # Fix: the original never closed the response, leaking the socket
        # on every page request.
        response.close()

In [6]:
def write_to_s3(lists, start_date, debug):
    """ Build a Spark DataFrame from the collected row lists and persist it
    as parquet on S3, partitioned by submission date. In debug mode, only
    print the schema instead of writing. """
    s3_output = "s3://net-mozaws-prod-us-west-2-pipeline-analysis/"
    s3_output += "ddurst/crash_stats_oom/v1/submission=" + start_date

    # (column name, Spark type, nullable) — order must match the row lists.
    column_specs = [
        ("uuid", StringType(), False),
        ("date", TimestampType(), False),
        ("signature", StringType(), False),
        ("platform", StringType(), True),
        ("contains_memory_report", BooleanType(), True),
        ("oom_allocation_size", LongType(), True),
        ("system_memory_use_percentage", IntegerType(), True),
        ("total_virtual_memory", LongType(), True),
        ("available_virtual_memory", LongType(), True),
        ("total_page_file", LongType(), True),
        ("available_page_file", LongType(), True),
        ("total_physical_memory", LongType(), True),
        ("available_physical_memory", LongType(), True),
        ("largest_free_vm_block", StringType(), True),
        ("largest_free_vm_block_int", LongType(), True),
        ("tiny_block_size", LongType(), True),
        ("write_combine_size", LongType(), True),
        ("shutdown_progress", StringType(), True),
        ("ipc_channel_error", StringType(), True),
        ("user_comments", StringType(), True),
    ]
    schema = StructType(
        [StructField(col_name, col_type, nullable)
         for col_name, col_type, nullable in column_specs])

    grouped = sqlContext.createDataFrame(lists, schema)
    if debug:
        grouped.printSchema()
        return
    # coalesce(1) collapses the output to a single parquet file for the day.
    grouped.coalesce(1).write.parquet(s3_output)

This is the main function to set up the request data and loop through the required number of result pages for the specified date (note: it's assumed this would be run once per previous day via cron or the like).


In [1]:
def _hit_to_row(obj):
    """ Flatten one SuperSearch 'hit' dict into a row list whose order
    matches the column order declared in write_to_s3. """
    row = [
        safe_str(obj['uuid']),
        # Convert date from the API's timestamp format
        dt.datetime.strptime(obj['date'], "%Y-%m-%dT%H:%M:%S.%f+00:00"),
        safe_str(obj['signature']),
        safe_str(obj['platform']),
        # NOTE(review): bool(None) yields False here even though the column is
        # nullable — kept as-is to preserve the existing output.
        bool(obj['contains_memory_report']),
        safe_long(obj['oom_allocation_size']),
        safe_int(obj['system_memory_use_percentage']),
        safe_long(obj['total_virtual_memory']),
        safe_long(obj['available_virtual_memory']),
        safe_long(obj['total_page_file']),
        safe_long(obj['available_page_file']),
        safe_long(obj['total_physical_memory']),
        safe_long(obj['available_physical_memory']),
        safe_str(obj['largest_free_vm_block']),
    ]
    # Add field for non-hex largest_free_vm_block value
    # (base 0 lets int() honor a "0x" prefix in the string).
    if obj['largest_free_vm_block'] is not None:
        row.append(int(obj['largest_free_vm_block'], 0))
    else:
        row.append(None)
    row.append(safe_long(obj['tiny_block_size']))
    row.append(safe_long(obj['write_combine_size']))
    row.append(safe_str(obj['shutdown_progress']))
    row.append(safe_str(obj['ipc_channel_error']))
    # Handle possible newlines in user_comments
    if obj['user_comments'] is not None:
        comments = obj['user_comments']
        row.append(comments.replace("\r\n", "|")
                           .replace("\r", "|")
                           .replace("\n", "|"))
    else:
        row.append(None)
    return row


def get_crashstats_by_day(format, debug, specify_date=None):
    """ Get total crashstats data for a specified date.
    Will use previous day if not specified.

    format -- strftime/strptime pattern for date strings (e.g. "%Y-%m-%d")
    debug -- passed through to write_to_s3; True prints schema only
    specify_date -- optional date string in `format`; defaults to yesterday
    """
    if specify_date:
        start_date = specify_date
    else:
        start_date = (dt.datetime.now() -
                      dt.timedelta(days=1)).strftime(format)
    # Half-open one-day window: [start_date, stop_date)
    stop_date = (dt.datetime.strptime(start_date, format) +
                 dt.timedelta(days=1)).strftime(format)

    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json; charset=UTF-8'
    }
    target = 'https://crash-stats.mozilla.com/api/SuperSearch/'
    per_page_default = 100

    param_data = {}
    param_data['release_channel'] = "beta"
    param_data['process_type'] = "content"
    param_data['_facets'] = "signature"
    param_data['_results_number'] = str(per_page_default)
    param_data['date'] = [">=" + start_date, "<" + stop_date]
    param_data['_columns'] = ["date",
                              "uuid",
                              "signature",
                              "oom_allocation_size",
                              "platform",
                              "contains_memory_report",
                              "system_memory_use_percentage",
                              "total_virtual_memory",
                              "available_virtual_memory",
                              "total_page_file",
                              "available_page_file",
                              "total_physical_memory",
                              "available_physical_memory",
                              "largest_free_vm_block",
                              "tiny_block_size",
                              "write_combine_size",
                              "shutdown_progress",
                              "ipc_channel_error",
                              "user_comments"]

    reqs = 0
    pages = 1
    all_results = []

    # Access each page by offset and append results to the all_results list
    while (reqs < pages):
        offset = reqs * per_page_default
        param_data['_results_offset'] = str(offset)
        data = get_API_data(target, param_data, headers)
        # Determine the number of pages (only on first page request)
        if (reqs == 0):
            total_results = data["total"]
            # Fix: derive the page count from per_page_default instead of a
            # hard-coded 100 (the old `* 100 / 100` was a no-op and would
            # silently break pagination if the page size were ever changed).
            total_pages = int(math.ceil(total_results /
                                        float(per_page_default)))
            if (total_pages > pages):
                pages = total_pages
        # Flatten each 'hit' into a schema-ordered row
        for obj in data["hits"]:
            all_results.append(_hit_to_row(obj))
        reqs += 1

    write_to_s3(all_results, start_date, debug)

Run:


In [12]:
# Entry point: pull the target day's (default: yesterday's) OOM crash data
# from SuperSearch and write it to S3 (or print the schema when DEBUG).
get_crashstats_by_day(date_format, DEBUG, target_date)


root
 |-- uuid: string (nullable = false)
 |-- date: timestamp (nullable = false)
 |-- signature: string (nullable = false)
 |-- platform: string (nullable = true)
 |-- contains_memory_report: boolean (nullable = true)
 |-- oom_allocation_size: long (nullable = true)
 |-- system_memory_use_percentage: integer (nullable = true)
 |-- total_virtual_memory: long (nullable = true)
 |-- available_virtual_memory: long (nullable = true)
 |-- total_page_file: long (nullable = true)
 |-- available_page_file: long (nullable = true)
 |-- total_physical_memory: long (nullable = true)
 |-- available_physical_memory: long (nullable = true)
 |-- largest_free_vm_block: string (nullable = true)
 |-- largest_free_vm_block_int: long (nullable = true)
 |-- tiny_block_size: long (nullable = true)
 |-- write_combine_size: long (nullable = true)
 |-- shutdown_progress: string (nullable = true)
 |-- ipc_channel_error: string (nullable = true)
 |-- user_comments: string (nullable = true)