In [ ]:
# The entry point to this analysis is at the very bottom of this file.
# Look for the call to DoUpdate().

In [ ]:
from __future__ import division
import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
import operator
import os, time, sys
import datetime
from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client
from moztelemetry.dataset import Dataset
def fmt_date(d):
    return d.strftime("%Y%m%d")
def jstime(d):
    return time.mktime(d.timetuple())
def repartition(pipeline):
    return pipeline.repartition(MaxPartitions).cache()

%pylab inline

MaxPartitions = sc.defaultParallelism * 4

# Keep this small (0.00001) for fast backfill testing.
WeeklyFraction = 0.003

# Number of days of history Telemetry keeps.
MaxHistoryInDays = datetime.timedelta(210)

# Bucket we'll drop files into on S3. If this is None, we won't attempt any
# S3 uploads, and the analysis will start from scratch.
S3_BUCKET = 's3://telemetry-public-analysis-2/gfx-telemetry/data/'
GITHUB_REPO = 'https://raw.githubusercontent.com/jrmuizel/moz-gfx-telemetry'

# Going forward we only care about sessions from Firefox 53+, since it
# is the first release to drop support for Windows XP and Vista, whose
# presence distorts our statistics.
MinFirefoxVersion = '53'

# List of jobs allowed to have a first run (meaning no S3 content yet).
BrandNewJobs = []

# If true, backfill up to MaxHistoryInDays rather than the last update.
ForceMaxBackfill = False

# When executed as a cron job, the "output" folder already exists.
# For consistency we create it if it doesn't exist.
!mkdir -p output

In [ ]:
# Use this block to temporarily change parameters above.
#ForceMaxBackfill = True
#WeeklyFraction = 0.00001
#S3_BUCKET = None
#BrandNewJobs = []

In [ ]:
ArchKey =               'environment/build/architecture'
FxVersionKey =          'environment/build/version'
Wow64Key =              'environment/system/isWow64'
CpuKey =                'environment/system/cpu'
GfxAdaptersKey =        'environment/system/gfx/adapters'
GfxFeaturesKey =        'environment/system/gfx/features'
OSNameKey =             'environment/system/os/name'
OSVersionKey =          'environment/system/os/version'
OSServicePackMajorKey = 'environment/system/os/servicePackMajor'

In [ ]:
FirstValidDate = datetime.datetime.utcnow() - MaxHistoryInDays

In [ ]:
# Log spam eats up disk space, so we disable it.
def quiet_logs(sc):
  logger = sc._jvm.org.apache.log4j
  logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
  logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)
quiet_logs(sc)

In [ ]:
# This is the entry point for grabbing reduced, preformatted pings.
def FetchAndFormat(start_date, end_date):
    pings = GetRawPings(start_date, end_date)
    pings = ReduceRawPings(pings)
    pings = get_one_ping_per_client(pings)
    pings = pings.map(Validate)
    pings = pings.filter(lambda p: p.get('valid', False))
    return pings.cache()
    
def GetRawPings(start_date, end_date):
    if isinstance(start_date, datetime.datetime):
        start_date = fmt_date(start_date)
    if isinstance(end_date, datetime.datetime):
        end_date = fmt_date(end_date)
    
    def match_date(date):
        return start_date <= date <= end_date
            
    ds = Dataset.from_source('telemetry')
    ds = ds.where(docType = 'main')
    ds = ds.where(appName = 'Firefox')
    # Note: appVersion is compared as a string, which is fine while major
    # versions have two digits.
    ds = ds.where(appVersion = lambda version: version >= MinFirefoxVersion)
    ds = ds.where(submissionDate = match_date)
    return ds.records(sc, sample = WeeklyFraction)

def ReduceRawPings(pings):
    return get_pings_properties(pings, [
        'clientId',
        'creationDate',
        ArchKey,
        Wow64Key,
        CpuKey,
        FxVersionKey,
        GfxAdaptersKey,
        GfxFeaturesKey,
        OSNameKey,
        OSVersionKey,
        OSServicePackMajorKey,
    ])

# Transform each ping to make it easier to work with in later stages.
def Validate(p):
    try:
        name = p.get(OSNameKey) or 'w'
        version = p.get(OSVersionKey) or '0'
        if name == 'Linux':
            p['OSVersion'] = None
            p['OS'] = 'Linux'
            p['OSName'] = 'Linux'
        elif name == 'Windows_NT':
            spmaj = p.get(OSServicePackMajorKey) or '0'
            p['OSVersion'] = version + '.' + str(spmaj)
            p['OS'] = 'Windows-' + version + '.' + str(spmaj)
            p['OSName'] = 'Windows'
        elif name == 'Darwin':
            p['OSVersion'] = version
            p['OS'] = 'Darwin-' + version
            p['OSName'] = 'Darwin'
        else:
            p['OSVersion'] = version
            p['OS'] = '{0}-{1}'.format(name, version)
            p['OSName'] = name
    except:
        # Malformed ping; leave 'valid' unset so the ping is filtered out.
        return p
    
    p['valid'] = True
    return p
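
In [ ]:
# Spot-check of Validate() on a hand-built ping (real pings carry many
# more fields; the values here are illustrative only):
example = Validate({OSNameKey: 'Windows_NT', OSVersionKey: '10.0', OSServicePackMajorKey: 0})
print(example['OS'])     # Windows-10.0.0
print(example['valid'])  # True

# A one-week fetch would look like the following; it is commented out
# because it hits the cluster. The dates are placeholders.
#pings = FetchAndFormat(datetime.datetime(2017, 4, 2), datetime.datetime(2017, 4, 9))
#pings.first()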

In [ ]:
# Profiler for debugging. Use in a |with| statement.
class Prof(object):
    level = 0
    
    def __init__(self, name):
        self.name = name
    def __enter__(self):
        self.sout('Starting {0}... '.format(self.name))
        self.start = datetime.datetime.now()
        Prof.level += 1
        return None
    def __exit__(self, type, value, traceback):
        Prof.level -= 1
        self.end = datetime.datetime.now()
        self.sout('... {0}: {1}s'.format(self.name, (self.end - self.start).total_seconds()))
    def sout(self, s):
        sys.stdout.write(('##' * Prof.level) + ' ')
        sys.stdout.write(s)
        sys.stdout.write('\n')
        sys.stdout.flush()
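
In [ ]:
# Example use of the Prof helper; nesting increases the '##' indent:
with Prof('outer step'):
    with Prof('inner step'):
        time.sleep(0.25)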

In [ ]:
# Helpers.
def fix_vendor(vendorID):
    if vendorID == u'Intel Open Source Technology Center':
        return u'0x8086'
    return vendorID

def get_vendor(ping):
    try:
        adapter = ping[GfxAdaptersKey][0]
        return fix_vendor(adapter['vendorID'])
    except:
        return 'unknown'
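
In [ ]:
# get_vendor() normalizes the Intel open-source driver string and falls
# back to 'unknown' on malformed pings. Illustrative values:
print(get_vendor({GfxAdaptersKey: [{'vendorID': u'Intel Open Source Technology Center'}]}))  # 0x8086
print(get_vendor({}))  # unknown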

In [ ]:
# A TrendBase encapsulates the data needed to visualize a trend.
# It has four functions:
#    prepare    (download from cache)
#    willUpdate (check if update is needed)
#    update     (add analysis data for a week of pings)
#    finish     (upload back to cache)
class TrendBase(object):
    def __init__(self, name):
        super(TrendBase, self).__init__()
        self.name = '{0}-v2.json'.format(name)
    
    # Called before analysis starts.
    def prepare(self):
        print('Preparing {0}'.format(self.name))
        return True
    
    # Called before querying pings for the week starting at the given date.
    # Return False to indicate that this trend should no longer receive
    # updates.
    def willUpdate(self, date):
        raise Exception('Return true or false')
   
    def update(self, pings, **kwargs):
        raise Exception('NYI')
        
    def finish(self):
        pass
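
In [ ]:
# A minimal (hypothetical) TrendBase subclass, just to illustrate the
# lifecycle contract. Real analyses subclass Trend (defined below), which
# adds S3-backed caching on top of this interface.
class ExampleCountTrend(TrendBase):
    def willUpdate(self, date):
        # Stop once we walk past the Telemetry retention window.
        return date >= FirstValidDate
    def update(self, pings, **kwargs):
        print('{0}: {1} pings'.format(kwargs['start_date'], pings.count()))
        return True  # keep receiving older weeks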

In [ ]:
# Given a list of trend objects, query weeks starting from the most recent
# Sunday, iterating backwards until no trend object requires an update.
def DoUpdate(trends):
    root = TrendGroup('root', trends)
    root.prepare()
        
    # Start each analysis slice on a Sunday.
    latest = MostRecentSunday()
    end = latest
    
    while True:
        start = end - datetime.timedelta(7)
        assert latest.weekday() == 6
        
        if not root.willUpdate(start):
            break
        
        try:
            with Prof('fetch {0}'.format(start)) as _:
                pings = FetchAndFormat(start, end)
        except:
            if not ForceMaxBackfill:
                raise
            # When force-backfilling, skip weeks that fail to fetch rather
            # than aborting; otherwise |pings| would be undefined below.
            end = start
            continue
        
        with Prof('compute {0}'.format(start)) as _:
            if not root.update(pings, start_date = start, end_date = end):
                break
            
        end = start
        
    root.finish()
    
def MostRecentSunday():
    now = datetime.datetime.utcnow()
    this_morning = datetime.datetime(now.year, now.month, now.day)
    if this_morning.weekday() == 6:
        return this_morning
    # weekday() is 0 for Monday, so the previous Sunday is weekday() + 1 days ago.
    diff = datetime.timedelta(0 - this_morning.weekday() - 1)
    return this_morning + diff
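
In [ ]:
# Sanity check: the result is always a Sunday (weekday() == 6), at most
# six days in the past.
assert MostRecentSunday().weekday() == 6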

In [ ]:
# A TrendGroup is a collection of TrendBase objects. It lets us
# group similar trends together. For example, if five trends all
# need to filter Windows pings, we can filter for Windows pings
# once and cache the result, rather than redo the filter each
# time.
#
# Trend groups keep an "active" list of trends that will probably
# need another update. If any trend stops requesting data, it is
# removed from the active list.
class TrendGroup(TrendBase):
    def __init__(self, name, trends):
        super(TrendGroup, self).__init__(name)
        self.trends = trends
        self.active = []
    
    def prepare(self):
        self.trends = [trend for trend in self.trends if trend.prepare()]
        self.active = self.trends[:]
        return len(self.trends) > 0
            
    def willUpdate(self, date):
        self.active = [trend for trend in self.active if trend.willUpdate(date)]
        return len(self.active) > 0
    
    def update(self, pings, **kwargs):
        pings = pings.cache()
        self.active = [trend for trend in self.active if trend.update(pings, **kwargs)]
        return len(self.active) > 0
            
    def finish(self):
        for trend in self.trends:
            trend.finish()
            
# A Trend object takes a new set of pings for a week's worth of data,
# analyzes it, and adds the result to the trend set. Trend sets are
# cached in S3 as JSON.
#
# If the latest entry in the cache covers less than a full week of
# data, the entry is removed so that week can be re-queried.
class Trend(TrendBase):
    def __init__(self, filename):
        super(Trend, self).__init__(filename)
        self.s3_path = os.path.join(S3_BUCKET, self.name) if S3_BUCKET else None
        self.local_path = os.path.join('output', self.name)
        self.cache = None
        self.lastFullWeek = None
        self.newDataPoints = []
        
    def query(self, pings):
        raise Exception('NYI')
        
    def willUpdate(self, date):
        if date < FirstValidDate:
            return False
        if self.lastFullWeek is not None and date <= self.lastFullWeek:
            return False
        return True
    
    def prepare(self):
        self.cache = self.fetch_json()
        if self.cache is None:
            self.cache = {
                'created': jstime(datetime.datetime.utcnow()),
                'trend': [],
            }
        
        # Make sure trends are sorted in ascending order.
        self.cache['trend'] = self.cache['trend'] or []            
        self.cache['trend'] = sorted(self.cache['trend'], key = lambda o: o['start'])
        
        if len(self.cache['trend']) and not ForceMaxBackfill:
            lastDataPoint = self.cache['trend'][-1]
            lastDataPointStart = datetime.datetime.utcfromtimestamp(lastDataPoint['start'])
            lastDataPointEnd = datetime.datetime.utcfromtimestamp(lastDataPoint['end'])
            print(lastDataPoint, lastDataPointStart, lastDataPointEnd)
            if lastDataPointEnd - lastDataPointStart < datetime.timedelta(7):
                # The last data point had less than a full week, so we stop at the
                # previous week, and remove the incomplete datapoint.
                self.lastFullWeek = lastDataPointStart - datetime.timedelta(7)
                self.cache['trend'].pop()
            else:
                # The last data point covered a full week, so that's our stopping
                # point.
                self.lastFullWeek = lastDataPointStart
                print(self.lastFullWeek)
        
        return True
    
    # Optional hook - transform pings before querying.
    def transformPings(self, pings):
        return pings
    
    def update(self, pings, start_date, end_date, **kwargs):
        with Prof('count {0}'.format(self.name)):
            pings = self.transformPings(pings)
            count = pings.count()
        if count == 0:
            print('WARNING: no pings in RDD')
            return False
        
        with Prof('query {0} (count: {1})'.format(self.name, count)):
            data = self.query(pings)
        
        self.newDataPoints.append({
            'start': jstime(start_date),
            'end': jstime(end_date),
            'total': count,
            'data': data,
        })
        return True
            
    def finish(self):
        # If we're doing a maximum backfill, drop cached points that start at
        # or after the earliest data point we newly queried, so the new data
        # replaces them.
        if ForceMaxBackfill and len(self.newDataPoints):
            stopAt = self.newDataPoints[-1]['start']
            lastIndex = None
            for index, entry in enumerate(self.cache['trend']):
                if entry['start'] >= stopAt:
                    lastIndex = index
                    break
            if lastIndex is not None:
                self.cache['trend'] = self.cache['trend'][:lastIndex]
        
        # Note: the backfill algorithm in DoUpdate() walks in reverse, so dates
        # will be accumulated in descending order. The final list should be in
        # ascending order, so we reverse.
        self.cache['trend'] += self.newDataPoints[::-1]
        
        text = json.dumps(self.cache)
        
        with open(self.local_path, 'w') as fp:
            fp.write(text)
        
        if self.s3_path:
            # Note: os.system() does not raise on failure, so check the exit
            # code instead of catching an exception.
            rv = os.system("aws s3 cp {0} {1}".format(self.local_path, self.s3_path))
            if rv != 0:
                print("Failed s3 upload: exit code {0}".format(rv))
            
    def fetch_json(self):
        if self.s3_path:
            try:
                # If the download fails, the open() below raises; we then
                # either propagate the error or treat this as a brand-new job.
                os.system("aws s3 cp {0} {1}".format(self.s3_path, self.local_path))
                with open(self.local_path, 'r') as fp:
                    return json.load(fp)
            except:
                if self.name not in BrandNewJobs:
                    raise
                return None
        else:
            try:
                with open(self.local_path, 'r') as fp:
                    return json.load(fp)
            except:
                pass
        return None
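
In [ ]:
# For reference, the cached JSON written by finish() has the following
# shape (reconstructed from prepare()/update() above; values illustrative):
example_cache = {
    'created': 1490000000.0,         # jstime() when the cache was first created
    'trend': [                       # sorted ascending by 'start'
        {
            'start': 1489881600.0,   # jstime() of the week's start (a Sunday)
            'end':   1490486400.0,   # jstime() of the week's end
            'total': 12345,          # ping count for the week
            'data':  {'d3d11': 10000, 'none': 2345},  # output of query()
        },
    ],
}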

In [ ]:
class FirefoxTrend(Trend):
    def __init__(self):
        super(FirefoxTrend, self).__init__('trend-firefox')
        
    def query(self, pings):
        def get_version(p):
            v = p.get(FxVersionKey, None)
            if v is None or not isinstance(v, basestring):
                return 'unknown'
            return v.split('.')[0]
        return pings.map(lambda p: (get_version(p),)).countByKey()

In [ ]:
class WindowsGroup(TrendGroup):
    def __init__(self, trends):
        super(WindowsGroup, self).__init__('Windows', trends)
        
    def update(self, pings, **kwargs):
        pings = pings.filter(lambda p: p['OSName'] == 'Windows')
        return super(WindowsGroup, self).update(pings, **kwargs)

class WinverTrend(Trend):
    def __init__(self):
        super(WinverTrend, self).__init__('trend-windows-versions')
        
    def query(self, pings):
        return pings.map(lambda p: (p['OSVersion'],)).countByKey()
    
class WinCompositorTrend(Trend):
    def __init__(self):
        super(WinCompositorTrend, self).__init__('trend-windows-compositors')
        
    def willUpdate(self, date):
        # This metric didn't ship until Firefox 43.
        if date < datetime.datetime(2015, 11, 15):
            return False
        return super(WinCompositorTrend, self).willUpdate(date)
        
    def query(self, pings):
        return pings.map(lambda p: (self.get_compositor(p),)).countByKey()
    
    @staticmethod
    def get_compositor(p):
        features = p.get(GfxFeaturesKey, None)
        if features is None:
            return 'none'
        return features.get('compositor', 'none')
    
class WinArchTrend(Trend):
    def __init__(self):
        super(WinArchTrend, self).__init__('trend-windows-arch')
        
    def query(self, pings):
        return pings.map(lambda p: (self.get_os_bits(p),)).countByKey()
    
    @staticmethod
    def get_os_bits(p):
        arch = p.get(ArchKey, 'unknown')
        if arch == 'x86-64':
            return '64'
        elif arch == 'x86':
            if p.get(Wow64Key, False):
                return '32_on_64'
            return '32'
        return 'unknown'

# This group restricts pings to Windows Vista+, and must be inside a
# group that restricts pings to Windows.
class WindowsVistaPlusGroup(TrendGroup):
    def __init__(self, trends):
        super(WindowsVistaPlusGroup, self).__init__('Windows Vista+', trends)
        
    def update(self, pings, **kwargs):
        # Windows XP reports NT version 5.1; filter it out.
        pings = pings.filter(lambda p: not p['OSVersion'].startswith('5.1'))
        return super(WindowsVistaPlusGroup, self).update(pings, **kwargs)

class Direct2DTrend(Trend):
    def __init__(self):
        super(Direct2DTrend, self).__init__('trend-windows-d2d')
    
    def query(self, pings):
        return pings.map(lambda p: (self.get_d2d(p),)).countByKey()
    
    def willUpdate(self, date):
        # This metric didn't ship until Firefox 43.
        if date < datetime.datetime(2015, 11, 15):
            return False
        return super(Direct2DTrend, self).willUpdate(date)
    
    @staticmethod
    def get_d2d(p):
        try:
            status = p[GfxFeaturesKey]['d2d']['status']
            if status != 'available':
                return status
            return p[GfxFeaturesKey]['d2d']['version']
        except:
            return 'unknown'
        
class Direct3D11Trend(Trend):
    def __init__(self):
        super(Direct3D11Trend, self).__init__('trend-windows-d3d11')
    
    def query(self, pings):
        return pings.map(lambda p: (self.get_d3d11(p),)).countByKey()
    
    def willUpdate(self, date):
        # This metric didn't ship until Firefox 43.
        if date < datetime.datetime(2015, 11, 15):
            return False
        return super(Direct3D11Trend, self).willUpdate(date)
    
    @staticmethod
    def get_d3d11(p):
        try:
            d3d11 = p[GfxFeaturesKey]['d3d11']
            if d3d11['status'] != 'available':
                return d3d11['status']
            if d3d11.get('warp', False):
                return 'warp'
            return d3d11['version']
        except:
            return 'unknown'
        
class WindowsVendorTrend(Trend):
    def __init__(self):
        super(WindowsVendorTrend, self).__init__('trend-windows-vendors')
        
    def query(self, pings):
        return pings.map(lambda p: (get_vendor(p),)).countByKey()
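
In [ ]:
# Spot-checks for the per-ping classifiers above, using hand-built pings
# (illustrative only; real pings have many more fields):
print(WinArchTrend.get_os_bits({ArchKey: 'x86', Wow64Key: True}))  # 32_on_64
print(WinCompositorTrend.get_compositor({GfxFeaturesKey: {'compositor': 'd3d11'}}))  # d3d11
print(Direct3D11Trend.get_d3d11({GfxFeaturesKey: {'d3d11': {'status': 'available', 'warp': True}}}))  # warp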

In [ ]:
# Device generation trend. This one is a little more complicated than the
# others, since we download the generation database to build the mapping.
class DeviceGenTrend(Trend):
    deviceMap = None
    
    def __init__(self, vendor, vendorName):
        super(DeviceGenTrend, self).__init__('trend-windows-device-gen-{0}'.format(vendorName))
        self.vendorBlock = None
        self.vendorID = vendor
        
    def prepare(self):
        # Grab the vendor -> device -> gen map.
        if not DeviceGenTrend.deviceMap:
            import urllib2
            obj = urllib2.urlopen('{0}/master/www/gfxdevices.json'.format(GITHUB_REPO))
            text = obj.read()
            DeviceGenTrend.deviceMap = json.loads(text)
        self.vendorBlock = DeviceGenTrend.deviceMap[self.vendorID]
        return super(DeviceGenTrend, self).prepare()
    
    def transformPings(self, pings):
        return pings.filter(lambda p: get_vendor(p) == self.vendorID)
        
    def query(self, pings):
        return pings.map(lambda p: (self.get_gen(p),)).countByKey()
    
    def get_gen(self, p):
        adapter = p[GfxAdaptersKey][0]
        deviceID = adapter.get('deviceID', 'unknown')
        if deviceID not in self.vendorBlock:
            return 'unknown'
        return self.vendorBlock[deviceID][0]
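
In [ ]:
# Illustration of the device-generation lookup. The downloaded map is
# assumed to have the shape {vendorID: {deviceID: [generation, ...]}};
# the device IDs and generation name below are hypothetical.
trend = DeviceGenTrend(u'0x8086', 'intel')
trend.vendorBlock = {u'0x0116': [u'gen6']}
print(trend.get_gen({GfxAdaptersKey: [{'deviceID': u'0x0116'}]}))  # gen6
print(trend.get_gen({GfxAdaptersKey: [{'deviceID': u'0xffff'}]}))  # unknown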

In [ ]:
DoUpdate([
    FirefoxTrend(),
    WindowsGroup([
        WinverTrend(),
        WinCompositorTrend(),
        WinArchTrend(),
        WindowsVendorTrend(),
        WindowsVistaPlusGroup([
            Direct2DTrend(),
            Direct3D11Trend(),
        ]),
        DeviceGenTrend(u'0x8086', 'intel'),
        DeviceGenTrend(u'0x10de', 'nvidia'),
        DeviceGenTrend(u'0x1002', 'amd'),
    ])
])