Click-Through Rate Prediction Lab

This lab covers the steps for creating a click-through rate (CTR) prediction pipeline. You will work with the Criteo Labs dataset that was used for a recent Kaggle competition.

This lab will cover:

  • Part 1: Featurize categorical data using one-hot-encoding (OHE)

  • Part 2: Construct an OHE dictionary

  • Part 3: Parse CTR data and generate OHE features

    • Visualization 1: Feature frequency
  • Part 4: CTR prediction and logloss evaluation

    • Visualization 2: ROC curve
  • Part 5: Reduce feature dimension via feature hashing

    • Visualization 3: Hyperparameter heat map

Note that, for reference, you can look up the details of:

In [1]:
labVersion = 'cs190.1x-lab4-1.0.4'
print labVersion


Part 1: Featurize categorical data using one-hot-encoding

(1a) One-hot-encoding We would like to develop code to convert categorical features to numerical ones, and to build intuition, we will work with a sample unlabeled dataset with three data points, with each data point representing an animal. The first feature indicates the type of animal (bear, cat, mouse); the second feature describes the animal's color (black, tabby); and the third (optional) feature describes what the animal eats (mouse, salmon).

In a one-hot-encoding (OHE) scheme, we want to represent each tuple of (featureID, category) via its own binary feature. We can do this in Python by creating a dictionary that maps each tuple to a distinct integer, where the integer corresponds to a binary feature. To start, manually enter the entries in the OHE dictionary associated with the sample dataset by mapping the tuples to consecutive integers starting from zero, ordering the tuples first by featureID and next by category.

Later in this lab, we'll use OHE dictionaries to transform data points into compact lists of features that can be used in machine learning algorithms.

In [2]:
# Data for manual OHE
# Note: the first data point does not include any value for the optional third feature
sampleOne = [(0, 'mouse'), (1, 'black')]
sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
sampleThree =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]
sampleDataRDD = sc.parallelize([sampleOne, sampleTwo, sampleThree])
print sampleDataRDD.count()
# print sampleDataRDD.take(5)


In [3]:
# TODO: Replace <FILL IN> with appropriate code
sampleOHEDictManual = {}
sampleOHEDictManual[(0,'bear')] = 0
sampleOHEDictManual[(0,'cat')] = 1
sampleOHEDictManual[(0,'mouse')] = 2
sampleOHEDictManual[(1, 'black')] = 3
sampleOHEDictManual[(1, 'tabby')] = 4
sampleOHEDictManual[(2, 'mouse')] = 5
sampleOHEDictManual[(2, 'salmon')] = 6
print len(sampleOHEDictManual)


WARNING: If test_helper, required in the cell below, is not installed, follow the instructions here.

In [4]:
# TEST One-hot-encoding (1a)
from test_helper import Test

                        "incorrect value for sampleOHEDictManual[(0,'bear')]")
                        "incorrect value for sampleOHEDictManual[(0,'cat')]")
                        "incorrect value for sampleOHEDictManual[(0,'mouse')]")
                        "incorrect value for sampleOHEDictManual[(1,'black')]")
                        "incorrect value for sampleOHEDictManual[(1,'tabby')]")
                        "incorrect value for sampleOHEDictManual[(2,'mouse')]")
                        "incorrect value for sampleOHEDictManual[(2,'salmon')]")
Test.assertEquals(len(sampleOHEDictManual.keys()), 7,
                  'incorrect number of keys in sampleOHEDictManual')

1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.

(1b) Sparse vectors

Data points can typically be represented with a small number of non-zero OHE features relative to the total number of features that occur in the dataset. By leveraging this sparsity and using sparse vector representations of OHE data, we can reduce storage and computational burdens. Below are a few sample vectors represented as dense numpy arrays. Use SparseVector to represent them in a sparse fashion, and verify that both the sparse and dense representations yield the same results when computing dot products (we will later use MLlib to train classifiers via gradient descent, and MLlib will need to compute dot products between SparseVectors and dense parameter vectors).

Use SparseVector(size, *args) to create a new sparse vector where size is the length of the vector and args is either a dictionary, a list of (index, value) pairs, or two separate arrays of indices and values (sorted by index). You'll need to create a sparse vector representation of each dense vector aDense and bDense.

In [5]:
import numpy as np
from pyspark.mllib.linalg import SparseVector

In [6]:
# TODO: Replace <FILL IN> with appropriate code
aDense = np.array([0., 3., 0., 4.])
aSparse = SparseVector(len(aDense), range(0,len(aDense)), aDense)

bDense = np.array([0., 0., 0., 1.])
bSparse = SparseVector(len(bDense), range(0,len(bDense)), bDense)

w = np.array([0.4, 3.1, -1.4, -.5])
print aDense
print bDense
print aSparse
print bSparse

[ 0.  3.  0.  4.]
[ 0.  0.  0.  1.]

In [7]:
# TEST Sparse Vectors (1b)
Test.assertTrue(isinstance(aSparse, SparseVector), 'aSparse needs to be an instance of SparseVector')
Test.assertTrue(isinstance(bSparse, SparseVector), 'aSparse needs to be an instance of SparseVector')
Test.assertTrue( ==,
                'dot product of aDense and w should equal dot product of aSparse and w')
Test.assertTrue( ==,
                'dot product of bDense and w should equal dot product of bSparse and w')

1 test passed.
1 test passed.
1 test passed.
1 test passed.

(1c) OHE features as sparse vectors

Now let's see how we can represent the OHE features for points in our sample dataset. Using the mapping defined by the OHE dictionary from Part (1a), manually define OHE features for the three sample data points using SparseVector format. Any feature that occurs in a point should have the value 1.0. For example, the DenseVector for a point with features 2 and 4 would be [0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0].

In [8]:
# Reminder of the sample features
# sampleOne = [(0, 'mouse'), (1, 'black')]
# sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
# sampleThree =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]

In [9]:
# TODO: Replace <FILL IN> with appropriate code
sampleOneOHEFeatManual = SparseVector(7, [2,3], np.array([1.0,1.0]))
sampleTwoOHEFeatManual = SparseVector(7, [1,4,5], np.array([1.0,1.0,1.0]))
sampleThreeOHEFeatManual = SparseVector(7, [0,3,6], np.array([1.0,1.0,1.0]))
print sampleOneOHEFeatManual
print sampleTwoOHEFeatManual
print sampleThreeOHEFeatManual


In [10]:
# TEST OHE Features as sparse vectors (1c)
Test.assertTrue(isinstance(sampleOneOHEFeatManual, SparseVector),
                'sampleOneOHEFeatManual needs to be a SparseVector')
