We want to store Socorro crash data in Parquet form so that it can be made accessible from re:dash.
See Bug 1273657 for more details.
In [ ]:
!conda install boto3 --yes
In [ ]:
import logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
We create the PySpark datatype for representing the crash data in Spark. This is a slightly modified version of peterbe/crash-report-struct-code.
In [ ]:
from pyspark.sql.types import *
def create_struct(schema):
    """ Take a JSON schema and return a pyspark StructType of equivalent structure. """

    replace_definitions(schema, schema['definitions'])
    assert '$ref' not in str(schema), "re-write didn't work"

    struct = StructType()
    for row in get_rows(schema):
        struct.add(row)

    return struct

def replace_definitions(schema, definitions):
    """ Replace references in the JSON schema with their definitions."""

    if 'properties' in schema:
        for prop, meta in schema['properties'].items():
            replace_definitions(meta, definitions)
    elif 'items' in schema:
        if '$ref' in schema['items']:
            ref = schema['items']['$ref'].split('/')[-1]
            schema['items'] = definitions[ref]
            replace_definitions(schema['items'], definitions)
        else:
            replace_definitions(schema['items'], definitions)
    elif '$ref' in str(schema):
        err_msg = "Reference not found for schema: {}".format(str(schema))
        log.error(err_msg)
        raise ValueError(err_msg)

def get_rows(schema):
    """ Map the fields in a JSON schema to corresponding data structures in pyspark."""

    if 'properties' not in schema:
        err_msg = "Invalid JSON schema: properties field is missing."
        log.error(err_msg)
        raise ValueError(err_msg)

    for prop in sorted(schema['properties']):
        meta = schema['properties'][prop]
        if 'string' in meta['type']:
            log.debug("{!r} is mapped to a StringType (schema type: {!r})".format(prop, meta['type']))
            yield StructField(prop, StringType(), 'null' in meta['type'])
        elif 'integer' in meta['type']:
            yield StructField(prop, IntegerType(), 'null' in meta['type'])
        elif 'boolean' in meta['type']:
            yield StructField(prop, BooleanType(), 'null' in meta['type'])
        elif meta['type'] == 'array' and 'items' not in meta:
            # Assume an array of strings when 'items' is not specified
            yield StructField(prop, ArrayType(StringType(), False), True)
        elif meta['type'] == 'array' and 'items' in meta:
            struct = StructType()
            for row in get_rows(meta['items']):
                struct.add(row)
            yield StructField(prop, ArrayType(struct), True)
        elif meta['type'] == 'object':
            struct = StructType()
            for row in get_rows(meta):
                struct.add(row)
            yield StructField(prop, struct, True)
        else:
            err_msg = "Invalid JSON schema: {}".format(str(meta)[:100])
            log.error(err_msg)
            raise ValueError(err_msg)
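To make the mapping concrete, here is a small sketch of create_struct applied to a toy schema. The field names and types below are hypothetical and are not part of the real crash_report.json.
In [ ]:
# Minimal sketch: a hypothetical schema, just to illustrate the JSON-to-StructType mapping
toy_schema = {
    "definitions": {},
    "properties": {
        "signature": {"type": ["string", "null"]},
        "uptime": {"type": ["integer", "null"]},
        "is_garbage_collecting": {"type": ["boolean", "null"]},
        "addons": {"type": "array"},  # no 'items' -> assumed to be an array of strings
    },
}

toy_struct = create_struct(toy_schema)
for field in toy_struct.fields:
    print(field)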
We first fetch the schema from its primary source in S3, as per Bug 1312006, and fall back to the GitHub location if it is not available.
In [ ]:
import boto3
import botocore
import json
import tempfile
import urllib2
def fetch_schema():
    """ Fetch the crash data schema from an s3 location or github location. This
    returns the corresponding JSON schema in a python dictionary. """

    region = "us-west-2"
    bucket = "org-mozilla-telemetry-crashes"
    key = "crash_report.json"
    fallback_url = "https://raw.githubusercontent.com/mozilla/socorro/master/socorro/schemas/crash_report.json"

    try:
        log.info("Fetching latest crash data schema from s3://{}/{}".format(bucket, key))
        s3 = boto3.client('s3', region_name=region)
        # download schema to memory via a file like object
        resp = tempfile.TemporaryFile()
        s3.download_fileobj(bucket, key, resp)
        resp.seek(0)
    except botocore.exceptions.ClientError as e:
        log.warning(("Could not fetch schema from s3://{}/{}: {}\n"
                     "Fetching crash data schema from {}")
                    .format(bucket, key, e, fallback_url))
        resp = urllib2.urlopen(fallback_url)

    return json.load(resp)
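As a quick sanity check (assuming the notebook has access to either the S3 bucket or the fallback URL), we can fetch the schema and inspect its top-level structure before building the StructType.
In [ ]:
# Sanity-check sketch: requires access to the S3 bucket or the fallback GitHub URL
schema = fetch_schema()
print(sorted(schema.keys()))        # should include 'definitions' and 'properties'
print(len(schema['properties']))    # number of top-level crash report fields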
In [ ]:
from datetime import datetime as dt, timedelta, date
from pyspark.sql import SQLContext
def daterange(start_date, end_date):
    """ Yield YYYYMMDD strings from end_date back to start_date, inclusive."""
    for n in range(int((end_date - start_date).days) + 1):
        yield (end_date - timedelta(n)).strftime("%Y%m%d")


def import_day(d, schema, version):
    """Convert JSON data stored in an S3 bucket into parquet, indexed by crash_date."""
    source_s3path = "s3://org-mozilla-telemetry-crashes/v1/crash_report"
    dest_s3path = "s3://telemetry-parquet/socorro_crash/"
    num_partitions = 10

    log.info("Processing {}, started at {}".format(d, dt.utcnow()))
    cur_source_s3path = "{}/{}".format(source_s3path, d)
    cur_dest_s3path = "{}/v{}/crash_date={}".format(dest_s3path, version, d)

    # sqlContext is expected to be provided by the notebook environment
    # (e.g. sqlContext = SQLContext(sc))
    df = sqlContext.read.json(cur_source_s3path, schema=schema)
    df.repartition(num_partitions).write.parquet(cur_dest_s3path, mode="overwrite")


def backfill(start_date_yyyymmdd, schema, version):
    """ Import data from a start date to yesterday's date.

    Example:
        backfill("20160902", crash_schema, version)
    """
    start_date = dt.strptime(start_date_yyyymmdd, "%Y%m%d")
    end_date = dt.utcnow() - timedelta(1)  # yesterday

    for d in daterange(start_date, end_date):
        try:
            import_day(d, schema, version)
        except Exception as e:
            log.error(e)
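Note that daterange walks backwards from the end date to the start date, inclusive. A small sketch of its behaviour:
In [ ]:
# daterange yields YYYYMMDD strings from end_date back to start_date (inclusive)
list(daterange(dt(2016, 9, 1), dt(2016, 9, 3)))
# => ['20160903', '20160902', '20160901']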
In [ ]:
from os import environ
# get the relevant date
yesterday = dt.strftime(dt.utcnow() - timedelta(1), "%Y%m%d")
target_date = environ.get('date', yesterday)
# fetch and generate the schema
schema_data = fetch_schema()
crash_schema = create_struct(schema_data)
version = schema_data.get('$target_version', 0) # default to v0
# process the data
import_day(target_date, crash_schema, version)
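Once the run has finished, the freshly written partition can be read back to spot-check it. This is a sketch assuming the cluster has read access to the destination bucket.
In [ ]:
# Read back the partition written above (read access to the destination bucket is assumed)
parquet_path = "s3://telemetry-parquet/socorro_crash/v{}/crash_date={}".format(version, target_date)
df = sqlContext.read.parquet(parquet_path)
print(df.count())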