Import Socorro crash data into the Data Platform

We want to be able to store Socorro crash data in Parquet form so that it can be made accessible from re:dash.

See Bug 1273657 for more details.


In [ ]:
!conda install boto3 --yes

In [ ]:
import logging
# Module-level logger shared by every cell in this notebook.
log = logging.getLogger(__name__)
# Configure the root logger at INFO so that log.info(...) progress messages are shown.
logging.basicConfig(level=logging.INFO)

We create the PySpark data type that represents the crash data in Spark. This is a slightly modified version of peterbe/crash-report-struct-code.


In [ ]:
from pyspark.sql.types import *

def create_struct(schema):
    """ Take a JSON schema and return a pyspark StructType of equivalent structure.

    `$ref` references are first resolved in place against the schema's
    top-level `definitions` mapping, so `schema` is mutated. Each top-level
    property is then mapped to a StructField by `get_rows`.

    Raises:
        ValueError: if a reference is left unresolved, or if the schema
            contains a construct that cannot be mapped to a pyspark type.
    """
    # A schema that uses no `$ref` does not need a `definitions` section.
    replace_definitions(schema, schema.get('definitions', {}))

    # Explicit check instead of `assert`, which is stripped under `python -O`.
    # Only `properties` is inspected: that is all get_rows consumes, and unused
    # entries in `definitions` may legitimately still contain `$ref`s.
    if '$ref' in str(schema.get('properties', {})):
        err_msg = "Invalid JSON schema: unresolved $ref remains after rewriting references."
        log.error(err_msg)
        raise ValueError(err_msg)

    struct = StructType()
    for row in get_rows(schema):
        struct.add(row)

    return struct

def _lookup_definition(ref, definitions):
    """Return the definition named by the last path segment of the `$ref` string `ref`."""
    name = ref.split('/')[-1]
    if name not in definitions:
        err_msg = "Definition not found for reference: {}".format(ref)
        log.error(err_msg)
        raise ValueError(err_msg)
    return definitions[name]


def replace_definitions(schema, definitions):
    """ Replace references in the JSON schema with their definitions.

    Walks `properties` and `items` recursively. Any sub-schema of the form
    {"$ref": "..."} is replaced in place by the definition it names. Only the
    last path segment of the reference is used as the key into `definitions`.
    Resolved definitions are shared objects, not copies. Self-referencing
    definitions are not supported: they would recurse forever.

    Raises:
        ValueError: if a reference names a missing definition, or if a `$ref`
            appears somewhere this walker does not descend into.
    """
    if 'properties' in schema:
        properties = schema['properties']
        # Replacing the value of an existing key while iterating is safe:
        # the dict's size never changes.
        for prop in properties:
            if '$ref' in properties[prop]:
                properties[prop] = _lookup_definition(properties[prop]['$ref'], definitions)
            replace_definitions(properties[prop], definitions)
    elif 'items' in schema:
        if '$ref' in schema['items']:
            schema['items'] = _lookup_definition(schema['items']['$ref'], definitions)
        replace_definitions(schema['items'], definitions)
    elif '$ref' in str(schema):
        err_msg = "Reference not found for schema: {}".format(str(schema))
        log.error(err_msg)
        raise ValueError(err_msg)

def _is_type(meta_type, name):
    """Return True if a JSON-schema `type` value (a single name or a list of names) includes `name`."""
    if isinstance(meta_type, (list, tuple)):
        return name in meta_type
    return meta_type == name


def get_rows(schema):
    """ Map the fields in a JSON schema to corresponding data structures in pyspark.

    Yields one StructField per entry of `schema['properties']`, in sorted
    property-name order. Type mapping:

    * "string" -> StringType. This is checked first, so a string/integer
      union is stored as a string.
    * "integer" -> IntegerType, "boolean" -> BooleanType, "number" -> DoubleType.
      These scalars are nullable only when their `type` also lists "null".
    * "array" without `items` -> array of non-null strings.
    * "array" with `items` -> array of structs built from the item schema.
    * "object" -> nested struct.

    A `type` may be a single name or a list such as ["object", "null"].
    Array and object fields are always nullable.

    Raises:
        ValueError: if `schema` has no `properties`, a property has no `type`,
            or a property's type cannot be mapped.
    """
    if 'properties' not in schema:
        err_msg = "Invalid JSON schema: properties field is missing."
        log.error(err_msg)
        raise ValueError(err_msg)

    for prop in sorted(schema['properties']):
        meta = schema['properties'][prop]
        if 'type' not in meta:
            err_msg = "Invalid JSON schema: {!r} has no type: {}".format(prop, str(meta)[:100])
            log.error(err_msg)
            raise ValueError(err_msg)

        meta_type = meta['type']
        nullable = _is_type(meta_type, 'null')

        if _is_type(meta_type, 'string'):
            # Only worth noting when the field is genuinely a string/integer union.
            if _is_type(meta_type, 'integer'):
                log.debug("{!r} allows the type to be String AND Integer".format(prop))
            yield StructField(prop, StringType(), nullable)
        elif _is_type(meta_type, 'integer'):
            yield StructField(prop, IntegerType(), nullable)
        elif _is_type(meta_type, 'boolean'):
            yield StructField(prop, BooleanType(), nullable)
        elif _is_type(meta_type, 'number'):
            yield StructField(prop, DoubleType(), nullable)
        elif _is_type(meta_type, 'array'):
            if 'items' not in meta:
                # Assuming strings in the array
                yield StructField(prop, ArrayType(StringType(), False), True)
            else:
                struct = StructType()
                for row in get_rows(meta['items']):
                    struct.add(row)
                yield StructField(prop, ArrayType(struct), True)
        elif _is_type(meta_type, 'object'):
            struct = StructType()
            for row in get_rows(meta):
                struct.add(row)
            yield StructField(prop, struct, True)
        else:
            err_msg = "Invalid JSON schema: {}".format(str(meta)[:100])
            log.error(err_msg)
            raise ValueError(err_msg)