Test.assertTrue(isinstance(sampleTwoOHEFeatManual, SparseVector),
                'sampleTwoOHEFeatManual needs to be a SparseVector')
Test.assertTrue(isinstance(sampleThreeOHEFeatManual, SparseVector),
                'sampleThreeOHEFeatManual needs to be a SparseVector')
                        'incorrect value for sampleOneOHEFeatManual')
                        'incorrect value for sampleTwoOHEFeatManual')
                        'incorrect value for sampleThreeOHEFeatManual')

1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.
1 test passed.

(1d) Define a OHE function

Next we will use the OHE dictionary from Part (1a) to programatically generate OHE features from the original categorical data. First write a function called oneHotEncoding that creates OHE feature vectors in SparseVector format. Then use this function to create OHE features for the first sample data point and verify that the result matches the result from Part (1c).

In [11]:
# TODO: Replace <FILL IN> with appropriate code
def oneHotEncoding_old(rawFeats, OHEDict, numOHEFeats):
    """Produce a one-hot-encoding from a list of features and an OHE dictionary.

        You should ensure that the indices used to create a SparseVector are sorted.

        rawFeats (list of (int, str)): The features corresponding to a single observation.  Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sampleOne)
        OHEDict (dict): A mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The total number of unique OHE features (combinations of featureID and

        SparseVector: A SparseVector of length numOHEFeats with indices equal to the unique
            identifiers for the (featureID, value) combinations that occur in the observation and
            with values equal to 1.0.
    newFeats = []
    idx = []
    for k,i in sorted(OHEDict.items(), key=lambda x: x[1]):
        if k in rawFeats:
            newFeats += [1.0]
            idx += [i]
    return SparseVector(numOHEFeats, idx, np.array(newFeats))

In [12]:
# TODO: Replace <FILL IN> with appropriate code
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    """Produce a one-hot-encoding from a list of features and an OHE dictionary.

        You should ensure that the indices used to create a SparseVector are sorted.

        rawFeats (list of (int, str)): The features corresponding to a single observation.  Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sampleOne)
        OHEDict (dict): A mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The total number of unique OHE features (combinations of featureID and

        SparseVector: A SparseVector of length numOHEFeats with indices equal to the unique
            identifiers for the (featureID, value) combinations that occur in the observation and
            with values equal to 1.0.
    newFeats = []
    idx = []
    for f in rawFeats:
        if f in OHEDict:
            newFeats += [1.0]
            idx += [OHEDict[f]]
    return SparseVector(numOHEFeats, sorted(idx), np.array(newFeats))

# Calculate the number of features in sampleOHEDictManual
numSampleOHEFeats = len(sampleOHEDictManual)

# Run oneHotEnoding on sampleOne
sampleOneOHEFeat = oneHotEncoding(sampleOne,sampleOHEDictManual,numSampleOHEFeats)

print sampleOneOHEFeat


In [13]:
# TEST Define an OHE Function (1d)
Test.assertTrue(sampleOneOHEFeat == sampleOneOHEFeatManual,
                'sampleOneOHEFeat should equal sampleOneOHEFeatManual')
Test.assertEquals(sampleOneOHEFeat, SparseVector(7, [2,3], [1.0,1.0]),
                  'incorrect value for sampleOneOHEFeat')
Test.assertEquals(oneHotEncoding([(1, 'black'), (0, 'mouse')], sampleOHEDictManual,
                                 numSampleOHEFeats), SparseVector(7, [2,3], [1.0,1.0]),
                  'incorrect definition for oneHotEncoding')

1 test passed.
1 test passed.
1 test passed.

(1e) Apply OHE to a dataset

Finally, use the function from Part (1d) to create OHE features for all 3 data points in the sample dataset.

In [14]:
# TODO: Replace <FILL IN> with appropriate code
def toOHE(row):
    return oneHotEncoding(row,sampleOHEDictManual,numSampleOHEFeats)

sampleOHEData =
print sampleOHEData.collect()

[SparseVector(7, {2: 1.0, 3: 1.0}), SparseVector(7, {1: 1.0, 4: 1.0, 5: 1.0}), SparseVector(7, {0: 1.0, 3: 1.0, 6: 1.0})]

In [15]:
# TEST Apply OHE to a dataset (1e)
sampleOHEDataValues = sampleOHEData.collect()
Test.assertTrue(len(sampleOHEDataValues) == 3, 'sampleOHEData should have three elements')
Test.assertEquals(sampleOHEDataValues[0], SparseVector(7, {2: 1.0, 3: 1.0}),
                  'incorrect OHE for first sample')
Test.assertEquals(sampleOHEDataValues[1], SparseVector(7, {1: 1.0, 4: 1.0, 5: 1.0}),
                  'incorrect OHE for second sample')
Test.assertEquals(sampleOHEDataValues[2], SparseVector(7, {0: 1.0, 3: 1.0, 6: 1.0}),
                  'incorrect OHE for third sample')

1 test passed.
1 test passed.
1 test passed.
1 test passed.

Part 2: Construct an OHE dictionary

(2a) Pair RDD of (featureID, category)

To start, create an RDD of distinct (featureID, category) tuples. In our sample dataset, the 7 items in the resulting RDD are (0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'), (1, 'tabby'), (2, 'mouse'), (2, 'salmon'). Notably 'black' appears twice in the dataset but only contributes one item to the RDD: (1, 'black'), while 'mouse' also appears twice and contributes two items: (0, 'mouse') and (2, 'mouse'). Use flatMap and distinct.

In [16]:
flat = sampleDataRDD.flatMap(lambda r: r).distinct()
print flat.count()
for i in flat.take(8):
    print i

(0, 'bear')
(2, 'mouse')
(2, 'salmon')
(1, 'tabby')
(0, 'mouse')
(0, 'cat')
(1, 'black')

In [17]:
# TODO: Replace <FILL IN> with appropriate code
sampleDistinctFeats = (sampleDataRDD.flatMap(lambda r: r).distinct())

In [18]:
# TEST Pair RDD of (featureID, category) (2a)
                  [(0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'),
                   (1, 'tabby'), (2, 'mouse'), (2, 'salmon')],
                  'incorrect value for sampleDistinctFeats')

1 test passed.

(2b) OHE Dictionary from distinct features

Next, create an RDD of key-value tuples, where each (featureID, category) tuple in sampleDistinctFeats is a key and the values are distinct integers ranging from 0 to (number of keys - 1). Then convert this RDD into a dictionary, which can be done using the collectAsMap action. Note that there is no unique mapping from keys to values, as all we require is that each (featureID, category) key be mapped to a unique integer between 0 and the number of keys. In this exercise, any valid mapping is acceptable. Use zipWithIndex followed by collectAsMap.

In our sample dataset, one valid list of key-value tuples is: [((0, 'bear'), 0), ((2, 'salmon'), 1), ((1, 'tabby'), 2), ((2, 'mouse'), 3), ((0, 'mouse'), 4), ((0, 'cat'), 5), ((1, 'black'), 6)]. The dictionary defined in Part (1a) illustrates another valid mapping between keys and integers.

In [19]:
# TODO: Replace <FILL IN> with appropriate code
sampleOHEDict = sampleDistinctFeats.zipWithIndex().collectAsMap()
print sampleOHEDict

{(2, 'mouse'): 1, (0, 'cat'): 5, (0, 'bear'): 0, (2, 'salmon'): 2, (1, 'tabby'): 3, (1, 'black'): 6, (0, 'mouse'): 4}

In [20]:
# TEST OHE Dictionary from distinct features (2b)
                  [(0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'),
                   (1, 'tabby'), (2, 'mouse'), (2, 'salmon')],
                  'sampleOHEDict has unexpected keys')
Test.assertEquals(sorted(sampleOHEDict.values()), range(7), 'sampleOHEDict has unexpected values')

1 test passed.
1 test passed.

(2c) Automated creation of an OHE dictionary

Now use the code from Parts (2a) and (2b) to write a function that takes an input dataset and outputs an OHE dictionary. Then use this function to create an OHE dictionary for the sample dataset, and verify that it matches the dictionary from Part (2b).

In [21]:
# TODO: Replace <FILL IN> with appropriate code
def createOneHotDict(inputData):
    """Creates a one-hot-encoder dictionary based on the input data.

        inputData (RDD of lists of (int, str)): An RDD of observations where each observation is
            made up of a list of (featureID, value) tuples.

        dict: A dictionary where the keys are (featureID, value) tuples and map to values that are
            unique integers.
    flat = inputData.flatMap(lambda r: r).distinct()
    return flat.zipWithIndex().collectAsMap()

sampleOHEDictAuto = createOneHotDict(sampleDataRDD)
print sampleOHEDictAuto

{(2, 'mouse'): 1, (0, 'cat'): 5, (0, 'bear'): 0, (2, 'salmon'): 2, (1, 'tabby'): 3, (1, 'black'): 6, (0, 'mouse'): 4}

In [22]:
# TEST Automated creation of an OHE dictionary (2c)
                  [(0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'),
                   (1, 'tabby'), (2, 'mouse'), (2, 'salmon')],
                  'sampleOHEDictAuto has unexpected keys')
Test.assertEquals(sorted(sampleOHEDictAuto.values()), range(7),
                  'sampleOHEDictAuto has unexpected values')

1 test passed.
1 test passed.

Part 3: Parse CTR data and generate OHE features

Before we can proceed, you'll first need to obtain the data from Criteo. Here is the link to Criteo's data sharing agreement: After you accept the agreement, you can obtain the download URL by right-clicking on the "Download Sample" button and clicking "Copy link address" or "Copy Link Location", depending on your browser. Paste the URL into the # TODO cell below. The script below will download the file and make the sample dataset's contents available in the rawData variable.

Note that the download should complete within 30 seconds.

In [23]:
import os.path
baseDir = os.path.join('/Users/bill.walrond/Documents/dsprj/data')
inputPath = os.path.join('CS190_Mod4', 'dac_sample.txt')
fileName = os.path.join(baseDir, inputPath)

if os.path.isfile(fileName):
    rawData = (sc
               .textFile(fileName, 2)
               .map(lambda x: x.replace('\t', ',')))  # work with either ',' or '\t' separated data
    print rawData.take(1)
    print rawData.count()
    print 'Couldn\'t find filename:  %s' % fileName


In [24]:
# TODO: Replace <FILL IN> with appropriate code
import glob
from io import BytesIO
import os.path
import tarfile
import urllib
import urlparse

# Paste in url, url should end with: dac_sample.tar.gz
url = '<FILL IN>'

url = url.strip()

if 'rawData' in locals():
    print 'rawData already loaded.  Nothing to do.'
elif not url.endswith('dac_sample.tar.gz'):
    print 'Check your download url.  Are you downloading the Sample dataset?'
        tmp = BytesIO()
        urlHandle = urllib.urlopen(url)
        tarFile =

        dacSample = tarFile.extractfile('dac_sample.txt')
        dacSample = [unicode(x.replace('\n', '').replace('\t', ',')) for x in dacSample]
        rawData  = (sc
                    .parallelize(dacSample, 1)  # Create an RDD
                    .zipWithIndex()  # Enumerate lines
                    .map(lambda (v, i): (i, v))  # Use line index as key
                    .partitionBy(2, lambda i: not (i < 50026))  # Match sc.textFile partitioning
                    .map(lambda (i, v): v))  # Remove index
        print 'rawData loaded from url'
        print rawData.take(1)
    except IOError:
        print 'Unable to unpack: {0}'.format(url)

rawData already loaded.  Nothing to do.

(3a) Loading and splitting the data

We are now ready to start working with the actual CTR data, and our first task involves splitting it into training, validation, and test sets. Use the randomSplit method with the specified weights and seed to create RDDs storing each of these datasets, and then cache each of these RDDs, as we will be accessing them multiple times in the remainder of this lab. Finally, compute the size of each dataset.

In [25]:
# TODO: Replace <FILL IN> with appropriate code
weights = [.8, .1, .1]
seed = 42
# Use randomSplit with weights and seed
rawTrainData, rawValidationData, rawTestData = rawData.randomSplit(weights, seed)
# Cache the data

nTrain = rawTrainData.count()
nVal = rawValidationData.count()
nTest = rawTestData.count()
print nTrain, nVal, nTest, nTrain + nVal + nTest
print rawTrainData.take(1)

79911 10075 10014 100000

In [26]:
# TEST Loading and splitting the data (3a)
Test.assertTrue(all([rawTrainData.is_cached, rawValidationData.is_cached, rawTestData.is_cached]),
                'you must cache the split data')
Test.assertEquals(nTrain, 79911, 'incorrect value for nTrain')
Test.assertEquals(nVal, 10075, 'incorrect value for nVal')
Test.assertEquals(nTest, 10014, 'incorrect value for nTest')

1 test passed.
1 test passed.
1 test passed.
1 test passed.

(3b) Extract features

We will now parse the raw training data to create an RDD that we can subsequently use to create an OHE dictionary. Note from the take() command in Part (3a) that each raw data point is a string containing several fields separated by some delimiter. For now, we will ignore the first field (which is the 0-1 label), and parse the remaining fields (or raw features). To do this, complete the implemention of the parsePoint function.

In [27]:
# TODO: Replace <FILL IN> with appropriate code
def parsePoint(point):
    """Converts a comma separated string into a list of (featureID, value) tuples.

        featureIDs should start at 0 and increase to the number of features - 1.

        point (str): A comma separated string where the first value is the label and the rest
            are features.

        list: A list of (featureID, value) tuples.
    # make a list of (featureID, value) tuples, skipping the first element (the label)
    return [(k,v) for k,v in enumerate(point[2:].split(','))]

parsedTrainFeat =
print parsedTrainFeat.count()
numCategories = (parsedTrainFeat
                 .flatMap(lambda x: x)
                 .map(lambda x: (x[0], 1))
                 .reduceByKey(lambda x, y: x + y)

print numCategories[2][1]


In [28]:
# TEST Extract features (3b)
Test.assertEquals(numCategories[2][1], 855, 'incorrect implementation of parsePoint')
Test.assertEquals(numCategories[32][1], 4, 'incorrect implementation of parsePoint')

1 test passed.
1 test passed.

(3c) Create an OHE dictionary from the dataset

Note that parsePoint returns a data point as a list of (featureID, category) tuples, which is the same format as the sample dataset studied in Parts 1 and 2 of this lab. Using this observation, create an OHE dictionary using the function implemented in Part (2c). Note that we will assume for simplicity that all features in our CTR dataset are categorical.

In [29]:
# TODO: Replace <FILL IN> with appropriate code
ctrOHEDict = createOneHotDict(parsedTrainFeat)
print 'Len of ctrOHEDict: {0}'.format(len(ctrOHEDict))
numCtrOHEFeats = len(ctrOHEDict.keys())
print numCtrOHEFeats
print ctrOHEDict.has_key((0, ''))

Len of ctrOHEDict: 233286

In [30]:
theItems = ctrOHEDict.items()
for i in range(0,9):
    print theItems[i]

((16, u'0e4666ea'), 116918)
((28, u'afc930ae'), 0)
((24, u'5af66cdb'), 116921)
((5, u'1454'), 116940)
((2, u'105'), 116924)
((33, u'59af4e91'), 116950)
((15, u'2f997378'), 42575)
((4, u'30340'), 116927)
((38, u'83a7395a'), 116928)

In [31]:
# TEST Create an OHE dictionary from the dataset (3c)
Test.assertEquals(numCtrOHEFeats, 233286, 'incorrect number of features in ctrOHEDict')
Test.assertTrue((0, '') in ctrOHEDict, 'incorrect features in ctrOHEDict')

1 test passed.
1 test passed.

(3d) Apply OHE to the dataset

Now let's use this OHE dictionary by starting with the raw training data and creating an RDD of LabeledPoint objects using OHE features. To do this, complete the implementation of the parseOHEPoint function. Hint: parseOHEPoint is an extension of the parsePoint function from Part (3b) and it uses the oneHotEncoding function from Part (1d).

In [32]:
from pyspark.mllib.regression import LabeledPoint
print rawTrainData.count()
r = rawTrainData.first()
l = parsePoint(r)
print 'Length of parsed list: %d' % len(l)
print 'Here\'s the list ...'
print l
sv = oneHotEncoding(l, ctrOHEDict, numCtrOHEFeats)
print 'Here\'s the sparsevector ...'
print sv
lp = LabeledPoint(float(r[:1]), sv)
print 'Here\'s the labeledpoint ...'
print lp

Length of parsed list: 39
Here's the list ...
[(0, u'1'), (1, u'1'), (2, u'5'), (3, u'0'), (4, u'1382'), (5, u'4'), (6, u'15'), (7, u'2'), (8, u'181'), (9, u'1'), (10, u'2'), (11, u''), (12, u'2'), (13, u'68fd1e64'), (14, u'80e26c9b'), (15, u'fb936136'), (16, u'7b4723c4'), (17, u'25c83c98'), (18, u'7e0ccccf'), (19, u'de7995b8'), (20, u'1f89b562'), (21, u'a73ee510'), (22, u'a8cd5504'), (23, u'b2cb9c98'), (24, u'37c9c164'), (25, u'2824a5f6'), (26, u'1adce6ef'), (27, u'8ba8b39a'), (28, u'891b62e7'), (29, u'e5ba7672'), (30, u'f54016b9'), (31, u'21ddcdc9'), (32, u'b1252a9d'), (33, u'07b5194c'), (34, u''), (35, u'3a171ecb'), (36, u'c5c50484'), (37, u'e8b83407'), (38, u'9727dd16')]
Here's the sparsevector ...
Here's the labeledpoint ...

In [33]:
# TODO: Replace <FILL IN> with appropriate code
def parseOHEPoint(point, OHEDict, numOHEFeats):
    """Obtain the label and feature vector for this raw observation.

        You must use the function `oneHotEncoding` in this implementation or later portions
        of this lab may not function as expected.

        point (str): A comma separated string where the first value is the label and the rest
            are features.
        OHEDict (dict of (int, str) to int): Mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The number of unique features in the training dataset.

        LabeledPoint: Contains the label for the observation and the one-hot-encoding of the
            raw features based on the provided OHE dictionary.
    # first, get the label
    label = float(point[:1])
    parsed = parsePoint(point)
    features = oneHotEncoding(parsed, OHEDict, numOHEFeats)
    # return parsed
    return LabeledPoint(label,features)
def toOHEPoint(point):
    return parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats)

rawTrainData = rawTrainData.repartition(8)
OHETrainData =
print OHETrainData.take(1)

# Check that oneHotEncoding function was used in parseOHEPoint
backupOneHot = oneHotEncoding
oneHotEncoding = None
withOneHot = False
try: parseOHEPoint(rawTrainData.take(1)[0], ctrOHEDict, numCtrOHEFeats)
except TypeError: withOneHot = True
oneHotEncoding = backupOneHot

[LabeledPoint(0.0, (233286,[4568,8862,11283,11683,26652,26784,31213,36164,39525,42637,47112,48536,51441,61786,66603,82057,88304,91113,98450,109699,110378,114867,117015,120260,121552,122902,122914,125571,132470,182289,184132,185498,185642,186074,187389,188357,190836,200292,204812],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))]

In [34]:
# TEST Apply OHE to the dataset (3d)
numNZ = sum( x: len(x)).take(5))
numNZAlt = sum( lp: len(lp.features.indices)).take(5))
Test.assertEquals(numNZ, numNZAlt, 'incorrect implementation of parseOHEPoint')
Test.assertTrue(withOneHot, 'oneHotEncoding not present in parseOHEPoint')

1 test passed.
1 test passed.

Visualization 1: Feature frequency

We will now visualize the number of times each of the 233,286 OHE features appears in the training data. We first compute the number of times each feature appears, then bucket the features by these counts. The buckets are sized by powers of 2, so the first bucket corresponds to features that appear exactly once ( $ \scriptsize 2^0 $ ), the second to features that appear twice ( $ \scriptsize 2^1 $ ), the third to features that occur between three and four ( $ \scriptsize 2^2 $ ) times, the fifth bucket is five to eight ( $ \scriptsize 2^3 $ ) times and so on. The scatter plot below shows the logarithm of the bucket thresholds versus the logarithm of the number of features that have counts that fall in the buckets.

In [35]:
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1),("b", 1), ("b", 1), ("b", 1), ("b", 1)], 3)
y = x.reduceByKey(lambda accum, n: accum + n)

[('b', 5), ('a', 3)]

In [36]:
def bucketFeatByCount(featCount):
    """Bucket the counts by powers of two."""
    for i in range(11):
        size = 2 ** i
        if featCount <= size:
            return size
    return -1

featCounts = (OHETrainData
              .flatMap(lambda lp: lp.features.indices)
              .map(lambda x: (x, 1))
              .reduceByKey(lambda x, y: x + y))
featCountsBuckets = (featCounts
                     .map(lambda x: (bucketFeatByCount(x[1]), 1))
                     .filter(lambda (k, v): k != -1)
                     .reduceByKey(lambda x, y: x + y)
print featCountsBuckets

[(128, 1476), (1024, 255), (32, 4755), (64, 2627), (256, 748), (16, 7752), (8, 11440), (512, 414), (1, 162813), (2, 24076), (4, 16639)]

In [37]:
import matplotlib.pyplot as plt
%matplotlib inline

x, y = zip(*featCountsBuckets)
x, y = np.log(x), np.log(y)

def preparePlot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',
    """Template for generating the plot layout."""
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        if hideLabels: axis.set_ticklabels([])
    plt.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
    map(lambda position: ax.spines[position].set_visible(False), ['bottom', 'top', 'left', 'right'])
    return fig, ax

# generate layout and plot data
fig, ax = preparePlot(np.arange(0, 10, 1), np.arange(4, 14, 2))
ax.set_xlabel(r'$\log_e(bucketSize)$'), ax.set_ylabel(r'$\log_e(countInBucket)$')
plt.scatter(x, y, s=14**2, c='#d6ebf2', edgecolors='#8cbfd0', alpha=0.75)
# display(fig)

(3e) Handling unseen features

We naturally would like to repeat the process from Part (3d), e.g., to compute OHE features for the validation and test datasets. However, we must be careful, as some categorical values will likely appear in new data that did not exist in the training data. To deal with this situation, update the oneHotEncoding() function from Part (1d) to ignore previously unseen categories, and then compute OHE features for the validation data.

In [38]:
# TODO: Replace <FILL IN> with appropriate code
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    """Produce a one-hot-encoding from a list of features and an OHE dictionary.

        If a (featureID, value) tuple doesn't have a corresponding key in OHEDict it should be

        rawFeats (list of (int, str)): The features corresponding to a single observation.  Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sampleOne)
        OHEDict (dict): A mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The total number of unique OHE features (combinations of featureID and

        SparseVector: A SparseVector of length numOHEFeats with indices equal to the unique
            identifiers for the (featureID, value) combinations that occur in the observation and
            with values equal to 1.0.
    newFeats = []
    idx = []
    for f in rawFeats:
        if f in OHEDict:
            newFeats += [1.0]
            idx += [OHEDict[f]]
    return SparseVector(numOHEFeats, sorted(idx), np.array(newFeats))

OHEValidationData = point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
print OHEValidationData.take(1)

[LabeledPoint(0.0, (233286,[7576,9187,15510,21585,31213,36164,39525,49198,61786,66603,67218,68211,68311,73035,76672,81329,81396,91981,96929,98450,109699,110946,117015,121552,141711,146496,147649,171128,184132,184687,185498,194763,198537,201039,210717,213588,222162,222626,227709],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))]

In [39]:
# TEST Handling unseen features (3e)
numNZVal = (OHEValidationData
            .map(lambda lp: len(lp.features.indices))
Test.assertEquals(numNZVal, 372080, 'incorrect number of features')

1 test passed.

Part 4: CTR prediction and logloss evaluation

(4a) Logistic regression

We are now ready to train our first CTR classifier. A natural classifier to use in this setting is logistic regression, since it models the probability of a click-through event rather than returning a binary response, and when working with rare events, probabilistic predictions are useful.

First use LogisticRegressionWithSGD to train a model using OHETrainData with the given hyperparameter configuration. LogisticRegressionWithSGD returns a LogisticRegressionModel. Next, use the LogisticRegressionModel.weights and LogisticRegressionModel.intercept attributes to print out the model's parameters. Note that these are the names of the object's attributes and should be called using a syntax like model.weights for a given model.

In [40]:
from pyspark.mllib.classification import LogisticRegressionWithSGD

# fixed hyperparameters
numIters = 50
stepSize = 10.
regParam = 1e-6
regType = 'l2'
includeIntercept = True

In [41]:
# TODO: Replace <FILL IN> with appropriate code
model0 = LogisticRegressionWithSGD.train(OHETrainData, 
sortedWeights = sorted(model0.weights)
print sortedWeights[:5], model0.intercept

[-0.45899236854998832, -0.37973707649113725, -0.36996558267199342, -0.36934962880693828, -0.3269794541587136] 0.564550840262

In [42]:
# TEST Logistic regression (4a)
Test.assertTrue(np.allclose(model0.intercept,  0.56455084025), 'incorrect value for model0.intercept')
                [-0.45899236853575609, -0.37973707648623956, -0.36996558266753304,
                 -0.36934962879928263, -0.32697945415010637]), 'incorrect value for model0.weights')

1 test passed.
1 test passed.

(4b) Log loss Throughout this lab, we will use log loss to evaluate the quality of models. Log loss is defined as: $$ \scriptsize \ell_{log}(p, y) = \begin{cases} -\log (p) & \text{if } y = 1 \\\ -\log(1-p) & \text{if } y = 0 \end{cases} $$ where $ \scriptsize p$ is a probability between 0 and 1 and $ \scriptsize y$ is a label of either 0 or 1. Log loss is a standard evaluation criterion when predicting rare-events such as click-through rate prediction (it is also the criterion used in the Criteo Kaggle competition).

Write a function to compute log loss, and evaluate it on some sample inputs.

In [43]:
# TODO: Replace <FILL IN> with appropriate code
from math import log

def computeLogLoss(p, y):
    """Calculates the value of log loss for a given probabilty and label.

        log(0) is undefined, so when p is 0 we need to add a small value (epsilon) to it
        and when p is 1 we need to subtract a small value (epsilon) from it.

        p (float): A probabilty between 0 and 1.
        y (int): A label.  Takes on the values 0 and 1.

        float: The log loss value.
    epsilon = 10e-12
    if p not in [0.0,1.0]:
        logeval = p
    elif p == 0:
        logeval = p+epsilon
        logeval = p-epsilon
    if y == 1:
        return (-log(logeval))
    elif y == 0:
        return (-log(1-logeval))

print computeLogLoss(.5, 1)
print computeLogLoss(.5, 0)
print computeLogLoss(.99, 1)
print computeLogLoss(.99, 0)
print computeLogLoss(.01, 1)
print computeLogLoss(.01, 0)
print computeLogLoss(0, 1)
print computeLogLoss(1, 1)
print computeLogLoss(1, 0)


In [44]:
# TEST Log loss (4b)
Test.assertTrue(np.allclose([computeLogLoss(.5, 1), computeLogLoss(.01, 0), computeLogLoss(.01, 1)],
                            [0.69314718056, 0.0100503358535, 4.60517018599]),
                'computeLogLoss is not correct')
Test.assertTrue(np.allclose([computeLogLoss(0, 1), computeLogLoss(1, 1), computeLogLoss(1, 0)],
                            [25.3284360229, 1.00000008275e-11, 25.3284360229]),
                'computeLogLoss needs to bound p away from 0 and 1 by epsilon')

1 test passed.
1 test passed.

(4c) Baseline log loss

Next we will use the function we wrote in Part (4b) to compute the baseline log loss on the training data. A very simple yet natural baseline model is one where we always make the same prediction independent of the given datapoint, setting the predicted value equal to the fraction of training points that correspond to click-through events (i.e., where the label is one). Compute this value (which is simply the mean of the training labels), and then use it to compute the training log loss for the baseline model. The log loss for multiple observations is the mean of the individual log loss values.

In [45]:
# TODO: Replace <FILL IN> with appropriate code
# Note that our dataset has a very high click-through rate by design
# In practice click-through rate can be one to two orders of magnitude lower
classOneFracTrain = p: p.label).mean()
print classOneFracTrain

logLossTrBase = p: computeLogLoss(classOneFracTrain,p.label) ).mean()
print 'Baseline Train Logloss = {0:.6f}\n'.format(logLossTrBase)

Baseline Train Logloss = 0.535844

In [46]:
# TEST Baseline log loss (4c)
Test.assertTrue(np.allclose(classOneFracTrain, 0.22717773523), 'incorrect value for classOneFracTrain')
Test.assertTrue(np.allclose(logLossTrBase, 0.535844), 'incorrect value for logLossTrBase')

1 test passed.
1 test passed.

(4d) Predicted probability

In order to compute the log loss for the model we trained in Part (4a), we need to write code to generate predictions from this model. Write a function that computes the raw linear prediction from this logistic regression model and then passes it through a sigmoid function $ \scriptsize \sigma(t) = (1+ e^{-t})^{-1} $ to return the model's probabilistic prediction. Then compute probabilistic predictions on the training data.

Note that when incorporating an intercept into our predictions, we simply add the intercept to the value of the prediction obtained from the weights and features. Alternatively, if the intercept was included as the first weight, we would need to add a corresponding feature to our data where the feature has the value one. This is not the case here.

In [47]:
# TODO: Replace <FILL IN> with appropriate code
from math import exp #  exp(-t) = e^-t

def getP(x, w, intercept):
    """Calculate the probability for an observation given a set of weights and intercept.

        We'll bound our raw prediction between 20 and -20 for numerical purposes.

        x (SparseVector): A vector with values of 1.0 for features that exist in this
            observation and 0.0 otherwise.
        w (DenseVector): A vector of weights (betas) for the model.
        intercept (float): The model's intercept.

        float: A probability between 0 and 1.
    rawPrediction = + intercept

    # Bound the raw prediction value
    rawPrediction = min(rawPrediction, 20)
    rawPrediction = max(rawPrediction, -20)
    return ( 1 / (1 + exp(-1*rawPrediction)) )

trainingPredictions = p: getP(p.features,model0.weights, model0.intercept))

print trainingPredictions.take(5)

[0.17846102057862437, 0.5389775379363428, 0.20823838152654436, 0.15776692277487175, 0.7389514801598551]

In [48]:
# TEST Predicted probability (4d)
Test.assertTrue(np.allclose(trainingPredictions.sum(), 18135.4834348),
                'incorrect value for trainingPredictions')

1 test passed.

(4e) Evaluate the model

We are now ready to evaluate the quality of the model we trained in Part (4a). To do this, first write a general function that takes as input a model and data, and outputs the log loss. Then run this function on the OHE training data, and compare the result with the baseline log loss.

In [49]:
a = p: (getP(p.features, model0.weights, model0.intercept), p.label))
print a.count()
print a.take(5)
b = lp: computeLogLoss(lp[0],lp[1]))
print b.count()
print b.take(5)

