In [4]:
# Lab version identifier string; no other cell visible in this notebook reads it.
labVersion = 'cs190_week4_v_1_3'
(featureID, category)
via its own binary feature. We can do this in Python by creating a dictionary that maps each tuple to a distinct integer, where the integer corresponds to a binary feature. To start, manually enter the entries in the OHE dictionary associated with the sample dataset by mapping the tuples to consecutive integers starting from zero, ordering the tuples first by featureID and next by category.
In [5]:
# Toy dataset: three observations, each a list of (featureID, category) tuples.
sampleOne = [(0, 'mouse'), (1, 'black')]
sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
sampleThree = [(0, 'bear'), (1, 'black'), (2, 'salmon')]
# Distribute the three observations as an RDD. `sc` must already exist in the kernel
# (presumably the SparkContext supplied by the lab environment -- confirm).
sampleDataRDD = sc.parallelize([sampleOne, sampleTwo, sampleThree])
In [6]:
# Manual OHE dictionary: every (featureID, category) tuple of the sample data, ordered
# by featureID and then by category, numbered consecutively from zero.
sampleOHEDictManual = {
    (0, 'bear'): 0,
    (0, 'cat'): 1,
    (0, 'mouse'): 2,
    (1, 'black'): 3,
    (1, 'tabby'): 4,
    (2, 'mouse'): 5,
    (2, 'salmon'): 6,
}
In [7]:
# TEST One-hot-encoding (1a)
# Each manual dictionary entry is handed to Test.assertEqualsHashed together with an
# expected digest string (the comparison itself lives in test_helper); the final check
# confirms the dictionary holds exactly 7 keys.
from test_helper import Test
Test.assertEqualsHashed(sampleOHEDictManual[(0,'bear')],
'b6589fc6ab0dc82cf12099d1c2d40ab994e8410c',
"incorrect value for sampleOHEDictManual[(0,'bear')]")
Test.assertEqualsHashed(sampleOHEDictManual[(0,'cat')],
'356a192b7913b04c54574d18c28d46e6395428ab',
"incorrect value for sampleOHEDictManual[(0,'cat')]")
Test.assertEqualsHashed(sampleOHEDictManual[(0,'mouse')],
'da4b9237bacccdf19c0760cab7aec4a8359010b0',
"incorrect value for sampleOHEDictManual[(0,'mouse')]")
Test.assertEqualsHashed(sampleOHEDictManual[(1,'black')],
'77de68daecd823babbb58edb1c8e14d7106e83bb',
"incorrect value for sampleOHEDictManual[(1,'black')]")
Test.assertEqualsHashed(sampleOHEDictManual[(1,'tabby')],
'1b6453892473a467d07372d45eb05abc2031647a',
"incorrect value for sampleOHEDictManual[(1,'tabby')]")
Test.assertEqualsHashed(sampleOHEDictManual[(2,'mouse')],
'ac3478d69a3c81fa62e60f5c3696165a4e5e6ac4',
"incorrect value for sampleOHEDictManual[(2,'mouse')]")
Test.assertEqualsHashed(sampleOHEDictManual[(2,'salmon')],
'c1dfd96eea8cc2b62785275bca38ac261256e278',
"incorrect value for sampleOHEDictManual[(2,'salmon')]")
Test.assertEquals(len(sampleOHEDictManual.keys()), 7,
'incorrect number of keys in sampleOHEDictManual')
SparseVector(size, *args)
to create a new sparse vector where size is the length of the vector and args is either a dictionary, a list of (index, value) pairs, or two separate arrays of indices and values (sorted by index). You'll need to create a sparse vector representation of each dense vector aDense
and bDense
.
In [8]:
import numpy as np
from pyspark.mllib.linalg import SparseVector
In [9]:
# Dense vectors and their sparse counterparts. A SparseVector should store only the
# non-zero entries (sorted indices plus matching values); listing all four indices,
# zeros included, gives the right dot products but defeats the sparse representation.
aDense = np.array([0., 3., 0., 4.])
aSparse = SparseVector(4, [1, 3], [3., 4.])

bDense = np.array([0., 0., 0., 1.])
bSparse = SparseVector(4, [3], [1.])

# The dense and sparse forms must give the same dot product with w.
w = np.array([0.4, 3.1, -1.4, -.5])
print(aDense.dot(w))
print(aSparse.dot(w))
print(bDense.dot(w))
print(bSparse.dot(w))
In [10]:
# TEST Sparse Vectors (1b)
# Both vectors must be SparseVector instances, and each must give the same dot product
# with w as its dense counterpart.
Test.assertTrue(isinstance(aSparse, SparseVector), 'aSparse needs to be an instance of SparseVector')
# The failure message now names bSparse, the variable actually being checked.
Test.assertTrue(isinstance(bSparse, SparseVector), 'bSparse needs to be an instance of SparseVector')
Test.assertTrue(aDense.dot(w) == aSparse.dot(w),
                'dot product of aDense and w should equal dot product of aSparse and w')
Test.assertTrue(bDense.dot(w) == bSparse.dot(w),
                'dot product of bDense and w should equal dot product of bSparse and w')