First, fetch the schema from its primary source in S3, as per bug 1312006. If it is not available there, we fall back to the GitHub location.


In [ ]:
import boto3
import botocore
import json
import tempfile
import urllib2

def fetch_schema():
    """ Fetch the crash data schema from an s3 location or github location. This
    returns the corresponding JSON schema in a python dictionary.

    The S3 copy is tried first. Any botocore error triggers the fallback to
    the GitHub copy: an S3 error response (ClientError), but also client-side
    failures such as missing credentials or an unreachable endpoint
    (BotoCoreError).
    """
    from contextlib import closing

    region = "us-west-2"
    bucket = "org-mozilla-telemetry-crashes"
    key = "crash_report.json"
    fallback_url = "https://raw.githubusercontent.com/mozilla/socorro/master/socorro/schemas/crash_report.json"

    try:
        log.info("Fetching latest crash data schema from s3://{}/{}".format(bucket, key))
        s3 = boto3.client('s3', region_name=region)
        # Download the schema into an anonymous temporary file. `with` closes
        # (and so deletes) it whether or not the download succeeds.
        with tempfile.TemporaryFile() as resp:
            s3.download_fileobj(bucket, key, resp)
            resp.seek(0)
            return json.load(resp)
    except (botocore.exceptions.ClientError,
            botocore.exceptions.BotoCoreError) as e:
        log.warning(("Could not fetch schema from s3://{}/{}: {}\n"
                     "Fetching crash data schema from {}")
                    .format(bucket, key, e, fallback_url))

    # `closing` guarantees the HTTP response is released even where the
    # response object does not support `with` (Python 2's urllib2).
    with closing(urllib2.urlopen(fallback_url)) as resp:
        return json.load(resp)

Read the crash data as JSON and convert it to Parquet.


In [ ]:
from datetime import datetime as dt, timedelta, date
from pyspark.sql import SQLContext


def daterange(start_date, end_date):
    """Yield each day from `end_date` back to `start_date`, inclusive, newest first.

    Days are yielded as "YYYYMMDD" strings. Nothing is yielded when
    `end_date` is earlier than `start_date`.
    """
    span = (end_date - start_date).days
    offset = 0
    while offset <= span:
        day = end_date - timedelta(days=offset)
        yield day.strftime("%Y%m%d")
        offset += 1

def import_day(d, schema, version):
    """Convert JSON data stored in an S3 bucket into parquet, indexed by crash_date.

    The input is `<source>/<d>`, read with the pyspark `schema`. The output is
    written to `<dest>/v<version>/crash_date=<d>` as `num_partitions` files,
    overwriting anything already at that location.

    Args:
        d: the day to import, as a "YYYYMMDD" string.
        schema: pyspark StructType used to read the JSON records.
        version: schema version, used as the `v<version>` path segment.
    """
    source_s3path = "s3://org-mozilla-telemetry-crashes/v1/crash_report"
    # No trailing slash: the format string below adds its own "/" separator.
    # A trailing slash here would yield ".../socorro_crash//v<version>/...".
    dest_s3path = "s3://telemetry-parquet/socorro_crash"
    num_partitions = 10

    log.info("Processing {}, started at {}".format(d, dt.utcnow()))
    cur_source_s3path = "{}/{}".format(source_s3path, d)
    cur_dest_s3path = "{}/v{}/crash_date={}".format(dest_s3path, version, d)

    # NOTE(review): `sqlContext` is not defined in this notebook; presumably the
    # Spark notebook environment injects it as a global. Confirm.
    df = sqlContext.read.json(cur_source_s3path, schema=schema)
    df.repartition(num_partitions).write.parquet(cur_dest_s3path, mode="overwrite")

def backfill(start_date_yyyymmdd, schema, version):
    """ Import data from a start date to yesterday's date.

    Days are processed newest first, the order in which `daterange` yields
    them. A failure on one day is logged with its traceback and does not stop
    the remaining days.

    Example:
        backfill("20160902", crash_schema, version)
    """
    start_date = dt.strptime(start_date_yyyymmdd, "%Y%m%d")
    end_date = dt.utcnow() - timedelta(1)  # yesterday
    for d in daterange(start_date, end_date):
        try:
            import_day(d, schema, version)
        except Exception:
            # Best effort per day: record the full traceback and move on.
            log.exception("Failed to import crash data for {}".format(d))

In [ ]:
from os import environ

# get the relevant date
# get the relevant date: the `date` environment variable (YYYYMMDD) if set,
# otherwise yesterday in UTC
yesterday = dt.strftime(dt.utcnow() - timedelta(1), "%Y%m%d")
target_date = environ.get('date', yesterday)
# Fail fast on a malformed override, before fetching the schema or launching
# a Spark job against a non-existent S3 prefix.
try:
    dt.strptime(target_date, "%Y%m%d")
except ValueError:
    raise ValueError("Invalid 'date' environment variable {!r}; expected YYYYMMDD".format(target_date))

# fetch and generate the schema
schema_data = fetch_schema()
crash_schema = create_struct(schema_data)
version = schema_data.get('$target_version', 0)  # default to v0

# process the data
import_day(target_date, crash_schema, version)