[(0.17846102057862437, 0.0), (0.5389775379363428, 0.0), (0.20823838152654436, 0.0), (0.15776692277487175, 0.0), (0.7389514801598552, 1.0)]
[0.19657589354786234, 0.7743085125224802, 0.23349491924926188, 0.17169848923911749, 0.30252301626536376]

In [50]:
# TODO: Replace <FILL IN> with appropriate code
def evaluateResults(model, data):
    """Calculates the log loss for the data given the model.

        model (LogisticRegressionModel): A trained logistic regression model.
        data (RDD of LabeledPoint): Labels and features for each observation.

        float: Log loss for the data.
    # Run a map to create an RDD of (prediction, label) tuples
    preds_labels = p: (getP(p.features, model.weights, model.intercept), p.label))
    return lp: computeLogLoss(lp[0], lp[1])).mean()

logLossTrLR0 = evaluateResults(model0, OHETrainData)
print ('OHE Features Train Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.6f}'
       .format(logLossTrBase, logLossTrLR0))

OHE Features Train Logloss:
	Baseline = 0.536
	LogReg = 0.456903

In [51]:
# TEST Evaluate the model (4e)
Test.assertTrue(np.allclose(logLossTrLR0, 0.456903), 'incorrect value for logLossTrLR0')

1 test passed.

(4f) Validation log loss

Next, following the same logic as in Parts (4c) and 4(e), compute the validation log loss for both the baseline and logistic regression models. Notably, the baseline model for the validation data should still be based on the label fraction from the training dataset.

In [59]:
# TODO: Replace <FILL IN> with appropriate code
logLossValBase = p: computeLogLoss(classOneFracTrain, p.label)).mean()

logLossValLR0 = evaluateResults(model0, OHEValidationData)
print ('OHE Features Validation Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.6f}'
       .format(logLossValBase, logLossValLR0))

OHE Features Validation Logloss:
	Baseline = 0.528
	LogReg = 0.456957

In [60]:
# TEST Validation log loss (4f)
Test.assertTrue(np.allclose(logLossValBase, 0.527603), 'incorrect value for logLossValBase')
Test.assertTrue(np.allclose(logLossValLR0, 0.456957), 'incorrect value for logLossValLR0')

1 test passed.
1 test passed.

Visualization 2: ROC curve

We will now visualize how well the model predicts our target. To do this we generate a plot of the ROC curve. The ROC curve shows us the trade-off between the false positive rate and true positive rate, as we liberalize the threshold required to predict a positive outcome. A random model is represented by the dashed line.

In [61]:
labelsAndScores = lp:
                                            (lp.label, getP(lp.features, model0.weights, model0.intercept)))
labelsAndWeights = labelsAndScores.collect()
labelsAndWeights.sort(key=lambda (k, v): v, reverse=True)
labelsByWeight = np.array([k for (k, v) in labelsAndWeights])

length = labelsByWeight.size
truePositives = labelsByWeight.cumsum()
numPositive = truePositives[-1]
falsePositives = np.arange(1.0, length + 1, 1.) - truePositives

truePositiveRate = truePositives / numPositive
falsePositiveRate = falsePositives / (length - numPositive)

# Generate layout and plot data
fig, ax = preparePlot(np.arange(0., 1.1, 0.1), np.arange(0., 1.1, 0.1))
ax.set_xlim(-.05, 1.05), ax.set_ylim(-.05, 1.05)
ax.set_ylabel('True Positive Rate (Sensitivity)')
ax.set_xlabel('False Positive Rate (1 - Specificity)')
plt.plot(falsePositiveRate, truePositiveRate, color='#8cbfd0', linestyle='-', linewidth=3.)
plt.plot((0., 1.), (0., 1.), linestyle='--', color='#d6ebf2', linewidth=2.)  # Baseline model
# display(fig)

Part 5: Reduce feature dimension via feature hashing

(5a) Hash function

As we just saw, using a one-hot-encoding featurization can yield a model with good statistical accuracy. However, the number of distinct categories across all features is quite large -- recall that we observed 233K categories in the training data in Part (3c). Moreover, the full Kaggle training dataset includes more than 33M distinct categories, and the Kaggle dataset itself is just a small subset of Criteo's labeled data. Hence, featurizing via a one-hot-encoding representation would lead to a very large feature vector. To reduce the dimensionality of the feature space, we will use feature hashing.

Below is the hash function that we will use for this part of the lab. We will first use this hash function with the three sample data points from Part (1a) to gain some intuition. Specifically, run code to hash the three sample points using two different values for numBuckets and observe the resulting hashed feature dictionaries.

In [62]:
from collections import defaultdict
import hashlib

def hashFunction(numBuckets, rawFeats, printMapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

        Use printMapping=True for debug purposes and to better understand how the hashing works.

        numBuckets (int): Number of buckets to use as features.
        rawFeats (list of (int, str)): A list of features for an observation.  Represented as
            (featureID, value) tuples.
        printMapping (bool, optional): If true, the mappings of featureString to index will be

        dict of int to float:  The keys will be integers which represent the buckets that the
            features have been hashed to.  The value for a given key will contain the count of the
            (featureID, value) tuples that have hashed to that key.
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        mapping[featureString] = int(int(hashlib.md5(featureString).hexdigest(), 16) % numBuckets)
    if(printMapping): print mapping
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0
    return dict(sparseFeatures)

# Reminder of the sample values:
# sampleOne = [(0, 'mouse'), (1, 'black')]
# sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
# sampleThree =  [(0, 'bear'), (1, 'black'), (2, 'salmon')]

In [63]:
# TODO: Replace <FILL IN> with appropriate code
# Use four buckets
sampOneFourBuckets = hashFunction(4, sampleOne, True)
sampTwoFourBuckets = hashFunction(4, sampleTwo, True)
sampThreeFourBuckets = hashFunction(4, sampleThree, True)

# Use one hundred buckets
sampOneHundredBuckets = hashFunction(100, sampleOne, True)
sampTwoHundredBuckets = hashFunction(100, sampleTwo, True)
sampThreeHundredBuckets = hashFunction(100, sampleThree, True)

print '\t\t 4 Buckets \t\t\t 100 Buckets'
print 'SampleOne:\t {0}\t\t {1}'.format(sampOneFourBuckets, sampOneHundredBuckets)
print 'SampleTwo:\t {0}\t\t {1}'.format(sampTwoFourBuckets, sampTwoHundredBuckets)
print 'SampleThree:\t {0}\t {1}'.format(sampThreeFourBuckets, sampThreeHundredBuckets)

{'black1': 2, 'mouse0': 3}
{'cat0': 0, 'tabby1': 0, 'mouse2': 2}
{'bear0': 0, 'black1': 2, 'salmon2': 1}
{'black1': 14, 'mouse0': 31}
{'cat0': 40, 'tabby1': 16, 'mouse2': 62}
{'bear0': 72, 'black1': 14, 'salmon2': 5}
		 4 Buckets 			 100 Buckets
SampleOne:	 {2: 1.0, 3: 1.0}		 {14: 1.0, 31: 1.0}
SampleTwo:	 {0: 2.0, 2: 1.0}		 {40: 1.0, 16: 1.0, 62: 1.0}
SampleThree:	 {0: 1.0, 1: 1.0, 2: 1.0}	 {72: 1.0, 5: 1.0, 14: 1.0}

In [64]:
# TEST Hash function (5a)
Test.assertEquals(sampOneFourBuckets, {2: 1.0, 3: 1.0}, 'incorrect value for sampOneFourBuckets')
Test.assertEquals(sampThreeHundredBuckets, {72: 1.0, 5: 1.0, 14: 1.0},
                  'incorrect value for sampThreeHundredBuckets')

1 test passed.
1 test passed.

(5b) Creating hashed features

Next we will use this hash function to create hashed features for our CTR datasets. First write a function that uses the hash function from Part (5a) with numBuckets = $ \scriptsize 2^{15} \approx 33K $ to create a LabeledPoint with hashed features stored as a SparseVector. Then use this function to create new training, validation and test datasets with hashed features. Hint: parsedHashPoint is similar to parseOHEPoint from Part (3d).

In [92]:
feats = [(k,v) for k,v in enumerate(rawTrainData.take(1)[0][2:].split(','))]
print feats
hashDict = hashFunction(2 ** 15, feats)
print hashDict
print len(hashDict)
print 2**15

[(0, u''), (1, u'893'), (2, u''), (3, u''), (4, u'4392'), (5, u''), (6, u'0'), (7, u'0'), (8, u'0'), (9, u''), (10, u'0'), (11, u''), (12, u''), (13, u'68fd1e64'), (14, u'2c16a946'), (15, u'a9a87e68'), (16, u'2e17d6f6'), (17, u'25c83c98'), (18, u'fe6b92e5'), (19, u'2e8a689b'), (20, u'0b153874'), (21, u'a73ee510'), (22, u'efea433b'), (23, u'e51ddf94'), (24, u'a30567ca'), (25, u'3516f6e6'), (26, u'07d13a8f'), (27, u'18231224'), (28, u'52b8680f'), (29, u'1e88c74f'), (30, u'74ef3502'), (31, u''), (32, u''), (33, u'6b3a5ca6'), (34, u''), (35, u'3a171ecb'), (36, u'9117a34a'), (37, u''), (38, u'')]
{2817: 1.0, 23554: 1.0, 20868: 1.0, 4613: 1.0, 11017: 1.0, 26384: 1.0, 8209: 1.0, 21396: 1.0, 17176: 1.0, 3807: 1.0, 22943: 1.0, 11558: 1.0, 6952: 1.0, 24234: 1.0, 1580: 1.0, 10800: 1.0, 28211: 1.0, 5302: 1.0, 4667: 1.0, 19274: 1.0, 12610: 1.0, 11722: 1.0, 15951: 1.0, 26962: 1.0, 6357: 1.0, 25818: 1.0, 5725: 1.0, 23903: 1.0, 21008: 1.0, 13026: 1.0, 7781: 1.0, 19046: 1.0, 18023: 1.0, 20434: 1.0, 25533: 1.0, 2033: 1.0, 15091: 1.0, 28405: 1.0, 6007: 1.0}

In [96]:
# TODO: Replace <FILL IN> with appropriate code
def parseHashPoint(point, numBuckets):
    """Create a LabeledPoint for this observation using hashing.

        point (str): A comma separated string where the first value is the label and the rest are
        numBuckets: The number of buckets to hash to.

        LabeledPoint: A LabeledPoint with a label (0.0 or 1.0) and a SparseVector of hashed
    label = float(point[:1])
    rawFeats = [(k,v) for k,v in enumerate(point[2:].split(','))]
    hashDict = hashFunction(numBuckets, rawFeats)
    return LabeledPoint(label,SparseVector(len(hashDict), sorted(hashDict.keys()), hashDict.values()))