DenseVector
for a point with features 2 and 4 would be [0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0]
.
In [11]:
# Reminder of the sample features
# sampleOne = [(0, 'mouse'), (1, 'black')]
# sampleTwo = [(0, 'cat'), (1, 'tabby'), (2, 'mouse')]
# sampleThree = [(0, 'bear'), (1, 'black'), (2, 'salmon')]
In [12]:
# Hand-built OHE vectors for the three samples: length 7, with a 1.0 at the
# sampleOHEDictManual index of every (featureID, category) tuple in the sample.
sampleOneOHEFeatManual = SparseVector(7, {2: 1.0, 3: 1.0})
sampleTwoOHEFeatManual = SparseVector(7, {1: 1.0, 4: 1.0, 5: 1.0})
sampleThreeOHEFeatManual = SparseVector(7, {0: 1.0, 3: 1.0, 6: 1.0})
In [13]:
# TEST OHE Features as sparse vectors (1c)
# Type checks first, then each vector is passed to Test.assertEqualsHashed with an
# expected digest string (comparison logic lives in test_helper).
Test.assertTrue(isinstance(sampleOneOHEFeatManual, SparseVector),
'sampleOneOHEFeatManual needs to be a SparseVector')
Test.assertTrue(isinstance(sampleTwoOHEFeatManual, SparseVector),
'sampleTwoOHEFeatManual needs to be a SparseVector')
Test.assertTrue(isinstance(sampleThreeOHEFeatManual, SparseVector),
'sampleThreeOHEFeatManual needs to be a SparseVector')
Test.assertEqualsHashed(sampleOneOHEFeatManual,
'ecc00223d141b7bd0913d52377cee2cf5783abd6',
'incorrect value for sampleOneOHEFeatManual')
Test.assertEqualsHashed(sampleTwoOHEFeatManual,
'26b023f4109e3b8ab32241938e2e9b9e9d62720a',
'incorrect value for sampleTwoOHEFeatManual')
Test.assertEqualsHashed(sampleThreeOHEFeatManual,
'c04134fd603ae115395b29dcabe9d0c66fbdc8a7',
'incorrect value for sampleThreeOHEFeatManual')
oneHotEncoding
that creates OHE feature vectors in SparseVector
format. Then use this function to create OHE features for the first sample data point and verify that the result matches the result from Part (1c).
In [14]:
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    """Build the one-hot-encoded SparseVector for a single observation.

    Note:
        The (index, value) pairs are handed to SparseVector in the order the features
        appear in rawFeats. NOTE(review): this relies on SparseVector ordering the pairs
        by index itself -- the (1d) test passes the features unsorted; confirm.

    Args:
        rawFeats (list of (int, str)): The features corresponding to a single observation. Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sampleOne)
        OHEDict (dict): A mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The total number of unique OHE features (combinations of featureID and
            value).

    Returns:
        SparseVector: A SparseVector of length numOHEFeats with one entry, of value 1, at
            the OHEDict index of every feature in rawFeats.

    Raises:
        KeyError: If a feature in rawFeats has no entry in OHEDict.
    """
    indexValuePairs = [(OHEDict[feat], 1) for feat in rawFeats]
    return SparseVector(numOHEFeats, indexValuePairs)
# The OHE feature space has one dimension per dictionary entry.
numSampleOHEFeats = len(sampleOHEDictManual)

# Encode the first sample point using the manual dictionary.
sampleOneOHEFeat = oneHotEncoding(sampleOne, sampleOHEDictManual, numSampleOHEFeats)
print(sampleOneOHEFeat)
In [15]:
# TEST Define an OHE Function (1d)
# The last check passes the features in reverse order and still expects indices [2, 3].
Test.assertTrue(sampleOneOHEFeat == sampleOneOHEFeatManual,
'sampleOneOHEFeat should equal sampleOneOHEFeatManual')
Test.assertEquals(sampleOneOHEFeat, SparseVector(7, [2,3], [1.0,1.0]),
'incorrect value for sampleOneOHEFeat')
Test.assertEquals(oneHotEncoding([(1, 'black'), (0, 'mouse')], sampleOHEDictManual,
numSampleOHEFeats), SparseVector(7, [2,3], [1.0,1.0]),
'incorrect definition for oneHotEncoding')
In [16]:
# Encode every observation of the sample RDD and keep the result cached.
sampleOHEData = (sampleDataRDD
                 .map(lambda rawFeats: oneHotEncoding(rawFeats, sampleOHEDictManual,
                                                      numSampleOHEFeats))
                 .cache())
print(sampleOHEData.collect())
In [17]:
# TEST Apply OHE to a dataset (1e)
# Collects the encoded RDD and compares each of its three elements with the expected
# SparseVector.
sampleOHEDataValues = sampleOHEData.collect()
Test.assertTrue(len(sampleOHEDataValues) == 3, 'sampleOHEData should have three elements')
Test.assertEquals(sampleOHEDataValues[0], SparseVector(7, {2: 1.0, 3: 1.0}),
'incorrect OHE for first sample')
Test.assertEquals(sampleOHEDataValues[1], SparseVector(7, {1: 1.0, 4: 1.0, 5: 1.0}),
'incorrect OHE for second sample')
Test.assertEquals(sampleOHEDataValues[2], SparseVector(7, {0: 1.0, 3: 1.0, 6: 1.0}),
'incorrect OHE for third sample')
(featureID, category)
(featureID, category)
tuples. In our sample dataset, the 7 items in the resulting RDD are (0, 'bear')
, (0, 'cat')
, (0, 'mouse')
, (1, 'black')
, (1, 'tabby')
, (2, 'mouse')
, (2, 'salmon')
. Notably 'black'
appears twice in the dataset but only contributes one item to the RDD: (1, 'black')
, while 'mouse'
also appears twice and contributes two items: (0, 'mouse')
and (2, 'mouse')
. Use flatMap and distinct.
In [18]:
# Flatten the per-observation lists into one RDD of (featureID, category) tuples and
# drop the repeats.
sampleDistinctFeats = (sampleDataRDD
                       .flatMap(lambda observation: observation)
                       .distinct())
In [19]:
# TEST Pair RDD of (featureID, category) (2a)
# The collected tuples are sorted before comparison, so RDD ordering does not matter.
Test.assertEquals(sorted(sampleDistinctFeats.collect()),
[(0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'),
(1, 'tabby'), (2, 'mouse'), (2, 'salmon')],
'incorrect value for sampleDistinctFeats')
RDD
of key-value tuples, where each (featureID, category)
tuple in sampleDistinctFeats
is a key and the values are distinct integers ranging from 0 to (number of keys - 1). Then convert this RDD
into a dictionary, which can be done using the collectAsMap
action. Note that there is no unique mapping from keys to values, as all we require is that each (featureID, category)
key be mapped to a unique integer between 0 and the number of keys. In this exercise, any valid mapping is acceptable. Use zipWithIndex followed by collectAsMap. In our sample dataset, one valid list of key-value tuples is: [((0, 'bear'), 0), ((2, 'salmon'), 1), ((1, 'tabby'), 2), ((2, 'mouse'), 3), ((0, 'mouse'), 4), ((0, 'cat'), 5), ((1, 'black'), 6)]
. The dictionary defined in Part (1a) illustrates another valid mapping between keys and integers.
In [20]:
# Pair every distinct feature with its position index, then bring the pairs back to
# the driver as a dictionary.
sampleOHEDict = (sampleDistinctFeats
                 .zipWithIndex()
                 .collectAsMap())
print(sampleOHEDict)
In [21]:
# TEST OHE Dictionary from distinct features (2b)
# Keys and values are checked separately after sorting, so any assignment of the
# integers 0..6 to the seven keys passes.
Test.assertEquals(sorted(sampleOHEDict.keys()),
[(0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'),
(1, 'tabby'), (2, 'mouse'), (2, 'salmon')],
'sampleOHEDict has unexpected keys')
Test.assertEquals(sorted(sampleOHEDict.values()), range(7), 'sampleOHEDict has unexpected values')
In [23]:
def createOneHotDict(inputData):
    """Build a one-hot-encoding dictionary from an RDD of observations.

    Args:
        inputData (RDD of lists of (int, str)): An RDD of observations where each observation is
            made up of a list of (featureID, value) tuples.

    Returns:
        dict: Maps each distinct (featureID, value) tuple to the index it receives from
            zipWithIndex.
    """
    # Flatten the observations into individual features and drop the duplicates.
    distinctFeats = inputData.flatMap(lambda observation: observation).distinct()
    # Number the surviving features and collect the pairs into a driver-side dict.
    return distinctFeats.zipWithIndex().collectAsMap()
# Same dictionary construction as Part (2b), now through the reusable helper.
sampleOHEDictAuto = createOneHotDict(sampleDataRDD)
print(sampleOHEDictAuto)
In [24]:
# TEST Automated creation of an OHE dictionary (2c)
# Same key/value checks as (2b), applied to the dictionary built by createOneHotDict.
Test.assertEquals(sorted(sampleOHEDictAuto.keys()),
[(0, 'bear'), (0, 'cat'), (0, 'mouse'), (1, 'black'),
(1, 'tabby'), (2, 'mouse'), (2, 'salmon')],
'sampleOHEDictAuto has unexpected keys')
Test.assertEquals(sorted(sampleOHEDictAuto.values()), range(7),
'sampleOHEDictAuto has unexpected values')
rawData
variable.# TODO
cell below. The file is 8.4 MB compressed. The script below will download the file to the virtual machine (VM) and then extract the data.# TODO
cell below.
In [25]:
# Run this code to view Criteo's agreement
# Builds an IFrame for the dataset download page (width 600, height 350). It is the
# last expression in the cell, so the notebook renders it as the cell output.
from IPython.lib.display import IFrame
IFrame("http://labs.criteo.com/downloads/2014-kaggle-display-advertising-challenge-dataset/",
600, 350)
Out[25]:
In [26]:
import glob
import os.path
import tarfile
import urllib
import urlparse
# Paste url, url should end with: dac_sample.tar.gz
url = 'http://labs.criteo.com/wp-content/uploads/2015/04/dac_sample.tar.gz'.strip()

# Where the extracted sample lives on disk: data/cs190/dac_sample.txt
baseDir = os.path.join('data')
inputPath = os.path.join('cs190', 'dac_sample.txt')
fileName = os.path.join(baseDir, inputPath)
# Directory part of fileName; extractTar extracts the data file into it.
inputDir = os.path.dirname(fileName)
def extractTar(check = False):
    """Locate a downloaded dac_sample archive in the working directory and extract the data.

    Args:
        check (bool, optional): When True the call is a quiet probe: nothing is printed if
            no archive is present or if the archive cannot be opened.

    Returns:
        bool: True if dac_sample.txt was extracted into inputDir, False if no archive was
            found or the first match could not be opened as a tar file.
    """
    # Find the zipped archive and extract the dataset
    tars = glob.glob('dac_sample*.tar.gz*')
    if not tars:
        if not check:
            print('You need to retry the download with the correct url.')
            print('Alternatively, you can upload the dac_sample.tar.gz file to your Jupyter root ' +
                  'directory')
        return False

    try:
        tarFile = tarfile.open(tars[0])
    except tarfile.ReadError:
        if not check:
            print('Unable to open tar.gz file. Check your URL.')
        return False

    # Release the archive handle even when extraction raises; it used to be left open.
    try:
        tarFile.extract('dac_sample.txt', path=inputDir)
    finally:
        tarFile.close()
    print('Successfully extracted: dac_sample.txt')
    return True
# Do as little as possible: reuse the extracted file, else an archive already on disk,
# else download the archive (only if the URL looks right) and extract it.
if os.path.isfile(fileName):
    print('File is already available. Nothing to do.')
elif extractTar(check = True):
    print('tar.gz file was already available.')
elif not url.endswith('dac_sample.tar.gz'):
    print('Check your download url. Are you downloading the Sample dataset?')
else:
    # Download the file and store it in the same directory as this notebook
    archiveName = os.path.basename(urlparse.urlsplit(url).path)
    try:
        urllib.urlretrieve(url, archiveName)
    except IOError:
        print('Unable to download and store: {0}'.format(url))

    # Extraction is attempted whether or not the download succeeded.
    extractTar()
In [27]:
import os.path
baseDir = os.path.join('data')
inputPath = os.path.join('cs190', 'dac_sample.txt')
fileName = os.path.join(baseDir, inputPath)

if os.path.isfile(fileName):
    # Read the file with 2 partitions, converting tabs to commas on the way in.
    rawData = (sc
               .textFile(fileName, 2)
               .map(lambda x: x.replace('\t', ',')))  # work with either ',' or '\t' separated data
    print(rawData.take(1))
else:
    # Without this branch a missing file silently leaves rawData undefined, and the
    # failure only shows up later as a NameError in another cell.
    print('Data file not found: {0}. Run the download cell first.'.format(fileName))
In [28]:
weights = [.8, .1, .1]
seed = 42

# Split the raw data into train / validation / test with a fixed seed and mark each
# part for caching (cache() hands back the RDD it was called on).
rawTrainData, rawValidationData, rawTestData = (
    part.cache() for part in rawData.randomSplit(weights, seed=seed))

# Counting materialises each split.
nTrain = rawTrainData.count()
nVal = rawValidationData.count()
nTest = rawTestData.count()
print('{0} {1} {2} {3}'.format(nTrain, nVal, nTest, nTrain + nVal + nTest))
print(rawData.take(1))
In [29]:
# TEST Loading and splitting the data (3a)
# All three splits must be cached, and their sizes must match the expected counts.
Test.assertTrue(all([rawTrainData.is_cached, rawValidationData.is_cached, rawTestData.is_cached]),
'you must cache the split data')
Test.assertEquals(nTrain, 79911, 'incorrect value for nTrain')
Test.assertEquals(nVal, 10075, 'incorrect value for nVal')
Test.assertEquals(nTest, 10014, 'incorrect value for nTest')
take()
command in Part (3a) that each raw data point is a string containing several fields separated by some delimiter. For now, we will ignore the first field (which is the 0-1 label), and parse the remaining fields (or raw features). To do this, complete the implementation of the parsePoint
function.
In [30]:
def parsePoint(point):
    """Turn a comma separated record into a list of (featureID, value) tuples.

    Note:
        featureIDs start at 0 and increase to the number of features - 1.

    Args:
        point (str): A comma separated string where the first value is the label and the rest
            are features.

    Returns:
        list: A list of (featureID, value) tuples; the label field is dropped.
    """
    rawFeatures = point.split(',')[1:]
    return list(enumerate(rawFeatures))
parsedTrainFeat = rawTrainData.map(parsePoint)

# Number of distinct values seen for each featureID, sorted by featureID.
numCategories = (parsedTrainFeat
                 .flatMap(lambda feats: feats)
                 .distinct()
                 .map(lambda feat: (feat[0], 1))
                 .reduceByKey(lambda a, b: a + b)
                 .sortByKey()
                 .collect())

print(numCategories[2][1])
In [31]:
# TEST Extract features (3b)
# Spot-checks the number of distinct values counted for featureIDs 2 and 32.
Test.assertEquals(numCategories[2][1], 855, 'incorrect implementation of parsePoint')
Test.assertEquals(numCategories[32][1], 4, 'incorrect implementation of parsePoint')
(featureID, category)
tuples, which is the same format as the sample dataset studied in Parts 1 and 2 of this lab. Using this observation, create an OHE dictionary using the function implemented in Part (2c). Note that we will assume for simplicity that all features in our CTR dataset are categorical.
In [32]:
# OHE dictionary over the training features; its size is the OHE feature count.
ctrOHEDict = createOneHotDict(parsedTrainFeat)
numCtrOHEFeats = len(ctrOHEDict)
print(numCtrOHEFeats)
print(ctrOHEDict[(0, '')])
In [33]:
# TEST Create an OHE dictionary from the dataset (3c)
# Checks the dictionary size and that the empty-string value of feature 0 is a key.
Test.assertEquals(numCtrOHEFeats, 233286, 'incorrect number of features in ctrOHEDict')
Test.assertTrue((0, '') in ctrOHEDict, 'incorrect features in ctrOHEDict')
parseOHEPoint
function. Hint: parseOHEPoint
is an extension of the parsePoint
function from Part (3b) and it uses the oneHotEncoding
function from Part (1d).
In [34]:
from pyspark.mllib.regression import LabeledPoint
In [35]:
def parseOHEPoint(point, OHEDict, numOHEFeats):
    """Build the LabeledPoint (label plus OHE features) for one raw observation.

    Note:
        The encoding goes through the function `oneHotEncoding`; the check that follows
        this definition in the notebook depends on that.

    Args:
        point (str): A comma separated string where the first value is the label and the rest
            are features.
        OHEDict (dict of (int, str) to int): Mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The number of unique features in the training dataset.

    Returns:
        LabeledPoint: The first field as the label (passed through as the raw string) and
            the one-hot-encoding of the remaining fields based on the provided OHE dictionary.
    """
    fields = point.split(',')
    label, rawFeats = fields[0], list(enumerate(fields[1:]))
    return LabeledPoint(label, oneHotEncoding(rawFeats, OHEDict, numOHEFeats))
OHETrainData = rawTrainData.map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
OHETrainData.cache()
print(OHETrainData.take(1))

# Check that oneHotEncoding function was used in parseOHEPoint: with the name rebound
# to None, a parseOHEPoint that calls it raises TypeError.
backupOneHot = oneHotEncoding
oneHotEncoding = None
withOneHot = False
try:
    parseOHEPoint(rawTrainData.take(1)[0], ctrOHEDict, numCtrOHEFeats)
except TypeError:
    withOneHot = True
finally:
    # Restore the real function on every path; previously any other exception left
    # oneHotEncoding bound to None for the rest of the kernel session.
    oneHotEncoding = backupOneHot
In [36]:
# TEST Apply OHE to the dataset (3d)
# For the first five training points, the number of parsed raw features must equal the
# number of stored indices in the OHE vectors.
numNZ = sum(parsedTrainFeat.map(lambda x: len(x)).take(5))
numNZAlt = sum(OHETrainData.map(lambda lp: len(lp.features.indices)).take(5))
Test.assertEquals(numNZ, numNZAlt, 'incorrect implementation of parseOHEPoint')
Test.assertTrue(withOneHot, 'oneHotEncoding not present in parseOHEPoint')
In [37]:
def bucketFeatByCount(featCount):
    """Bucket the counts by powers of two.

    Args:
        featCount (int): Number of times a feature occurs.

    Returns:
        int: The smallest of 2**0 .. 2**10 that is >= featCount, or -1 if featCount
            exceeds 2**10.
    """
    bucketSizes = (2 ** exponent for exponent in range(11))
    return next((size for size in bucketSizes if featCount <= size), -1)
# Occurrence count of every OHE feature index in the training data.
featCounts = (OHETrainData
              .flatMap(lambda lp: lp.features.indices)
              .map(lambda index: (index, 1))
              .reduceByKey(lambda a, b: a + b))

# Number of features per power-of-two bucket; the overflow bucket (-1) is dropped.
featCountsBuckets = (featCounts
                     .map(lambda indexCount: (bucketFeatByCount(indexCount[1]), 1))
                     .filter(lambda bucketOne: bucketOne[0] != -1)
                     .reduceByKey(lambda a, b: a + b)
                     .collect())
print(featCountsBuckets)
In [38]:
import matplotlib.pyplot as plt
# Separate bucket sizes from bucket counts and take the natural log of both.
x, y = (np.log(values) for values in zip(*featCountsBuckets))
def preparePlot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',
                gridWidth=1.0):
    """Template for generating the plot layout.

    Args:
        xticks: Tick positions for the x axis.
        yticks: Tick positions for the y axis.
        figsize (tuple, optional): Figure size handed to plt.subplots.
        hideLabels (bool, optional): If True, tick labels are blanked on both axes.
        gridColor (str, optional): Color of the grid lines.
        gridWidth (float, optional): Line width of the grid.

    Returns:
        tuple: The (fig, ax) pair created by plt.subplots.
    """
    # Close the current figure before creating a new one.
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hideLabels:
            axis.set_ticklabels([])
    plt.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
    # Hide all four spines. An explicit loop replaces map() with a side-effecting lambda,
    # which does nothing at all wherever map() is lazy.
    for position in ['bottom', 'top', 'left', 'right']:
        ax.spines[position].set_visible(False)
    return fig, ax
# generate layout and plot data
fig, ax = preparePlot(np.arange(0, 10, 1), np.arange(4, 14, 2))
ax.set_xlabel(r'$\log_e(bucketSize)$')
ax.set_ylabel(r'$\log_e(countInBucket)$')
plt.scatter(x, y, s=14**2, c='#d6ebf2', edgecolors='#8cbfd0', alpha=0.75)
# Final no-op statement so the cell shows only the figure, not a return value.
pass
oneHotEncoding()
function from Part (1d) to ignore previously unseen categories, and then compute OHE features for the validation data.
In [39]:
def oneHotEncoding(rawFeats, OHEDict, numOHEFeats):
    """Build the one-hot-encoded SparseVector for an observation, skipping unseen features.

    Note:
        A (featureID, value) tuple with no corresponding key in OHEDict is ignored.

    Args:
        rawFeats (list of (int, str)): The features corresponding to a single observation. Each
            feature consists of a tuple of featureID and the feature's value. (e.g. sampleOne)
        OHEDict (dict): A mapping of (featureID, value) to unique integer.
        numOHEFeats (int): The total number of unique OHE features (combinations of featureID and
            value).

    Returns:
        SparseVector: A SparseVector of length numOHEFeats with one entry, of value 1, at
            the OHEDict index of every feature in rawFeats that OHEDict knows about.
    """
    knownPairs = [(OHEDict[feat], 1) for feat in rawFeats if feat in OHEDict]
    return SparseVector(numOHEFeats, knownPairs)
# Encode the validation split with the dictionary learned from the training split.
OHEValidationData = (rawValidationData
                     .map(lambda point: parseOHEPoint(point, ctrOHEDict, numCtrOHEFeats))
                     .cache())
print(OHEValidationData.take(1))
In [40]:
# TEST Handling unseen features (3e)
# Total number of stored indices across all validation vectors.
numNZVal = (OHEValidationData
.map(lambda lp: len(lp.features.indices))
.sum())
Test.assertEquals(numNZVal, 372080, 'incorrect number of features')
OHETrainData
with the given hyperparameter configuration. LogisticRegressionWithSGD
returns a LogisticRegressionModel. Next, use the LogisticRegressionModel.weights
and LogisticRegressionModel.intercept
attributes to print out the model's parameters. Note that these are the names of the object's attributes and should be called using a syntax like model.weights
for a given model
.
In [41]:
from pyspark.mllib.classification import LogisticRegressionWithSGD
# Hyperparameters passed to LogisticRegressionWithSGD.train in the next cell.
numIters = 50  # passed as `iterations`
stepSize = 10.  # passed as `step`
regParam = 1e-6  # passed as `regParam`
regType = 'l2'  # passed as `regType`
includeIntercept = True  # passed as `intercept`
In [42]:
# Fit the first logistic regression model on the OHE training data.
model0 = LogisticRegressionWithSGD.train(
    OHETrainData,
    iterations=numIters,
    step=stepSize,
    regParam=regParam,
    regType=regType,
    intercept=includeIntercept)

# Show the five smallest weights followed by the intercept.
sortedWeights = sorted(model0.weights)
print('{0} {1}'.format(sortedWeights[:5], model0.intercept))
In [43]:
# TEST Logistic regression (4a)
# Compares the intercept and the five smallest weights with reference values via
# np.allclose.
Test.assertTrue(np.allclose(model0.intercept, 0.56455084025), 'incorrect value for model0.intercept')
Test.assertTrue(np.allclose(sortedWeights[0:5],
[-0.45899236853575609, -0.37973707648623956, -0.36996558266753304,
-0.36934962879928263, -0.32697945415010637]), 'incorrect value for model0.weights')
In [44]:
from math import log
def computeLogLoss(p, y):
    """Compute the log loss for a predicted probability and a label.

    Note:
        log(0) is undefined, so a p of exactly 0 is moved up by a small value (epsilon)
        and a p of exactly 1 is moved down by it.

    Args:
        p (float): A probability between 0 and 1.
        y (int): A label. Takes on the values 0 and 1.

    Returns:
        float: -log(1 - p) when y equals 0, otherwise -log(p).
    """
    epsilon = 10e-12
    if p == 0.0:
        p = p + epsilon
    elif p == 1.0:
        p = p - epsilon
    return -log(1.0 - p) if y == 0.0 else -log(p)
# Log loss for a few (probability, label) pairs, including the p = 0 and p = 1 edge cases.
for prob, label in [(.5, 1), (.5, 0), (.99, 1), (.99, 0), (.01, 1), (.01, 0),
                    (0, 1), (1, 1), (1, 0)]:
    print(computeLogLoss(prob, label))
In [45]:
# TEST Log loss (4b)
# The first check covers interior probabilities; the second covers p = 0 and p = 1,
# where the result depends on the epsilon adjustment.
Test.assertTrue(np.allclose([computeLogLoss(.5, 1), computeLogLoss(.01, 0), computeLogLoss(.01, 1)],
[0.69314718056, 0.0100503358535, 4.60517018599]),
'computeLogLoss is not correct')
Test.assertTrue(np.allclose([computeLogLoss(0, 1), computeLogLoss(1, 1), computeLogLoss(1, 0)],
[25.3284360229, 1.00000008275e-11, 25.3284360229]),
'computeLogLoss needs to bound p away from 0 and 1 by epsilon')
In [46]:
# Note that our dataset has a very high click-through rate by design
# In practice click-through rate can be one to two orders of magnitude lower
classOneFracTrain = (OHETrainData
                     .map(lambda lp: lp.label)
                     .sum()) / nTrain
print(classOneFracTrain)

# Baseline: predict the training click fraction for every training point.
logLossTrBase = (OHETrainData
                 .map(lambda lp: computeLogLoss(classOneFracTrain, lp.label))
                 .sum()) / nTrain
print('Baseline Train Logloss = {0:.3f}\n'.format(logLossTrBase))
In [47]:
# TEST Baseline log loss (4c)
# Checks the training click fraction and the baseline training log loss.
Test.assertTrue(np.allclose(classOneFracTrain, 0.22717773523), 'incorrect value for classOneFracTrain')
Test.assertTrue(np.allclose(logLossTrBase, 0.535844), 'incorrect value for logLossTrBase')
In [48]:
from math import exp
def getP(x, w, intercept):
    """Compute the predicted probability for an observation from weights and intercept.

    Note:
        The raw prediction is bounded between 20 and -20 for numerical purposes.

    Args:
        x (SparseVector): A vector with values of 1.0 for features that exist in this
            observation and 0.0 otherwise.
        w (DenseVector): A vector of weights (betas) for the model.
        intercept (float): The model's intercept.

    Returns:
        float: The logistic function of the bounded raw prediction, a value between 0 and 1.
    """
    rawPrediction = x.dot(w) + intercept
    # Clamp to [-20, 20] before exponentiating.
    boundedPrediction = max(min(rawPrediction, 20), -20)
    return (1 + exp(-boundedPrediction)) ** -1
# Predicted click probability for every training point under model0.
trainingPredictions = OHETrainData.map(
    lambda lp: getP(lp.features, model0.weights, model0.intercept))
print(trainingPredictions.take(5))
In [49]:
# TEST Predicted probability (4d)
# Compares the sum of all training predictions with a reference value.
Test.assertTrue(np.allclose(trainingPredictions.sum(), 18135.4834348),
'incorrect value for trainingPredictions')
In [50]:
def evaluateResults(model, data):
    """Compute the average log loss of a model over a dataset.

    Args:
        model (LogisticRegressionModel): A trained logistic regression model.
        data (RDD of LabeledPoint): Labels and features for each observation.

    Returns:
        float: The summed per-point log loss divided by data.count().
    """
    def pointLogLoss(lp):
        # Log loss of the model's predicted probability against this point's label.
        return computeLogLoss(getP(lp.features, model.weights, model.intercept), lp.label)

    totalLogLoss = data.map(pointLogLoss).sum()
    return totalLogLoss / data.count()
# Training log loss of model0, reported next to the baseline.
logLossTrLR0 = evaluateResults(model0, OHETrainData)
print('OHE Features Train Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
      .format(logLossTrBase, logLossTrLR0))
In [51]:
# TEST Evaluate the model (4e)
# Compares the training log loss of model0 with a reference value.
Test.assertTrue(np.allclose(logLossTrLR0, 0.456903), 'incorrect value for logLossTrLR0')
In [52]:
# Validation log loss: the baseline (training click fraction as the prediction)
# against model0.
logLossValBase = (OHEValidationData
                  .map(lambda lp: computeLogLoss(classOneFracTrain, lp.label))
                  .sum()) / nVal

logLossValLR0 = evaluateResults(model0, OHEValidationData)
print('OHE Features Validation Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
      .format(logLossValBase, logLossValLR0))
In [53]:
# TEST Validation log loss (4f)
# Checks the baseline and model0 log loss on the validation split.
Test.assertTrue(np.allclose(logLossValBase, 0.527603), 'incorrect value for logLossValBase')
Test.assertTrue(np.allclose(logLossValLR0, 0.456957), 'incorrect value for logLossValLR0')
In [54]:
# (label, predicted probability) for every validation point, highest score first.
labelsAndScores = OHEValidationData.map(
    lambda lp: (lp.label, getP(lp.features, model0.weights, model0.intercept)))
labelsAndWeights = labelsAndScores.collect()
labelsAndWeights.sort(key=lambda labelScore: labelScore[1], reverse=True)
labelsByWeight = np.array([k for (k, v) in labelsAndWeights])

# Running true/false positive counts as the score threshold is lowered one point at a time.
length = labelsByWeight.size
truePositives = labelsByWeight.cumsum()
numPositive = truePositives[-1]
falsePositives = np.arange(1.0, length + 1, 1.) - truePositives

truePositiveRate = truePositives / numPositive
falsePositiveRate = falsePositives / (length - numPositive)

# Generate layout and plot data
fig, ax = preparePlot(np.arange(0., 1.1, 0.1), np.arange(0., 1.1, 0.1))
ax.set_xlim(-.05, 1.05)
ax.set_ylim(-.05, 1.05)
ax.set_ylabel('True Positive Rate (Sensitivity)')
ax.set_xlabel('False Positive Rate (1 - Specificity)')
plt.plot(falsePositiveRate, truePositiveRate, color='#8cbfd0', linestyle='-', linewidth=3.)
plt.plot((0., 1.), (0., 1.), linestyle='--', color='#d6ebf2', linewidth=2.)  # Baseline model
pass
numBuckets
and observe the resulting hashed feature dictionaries.
In [55]:
from collections import defaultdict
import hashlib
def hashFunction(numBuckets, rawFeats, printMapping=False):
    """Calculate a feature dictionary for an observation's features based on hashing.

    Note:
        Use printMapping=True for debug purposes and to better understand how the hashing works.

    Args:
        numBuckets (int): Number of buckets to use as features.
        rawFeats (list of (int, str)): A list of features for an observation. Represented as
            (featureID, value) tuples.
        printMapping (bool, optional): If true, the mappings of featureString to index will be
            printed.

    Returns:
        dict of int to float: The keys are the buckets that the features have been hashed to.
            The value for a key is the number of distinct feature strings (value followed by
            featureID) that hashed to it; a feature repeated within rawFeats counts once.
    """
    mapping = {}
    for ind, category in rawFeats:
        featureString = category + str(ind)
        # md5 needs bytes. Byte strings are hashed unchanged; text strings are encoded as
        # UTF-8 first, which matches the old result for ASCII and no longer fails on
        # non-ASCII text.
        if isinstance(featureString, bytes):
            featureBytes = featureString
        else:
            featureBytes = featureString.encode('utf-8')
        mapping[featureString] = int(int(hashlib.md5(featureBytes).hexdigest(), 16) % numBuckets)
    if printMapping:
        print(mapping)
    sparseFeatures = defaultdict(float)
    for bucket in mapping.values():
        sparseFeatures[bucket] += 1.0
    return dict(sparseFeatures)
In [56]:
# Hash the three sample observations into 4 buckets, printing each mapping.
sampOneFourBuckets = hashFunction(4, sampleOne, True)
sampTwoFourBuckets = hashFunction(4, sampleTwo, True)
sampThreeFourBuckets = hashFunction(4, sampleThree, True)

# The same observations hashed into 100 buckets.
sampOneHundredBuckets = hashFunction(100, sampleOne, True)
sampTwoHundredBuckets = hashFunction(100, sampleTwo, True)
sampThreeHundredBuckets = hashFunction(100, sampleThree, True)

# Side-by-side comparison of the two bucket counts.
print('\t\t 4 Buckets \t\t\t 100 Buckets')
print('SampleOne:\t {0}\t\t {1}'.format(sampOneFourBuckets, sampOneHundredBuckets))
print('SampleTwo:\t {0}\t\t {1}'.format(sampTwoFourBuckets, sampTwoHundredBuckets))
print('SampleThree:\t {0}\t {1}'.format(sampThreeFourBuckets, sampThreeHundredBuckets))
In [57]:
# TEST Hash function (5a)
# Spot-checks two of the six hashed dictionaries against their expected contents.
Test.assertEquals(sampOneFourBuckets, {2: 1.0, 3: 1.0}, 'incorrect value for sampOneFourBuckets')
Test.assertEquals(sampThreeHundredBuckets, {72: 1.0, 5: 1.0, 14: 1.0},
'incorrect value for sampThreeHundredBuckets')
LabeledPoint
with hashed features stored as a SparseVector
. Then use this function to create new training, validation and test datasets with hashed features. Hint: parseHashPoint
is similar to parseOHEPoint
from Part (3d).
In [58]:
def parseHashPoint(point, numBuckets):
    """Create a LabeledPoint for this observation using hashing.

    Args:
        point (str): A comma separated string where the first value is the label and the rest are
            features.
        numBuckets: The number of buckets to hash to.

    Returns:
        LabeledPoint: The first field as the label (passed through as the raw string) and a
            SparseVector of length numBuckets built from hashFunction's bucket counts.
    """
    # Split once; the record used to be split and sliced twice by a copy-paste duplicate.
    fields = point.split(",")
    label = fields[0]
    # Number the remaining fields from 0 to form (featureID, value) tuples.
    rawFeats = list(enumerate(fields[1:]))
    return LabeledPoint(label, SparseVector(numBuckets, hashFunction(numBuckets, rawFeats)))
numBucketsCTR = 2 ** 15

# Hash all three raw splits into numBucketsCTR buckets and cache each result
# (cache() hands back the RDD it was called on).
hashTrainData = rawTrainData.map(lambda point: parseHashPoint(point, numBucketsCTR)).cache()
hashValidationData = (rawValidationData
                      .map(lambda point: parseHashPoint(point, numBucketsCTR))
                      .cache())
hashTestData = rawTestData.map(lambda point: parseHashPoint(point, numBucketsCTR)).cache()

print(hashTrainData.take(1))
In [59]:
# TEST Creating hashed features (5b)
# For each split: sum the number of stored indices over the first 20 points and the
# labels over the first 100 points, then compare with the expected totals.
hashTrainDataFeatureSum = sum(hashTrainData
.map(lambda lp: len(lp.features.indices))
.take(20))
hashTrainDataLabelSum = sum(hashTrainData
.map(lambda lp: lp.label)
.take(100))
hashValidationDataFeatureSum = sum(hashValidationData
.map(lambda lp: len(lp.features.indices))
.take(20))
hashValidationDataLabelSum = sum(hashValidationData
.map(lambda lp: lp.label)
.take(100))
hashTestDataFeatureSum = sum(hashTestData
.map(lambda lp: len(lp.features.indices))
.take(20))
hashTestDataLabelSum = sum(hashTestData
.map(lambda lp: lp.label)
.take(100))
Test.assertEquals(hashTrainDataFeatureSum, 772, 'incorrect number of features in hashTrainData')
Test.assertEquals(hashTrainDataLabelSum, 24.0, 'incorrect labels in hashTrainData')
Test.assertEquals(hashValidationDataFeatureSum, 776,
'incorrect number of features in hashValidationData')
Test.assertEquals(hashValidationDataLabelSum, 16.0, 'incorrect labels in hashValidationData')
Test.assertEquals(hashTestDataFeatureSum, 774, 'incorrect number of features in hashTestData')
Test.assertEquals(hashTestDataLabelSum, 23.0, 'incorrect labels in hashTestData')
SparseVector
named sparse
, calling len(sparse)
returns the total number of features, not the number of features with entries. SparseVector
objects have the attributes indices
and values
that contain information about which features are nonzero. Continuing with our example, these can be accessed using sparse.indices
and sparse.values
, respectively.
In [68]:
def computeSparsity(data, d, n):
    """Compute the average sparsity of the features in an RDD of LabeledPoints.

    Args:
        data (RDD of LabeledPoint): The LabeledPoints to use in the sparsity calculation.
        d (int): The total number of features.
        n (int): The number of observations in the RDD.

    Returns:
        float: The total number of stored feature values across all points, divided by d * n.
    """
    storedValueTotal = data.map(lambda lp: len(lp.features.values)).sum()
    totalEntries = float(d * n)
    return storedValueTotal / totalEntries
# Average sparsity of the training data under each feature representation.
averageSparsityHash = computeSparsity(hashTrainData, numBucketsCTR, nTrain)
averageSparsityOHE = computeSparsity(OHETrainData, numCtrOHEFeats, nTrain)

print('Average OHE Sparsity: {0:.7e}'.format(averageSparsityOHE))
print('Average Hash Sparsity: {0:.7e}'.format(averageSparsityHash))
In [69]:
# TEST Sparsity (5c)
# Compares both average sparsity values with reference values via np.allclose.
Test.assertTrue(np.allclose(averageSparsityOHE, 1.6717677e-04),
'incorrect value for averageSparsityOHE')
Test.assertTrue(np.allclose(averageSparsityHash, 1.1805561e-03),
'incorrect value for averageSparsityHash')
1
and 10
for stepSizes
and 1e-6
and 1e-3
for regParams
.
In [62]:
# Fixed settings for the hashed-feature grid search in the next cell.
numIters = 500
regType = 'l2'
includeIntercept = True

# Best-so-far trackers updated by the grid search; the large starting loss means the
# first model evaluated with a log loss below 1e10 replaces it.
bestModel = None
bestLogLoss = 1e10
In [63]:
stepSizes = [ 1, 10 ]
regParams = [ 1e-6, 1e-3 ]

# Train one model per (stepSize, regParam) pair and keep the one with the lowest
# validation log loss.
for stepSize in stepSizes:
    for regParam in regParams:
        model = LogisticRegressionWithSGD.train(
            hashTrainData,
            iterations=numIters,
            step=stepSize,
            regParam=regParam,
            regType=regType,
            intercept=includeIntercept)
        logLossVa = evaluateResults(model, hashValidationData)
        print('\tstepSize = {0:.1f}, regParam = {1:.0e}: logloss = {2:.3f}'
              .format(stepSize, regParam, logLossVa))
        if logLossVa < bestLogLoss:
            bestModel, bestLogLoss = model, logLossVa

print('Hashed Features Validation Logloss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
      .format(logLossValBase, bestLogLoss))
In [64]:
# TEST Logistic model with hashed features (5d)
# Compares the best validation log loss from the grid search with a reference value.
Test.assertTrue(np.allclose(bestLogLoss, 0.4481683608), 'incorrect value for bestLogLoss')
logLoss
.
In [65]:
from matplotlib.colors import LinearSegmentedColormap
# Saved parameters and results. Eliminate the time required to run 36 models
stepSizes = [3, 6, 9, 12, 15, 18]
regParams = [1e-7, 1e-6, 1e-5, 1e-4, 1e-3, 1e-2]
logLoss = np.array([[ 0.45808431, 0.45808493, 0.45809113, 0.45815333, 0.45879221, 0.46556321],
                    [ 0.45188196, 0.45188306, 0.4518941, 0.4520051, 0.45316284, 0.46396068],
                    [ 0.44886478, 0.44886613, 0.44887974, 0.44902096, 0.4505614, 0.46371153],
                    [ 0.44706645, 0.4470698, 0.44708102, 0.44724251, 0.44905525, 0.46366507],
                    [ 0.44588848, 0.44589365, 0.44590568, 0.44606631, 0.44807106, 0.46365589],
                    [ 0.44508948, 0.44509474, 0.44510274, 0.44525007, 0.44738317, 0.46365405]])

# One row per step size, one column per regularization parameter.
numRows, numCols = len(stepSizes), len(regParams)
logLoss = logLoss.reshape(numRows, numCols)

fig, ax = preparePlot(np.arange(0, numCols, 1), np.arange(0, numRows, 1), figsize=(8, 7),
                      hideLabels=True, gridWidth=0.)
ax.set_xticklabels(regParams)
ax.set_yticklabels(stepSizes)
ax.set_xlabel('Regularization Parameter')
ax.set_ylabel('Step Size')

# Heat map of the saved log loss values on a two-colour blue colormap.
colors = LinearSegmentedColormap.from_list('blue', ['#0022ff', '#000055'], gamma=.2)
image = plt.imshow(logLoss, interpolation='nearest', aspect='auto', cmap=colors)
pass
In [70]:
# Test-set log loss of the best hashed-feature model, next to the baseline that
# predicts the training click fraction for every point.
logLossTest = evaluateResults(bestModel, hashTestData)

logLossTestBaseline = (hashTestData
                       .map(lambda lp: computeLogLoss(classOneFracTrain, lp.label))
                       .sum()) / nTest

print('Hashed Features Test Log Loss:\n\tBaseline = {0:.3f}\n\tLogReg = {1:.3f}'
      .format(logLossTestBaseline, logLossTest))
In [71]:
# TEST Evaluate on the test set (5e)
# Checks the baseline and best-model log loss on the test split.
Test.assertTrue(np.allclose(logLossTestBaseline, 0.537438),
'incorrect value for logLossTestBaseline')
Test.assertTrue(np.allclose(logLossTest, 0.455616931), 'incorrect value for logLossTest')