numBucketsCTR = 2 ** 15
hashTrainData = r: parseHashPoint(r,numBucketsCTR))
hashValidationData = r: parseHashPoint(r,numBucketsCTR))
hashTestData = r: parseHashPoint(r,numBucketsCTR))

a = hashTrainData.take(1)
print a

[LabeledPoint(0.0, (39,[1580,2033,2817,3807,4613,4667,5302,5725,6007,6357,6952,7781,8209,10800,11017,11558,11722,12610,13026,15091,15951,17176,18023,19046,19274,20434,20868,21008,21396,22943,23554,23903,24234,25533,25818,26384,26962,28211,28405],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))]

In [97]:
# TEST Creating hashed features (5b)
hashTrainDataFeatureSum = sum(hashTrainData
                           .map(lambda lp: len(lp.features.indices))
print hashTrainDataFeatureSum
hashTrainDataLabelSum = sum(hashTrainData
                         .map(lambda lp: lp.label)
print hashTrainDataLabelSum
hashValidationDataFeatureSum = sum(hashValidationData
                                .map(lambda lp: len(lp.features.indices))
hashValidationDataLabelSum = sum(hashValidationData
                              .map(lambda lp: lp.label)
hashTestDataFeatureSum = sum(hashTestData
                          .map(lambda lp: len(lp.features.indices))
hashTestDataLabelSum = sum(hashTestData
                        .map(lambda lp: lp.label)

Test.assertEquals(hashTrainDataFeatureSum, 772, 'incorrect number of features in hashTrainData')
Test.assertEquals(hashTrainDataLabelSum, 24.0, 'incorrect labels in hashTrainData')
Test.assertEquals(hashValidationDataFeatureSum, 776,
                  'incorrect number of features in hashValidationData')
Test.assertEquals(hashValidationDataLabelSum, 16.0, 'incorrect labels in hashValidationData')
Test.assertEquals(hashTestDataFeatureSum, 774, 'incorrect number of features in hashTestData')
Test.assertEquals(hashTestDataLabelSum, 23.0, 'incorrect labels in hashTestData')

1 test failed. incorrect number of features in hashTrainData
1 test failed. incorrect labels in hashTrainData
1 test passed.
1 test passed.
1 test passed.
1 test passed.

(5c) Sparsity

Since we have 33K hashed features versus 233K OHE features, we should expect OHE features to be sparser. Verify this hypothesis by computing the average sparsity of the OHE and the hashed training datasets.

Note that if you have a SparseVector named sparse, calling len(sparse) returns the total number of features, not the number features with entries. SparseVector objects have the attributes indices and values that contain information about which features are nonzero. Continuing with our example, these can be accessed using sparse.indices and sparse.values, respectively.

In [107]:
s = sum( lp: len(lp.features.indices) / float(numBucketsCTR) ).collect()) / nTrain
# ratios.count()


In [108]:
# TODO: Replace <FILL IN> with appropriate code
def computeSparsity(data, d, n):
    """Calculates the average sparsity for the features in an RDD of LabeledPoints.

        data (RDD of LabeledPoint): The LabeledPoints to use in the sparsity calculation.
        d (int): The total number of features.
        n (int): The number of observations in the RDD.

        float: The average of the ratio of features in a point to total features.
    return sum( lp: len(lp.features.indices) / float(d) ).collect()) / n

averageSparsityHash = computeSparsity(hashTrainData, numBucketsCTR, nTrain)
averageSparsityOHE = computeSparsity(OHETrainData, numCtrOHEFeats, nTrain)

print 'Average OHE Sparsity: {0:.7e}'.format(averageSparsityOHE)
print 'Average Hash Sparsity: {0:.7e}'.format(averageSparsityHash)

Average OHE Sparsity: 1.6582419e-04
Average Hash Sparsity: 1.1805561e-03

In [109]:
# TEST Sparsity (5c)
Test.assertTrue(np.allclose(averageSparsityOHE, 1.6717677e-04),
                'incorrect value for averageSparsityOHE')
Test.assertTrue(np.allclose(averageSparsityHash, 1.1805561e-03),
                'incorrect value for averageSparsityHash')

1 test failed. incorrect value for averageSparsityOHE
1 test passed.

In [35]:

In [ ]: