In [2]:
# load the dataset
import os.path
fileName = os.path.join('Data', 'millionsong.txt')
numPartitions = 2
rawData = sc.textFile(fileName, numPartitions)
In [4]:
# EXERCISE
numPoints = rawData.count()
print (numPoints)
samplePoints = rawData.take(5)
print (samplePoints)
In [5]:
# TEST Load and check the data (1a)
assert numPoints==6724, 'incorrect value for numPoints'
print("OK")
assert len(samplePoints)==5, 'incorrect length for samplePoints'
print("OK")
In MLlib, labeled training instances are stored as LabeledPoint objects. Write the function parsePoint, which takes a raw data sample as input, transforms the data using unicode.split, and returns a LabeledPoint. Apply this function to samplePoints from the previous cell and print the features and label using the LabeledPoint.features and LabeledPoint.label attributes. Finally, compute the number of features in this dataset.
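As a quick illustration with made-up values, a LabeledPoint simply pairs a float label with a feature vector:
# toy sketch (values made up for illustration only)
from pyspark.mllib.regression import LabeledPoint
examplePoint = LabeledPoint(2001.0, [0.884, 0.610, 0.600])
print (examplePoint.label)     # 2001.0
print (examplePoint.features)  # the feature vector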
In [6]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np
# Here is a sample raw data point:
# '2001.0,0.884,0.610,0.600,0.474,0.247,0.357,0.344,0.33,0.600,0.425,0.60,0.419'
# In this raw data point, 2001.0 is the label, and the remaining values are features
In [8]:
# EXERCISE
def parsePoint(line):
    """Converts a comma separated unicode string into a `LabeledPoint`.

    Args:
        line (unicode): Comma separated unicode string where the first element is the label and the
            remaining elements are features.

    Returns:
        LabeledPoint: The line is converted into a `LabeledPoint`, which consists of a label and
            features.
    """
    point = list(map(float, line.split(',')))
    return LabeledPoint(point[0], point[1:])
parsedSamplePoints = list(map(parsePoint,samplePoints))
firstPointFeatures = parsedSamplePoints[0].features
firstPointLabel = parsedSamplePoints[0].label
print (firstPointFeatures, firstPointLabel)
d = len(firstPointFeatures)
print (d)
In [9]:
# TEST Using LabeledPoint (1b)
assert isinstance(firstPointLabel, float), 'label must be a float'
expectedX0 = [0.8841,0.6105,0.6005,0.4747,0.2472,0.3573,0.3441,0.3396,0.6009,0.4257,0.6049,0.4192]
assert np.allclose(expectedX0, firstPointFeatures, 1e-4, 1e-4), 'incorrect features for firstPointFeatures'
assert np.allclose(2001.0, firstPointLabel), 'incorrect label for firstPointLabel'
assert d == 12, 'incorrect number of features'
print("OK")
In [12]:
import matplotlib.pyplot as plt
import matplotlib.cm as cm
%matplotlib inline
sampleMorePoints = rawData.take(50)
parsedSampleMorePoints = map(parsePoint, sampleMorePoints)
dataValues = list(map(lambda lp: lp.features.toArray(), parsedSampleMorePoints))
def preparePlot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',
                gridWidth=1.0):
    """Template for generating the plot layout."""
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hideLabels: axis.set_ticklabels([])
    plt.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
    # map() is lazy in Python 3, so use a plain loop to actually hide the spines
    for position in ['bottom', 'top', 'left', 'right']:
        ax.spines[position].set_visible(False)
    return fig, ax
# generate layout and plot
fig, ax = preparePlot(np.arange(.5, 11, 1), np.arange(.5, 49, 1), figsize=(8,7), hideLabels=True,
gridColor='#eeeeee', gridWidth=1.1)
image = plt.imshow(dataValues,interpolation='nearest', aspect='auto', cmap=cm.Greys)
for x, y, s in zip(np.arange(-.125, 12, 1), np.repeat(-.75, 12), [str(x) for x in range(12)]):
plt.text(x, y, s, color='#999999', size='10')
plt.text(4.7, -3, 'Feature', color='#999999', size='11'), ax.set_ylabel('Observation')
pass
In [13]:
# EXERCISE
parsedDataInit = rawData.map(parsePoint)
onlyLabels = parsedDataInit.map(lambda x: x.label)
minYear = onlyLabels.min()
maxYear = onlyLabels.max()
print (maxYear, minYear)
In [15]:
# TEST Find the range (1c)
assert len(parsedDataInit.take(1)[0].features)==12, 'unexpected number of features in sample point'
sumFeatTwo = parsedDataInit.map(lambda lp: lp.features[2]).sum()
assert np.allclose(sumFeatTwo, 3158.96224351), 'parsedDataInit has unexpected values'
yearRange = maxYear - minYear
assert yearRange == 89, 'incorrect range for minYear to maxYear'
print("OK")
In [17]:
# EXERCISE: subtract the minimum value from the labels
parsedData = parsedDataInit.map(lambda p: LabeledPoint(p.label-minYear, p.features))
# Should be a LabeledPoint
print (type(parsedData.take(1)[0]))
# View the first point
print ('\n{0}'.format(parsedData.take(1)))
In [20]:
# TEST Shift labels (1d)
oldSampleFeatures = parsedDataInit.take(1)[0].features
newSampleFeatures = parsedData.take(1)[0].features
assert np.allclose(oldSampleFeatures, newSampleFeatures), 'new features do not match old features'
sumFeatTwo = parsedData.map(lambda lp: lp.features[2]).sum()
assert np.allclose(sumFeatTwo, 3158.96224351), 'parsedData has unexpected values'
minYearNew = parsedData.map(lambda lp: lp.label).min()
maxYearNew = parsedData.map(lambda lp: lp.label).max()
assert minYearNew == 0, 'incorrect min year in shifted data'
assert maxYearNew == 89, 'incorrect max year in shifted data'
print("OK")
Split the parsed dataset into training, validation, and test sets with randomSplit, and use the cache() method to pre-cache the processed data, since we will access it repeatedly.
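For intuition, here is a minimal sketch of randomSplit on toy data (the weights are normalized to sum to one, and a fixed seed makes the split reproducible):
# toy sketch: split ten integers roughly 80/20 with a reproducible seed
toyData = sc.parallelize(range(10))
toyTrain, toyTest = toyData.randomSplit([.8, .2], seed=42)
print (toyTrain.count(), toyTest.count())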
In [22]:
# EXERCISE
weights = [.8, .1, .1]
seed = 42
parsedTrainData, parsedValData, parsedTestData = parsedData.randomSplit(weights, seed)
parsedTrainData.cache()
parsedValData.cache()
parsedTestData.cache()
nTrain = parsedTrainData.count()
nVal = parsedValData.count()
nTest = parsedTestData.count()
print (nTrain, nVal, nTest, nTrain + nVal + nTest)
print (parsedData.count())
In [24]:
# TEST Training, validation, and test sets (1e)
assert parsedTrainData.getNumPartitions() == numPartitions, 'parsedTrainData has wrong number of partitions'
assert parsedValData.getNumPartitions() == numPartitions, 'parsedValData has wrong number of partitions'
assert parsedTestData.getNumPartitions() == numPartitions,'parsedTestData has wrong number of partitions'
assert len(parsedTrainData.take(1)[0].features) == 12, 'parsedTrainData has wrong number of features'
sumFeatTwo = (parsedTrainData
              .map(lambda lp: lp.features[2])
              .sum())
sumFeatThree = (parsedValData
                .map(lambda lp: lp.features[3])
                .reduce(lambda x, y: x + y))
sumFeatFour = (parsedTestData
               .map(lambda lp: lp.features[4])
               .reduce(lambda x, y: x + y))
assert np.allclose([sumFeatTwo, sumFeatThree, sumFeatFour],
                   [2526.87757656, 297.340394298, 184.235876654]), 'parsed Train, Val, Test data has unexpected values'
assert nTrain + nVal + nTest == 6724, 'unexpected Train, Val, Test data set size'
assert nTrain == 5359, 'unexpected value for nTrain'
assert nVal == 678, 'unexpected value for nVal'
assert nTest == 687, 'unexpected value for nTest'
print("OK")
In [25]:
# EXERCISE
averageTrainYear = (parsedTrainData
                    .map(lambda p: p.label)
                    .mean())
print (averageTrainYear)
In [27]:
# TEST Average label (2a)
assert np.allclose(averageTrainYear, 53.6792311), 'incorrect value for averageTrainYear'
print("OK")
In [28]:
# EXERCISE
def squaredError(label, prediction):
    """Calculates the squared error for a single prediction.

    Args:
        label (float): The correct value for this observation.
        prediction (float): The predicted value for this observation.

    Returns:
        float: The difference between the `label` and `prediction` squared.
    """
    return (label - prediction) ** 2

def calcRMSE(labelsAndPreds):
    """Calculates the root mean squared error for an `RDD` of (label, prediction) tuples.

    Args:
        labelsAndPreds (RDD of (float, float)): An `RDD` consisting of (label, prediction) tuples.

    Returns:
        float: The square root of the mean of the squared errors.
    """
    return np.sqrt(labelsAndPreds.map(lambda lp: squaredError(lp[0], lp[1])).mean())
labelsAndPreds = sc.parallelize([(3., 1.), (1., 2.), (2., 2.)])
# RMSE = sqrt[((3-1)^2 + (1-2)^2 + (2-2)^2) / 3] = 1.291
exampleRMSE = calcRMSE(labelsAndPreds)
print (exampleRMSE)
In [29]:
# TEST Root mean squared error (2b)
assert np.allclose(squaredError(3, 1), 4.), 'incorrect definition of squaredError'
assert np.allclose(exampleRMSE, 1.29099444874), 'incorrect value for exampleRMSE'
print("OK")
In [30]:
# EXERCISE
labelsAndPredsTrain = parsedTrainData.map(lambda p: (p.label, averageTrainYear))
rmseTrainBase = calcRMSE(labelsAndPredsTrain)
labelsAndPredsVal = parsedValData.map(lambda p: (p.label, averageTrainYear))
rmseValBase = calcRMSE(labelsAndPredsVal)
labelsAndPredsTest = parsedTestData.map(lambda p: (p.label, averageTrainYear))
rmseTestBase = calcRMSE(labelsAndPredsTest)
print ('Baseline Train RMSE = {0:.3f}'.format(rmseTrainBase))
print ('Baseline Validation RMSE = {0:.3f}'.format(rmseValBase))
print ('Baseline Test RMSE = {0:.3f}'.format(rmseTestBase))
In [33]:
# TEST Training, validation and test RMSE (2c)
assert np.allclose([rmseTrainBase, rmseValBase, rmseTestBase],[21.506125957738682, 20.877445428452468, 21.260493955081916]), 'incorrect RMSE value'
print("OK")
In [35]:
from matplotlib.colors import ListedColormap, Normalize
from matplotlib.cm import get_cmap
cmap = get_cmap('YlOrRd')
norm = Normalize()
actual = np.asarray(parsedValData
                    .map(lambda lp: lp.label)
                    .collect())
error = np.asarray(parsedValData
                   .map(lambda lp: (lp.label, lp.label))
                   .map(lambda lp: squaredError(lp[0], lp[1]))
                   .collect())
clrs = cmap(np.asarray(norm(error)))[:,0:3]
fig, ax = preparePlot(np.arange(0, 100, 20), np.arange(0, 100, 20))
plt.scatter(actual, actual, s=14**2, c=clrs, edgecolors='#888888', alpha=0.75, linewidths=0.5)
ax.set_xlabel('Predicted'), ax.set_ylabel('Actual')
pass
In [36]:
predictions = np.asarray(parsedValData
                         .map(lambda lp: averageTrainYear)
                         .collect())
error = np.asarray(parsedValData
                   .map(lambda lp: (lp.label, averageTrainYear))
                   .map(lambda lp: squaredError(lp[0], lp[1]))
                   .collect())
norm = Normalize()
clrs = cmap(np.asarray(norm(error)))[:,0:3]
fig, ax = preparePlot(np.arange(53.0, 55.0, 0.5), np.arange(0, 100, 20))
ax.set_xlim(53, 55)
plt.scatter(predictions, actual, s=14**2, c=clrs, edgecolors='#888888', alpha=0.75, linewidths=0.3)
ax.set_xlabel('Predicted'), ax.set_ylabel('Actual')
We will use the DenseVector class and its dot method to represent the list of features (a DenseVector has functionality similar to np.array()).
In [37]:
from pyspark.mllib.linalg import DenseVector
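A quick toy check of that behavior:
# toy sketch: DenseVector.dot works much like np.array.dot
v = DenseVector([1.0, 2.0])
print (v.dot(DenseVector([3.0, 4.0])))  # 1*3 + 2*4 = 11.0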
In [38]:
# EXERCISE
def gradientSummand(weights, lp):
    """Calculates the gradient summand for a given weight and `LabeledPoint`.

    Note:
        `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangeably
        within this function. For example, they both implement the `dot` method.

    Args:
        weights (DenseVector): An array of model weights (betas).
        lp (LabeledPoint): The `LabeledPoint` for a single observation.

    Returns:
        DenseVector: An array of values the same length as `weights`. The gradient summand.
    """
    return DenseVector((weights.dot(lp.features) - lp.label) * lp.features)
exampleW = DenseVector([1, 1, 1])
exampleLP = LabeledPoint(2.0, [3, 1, 4])
summandOne = gradientSummand(exampleW, exampleLP)
print (summandOne)
exampleW = DenseVector([.24, 1.2, -1.4])
exampleLP = LabeledPoint(3.0, [-1.4, 4.2, 2.1])
summandTwo = gradientSummand(exampleW, exampleLP)
print (summandTwo)
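As a sanity check for summandOne: w dot x = 1*3 + 1*1 + 1*4 = 8, so the summand is (8 - 2) * [3, 1, 4] = [18, 6, 24], matching the test below.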
In [39]:
# TEST Gradient summand (3a)
assert np.allclose(summandOne, [18., 6., 24.]), 'incorrect value for summandOne'
assert np.allclose(summandTwo, [1.7304,-5.1912,-2.5956]), 'incorrect value for summandTwo'
print("OK")
In [41]:
# EXERCISE
def getLabeledPrediction(weights, observation):
    """Calculates predictions and returns a (label, prediction) tuple.

    Note:
        The labels should remain unchanged as we'll use this information to calculate prediction
        error later.

    Args:
        weights (np.ndarray): An array with one weight for each feature in `trainData`.
        observation (LabeledPoint): A `LabeledPoint` that contains the correct label and the
            features for the data point.

    Returns:
        tuple: A (label, prediction) tuple.
    """
    return (observation.label, weights.dot(observation.features))
weights = np.array([1.0, 1.5])
predictionExample = sc.parallelize([LabeledPoint(2, np.array([1.0, .5])),
                                    LabeledPoint(1.5, np.array([.5, .5]))])
labelsAndPredsExample = predictionExample.map(lambda lp: getLabeledPrediction(weights, lp))
print (labelsAndPredsExample.collect())
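Checking by hand: [1.0, 1.5] dot [1.0, 0.5] = 1.75 and [1.0, 1.5] dot [0.5, 0.5] = 1.25, so the collected result should be [(2.0, 1.75), (1.5, 1.25)].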
In [43]:
# TEST Use weights to make predictions (3b)
assert labelsAndPredsExample.collect() == [(2.0, 1.75), (1.5, 1.25)], 'incorrect definition for getLabeledPrediction'
print("OK")
In [50]:
# EXERCISE
def linregGradientDescent(trainData, numIters):
    """Calculates the weights and error for a linear regression model trained with gradient descent.

    Note:
        `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangeably
        within this function. For example, they both implement the `dot` method.

    Args:
        trainData (RDD of LabeledPoint): The labeled data for use in training the model.
        numIters (int): The number of iterations of gradient descent to perform.

    Returns:
        (np.ndarray, np.ndarray): A tuple of (weights, training errors). Weights will be the
            final weights (one weight per feature) for the model, and training errors will contain
            an error (RMSE) for each iteration of the algorithm.
    """
    # The length of the training data
    n = trainData.count()
    # The number of features in the training data
    d = len(trainData.first().features)
    w = np.zeros(d)
    alpha = 1.0
    # We will compute and store the training error after each iteration
    errorTrain = np.zeros(numIters)
    for i in range(numIters):
        # Use getLabeledPrediction from (3b) with trainData to obtain an RDD of (label, prediction)
        # tuples. Note that the weights all equal 0 for the first iteration, so the predictions will
        # have large errors to start.
        labelsAndPredsTrain = trainData.map(lambda l: getLabeledPrediction(w, l))
        errorTrain[i] = calcRMSE(labelsAndPredsTrain)
        # Calculate the `gradient`. Make use of the `gradientSummand` function you wrote in (3a).
        # Note that `gradient` should be a `DenseVector` of length `d`.
        gradient = trainData.map(lambda l: gradientSummand(w, l)).sum()
        # Update the weights with a decreasing step size alpha_i = alpha / (n * sqrt(i + 1))
        alpha_i = alpha / (n * np.sqrt(i + 1))
        w -= alpha_i * gradient
    return w, errorTrain
# create a toy dataset with n = 10, d = 3, and then run 5 iterations of gradient descent
# note: the resulting model will not be useful; the goal here is to verify that
# linregGradientDescent is working properly
exampleN = 10
exampleD = 3
exampleData = (sc
               .parallelize(parsedTrainData.take(exampleN))
               .map(lambda lp: LabeledPoint(lp.label, lp.features[0:exampleD])))
print (exampleData.take(2))
exampleNumIters = 5
exampleWeights, exampleErrorTrain = linregGradientDescent(exampleData, exampleNumIters)
print (exampleWeights)
In [51]:
# TEST Gradient descent (3c)
expectedOutput = [48.20389904, 34.53243006, 30.60284959]
assert np.allclose(exampleWeights, expectedOutput), 'value of exampleWeights is incorrect'
expectedError = [79.72766145, 33.64762907, 9.46281696, 9.45486926, 9.44889147]
assert np.allclose(exampleErrorTrain, expectedError),'value of exampleErrorTrain is incorrect'
print("OK")
In [63]:
# EXERCISE
numIters = 50
weightsLR0, errorTrainLR0 = linregGradientDescent(parsedTrainData, numIters)
labelsAndPreds = parsedValData.map(lambda lp: getLabeledPrediction(weightsLR0, lp))
rmseValLR0 = calcRMSE(labelsAndPreds)
print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}'.format(rmseValBase, rmseValLR0))
In [65]:
# TEST Train the model (3d)
expectedOutput = [ 22.64370481, 20.1815662, -0.21620107, 8.53259099, 5.94821844,
-4.50349235, 15.51511703, 3.88802901, 9.79146177, 5.74357056,
11.19512589, 3.60554264]
assert np.allclose(weightsLR0, expectedOutput), 'incorrect value for weightsLR0'
print("OK")
In [66]:
norm = Normalize()
clrs = cmap(np.asarray(norm(np.log(errorTrainLR0))))[:,0:3]
fig, ax = preparePlot(np.arange(0, 60, 10), np.arange(2, 6, 1))
ax.set_ylim(2, 6)
plt.scatter(list(range(0, numIters)), np.log(errorTrainLR0), s=14**2, c=clrs, edgecolors='#888888', alpha=0.75)
ax.set_xlabel('Iteration'), ax.set_ylabel(r'$\log_e(errorTrainLR0)$')
pass
In [67]:
norm = Normalize()
clrs = cmap(np.asarray(norm(errorTrainLR0[6:])))[:,0:3]
fig, ax = preparePlot(np.arange(0, 60, 10), np.arange(17, 22, 1))
ax.set_ylim(17.8, 21.2)
plt.scatter(range(0, numIters-6), errorTrainLR0[6:], s=14**2, c=clrs, edgecolors='#888888', alpha=0.75)
ax.set_xticklabels(list(map(str, range(6, 66, 10))))
ax.set_xlabel('Iteration'), ax.set_ylabel(r'Training Error')
pass
MLlib provides LinearRegressionWithSGD, which trains a linear regression model with stochastic gradient descent and supports L2 regularization and an intercept term; we will use it in place of our hand-rolled implementation.
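Conceptually, one full-batch L2-regularized gradient step can be sketched in NumPy as below. This hypothetical l2GradientStep helper is only an illustration, not MLlib's actual updater (which also uses a decaying step size and mini-batch sampling):
# rough sketch, not MLlib internals: the gradient of the regularized
# squared loss is X^T (X w - y) + reg * w, followed by a step of size alpha
def l2GradientStep(w, X, y, alpha, reg):
    grad = X.T.dot(X.dot(w) - y) + reg * w
    return w - alpha * grad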
In [70]:
from pyspark.mllib.regression import LinearRegressionWithSGD
# Values to use when training the linear regression model
numIters = 500 # iterations
alpha = 1.0 # step
miniBatchFrac = 1.0 # miniBatchFraction
reg = 1e-1 # regParam
regType = 'l2' # regType
useIntercept = True # intercept
In [72]:
# EXERCISE
firstModel = LinearRegressionWithSGD.train(parsedTrainData, iterations=numIters, step=alpha,
                                           miniBatchFraction=miniBatchFrac, regParam=reg,
                                           regType=regType, intercept=useIntercept)
# weightsLR1 stores the model weights; interceptLR1 stores the model intercept
weightsLR1 = firstModel.weights
interceptLR1 = firstModel.intercept
print (weightsLR1, interceptLR1)
In [74]:
# TEST LinearRegressionWithSGD (4a)
expectedIntercept = 13.332056210482524
expectedWeights = [15.9694010246,13.9897244172,0.669349383773,6.24618402989,4.00932179503,-2.30176663131,10.478805422,3.06385145385,7.14414111075,4.49826819526,7.87702565069,3.00732146613]
assert np.allclose(interceptLR1, expectedIntercept), 'incorrect value for interceptLR1'
assert np.allclose(weightsLR1, expectedWeights), 'incorrect value for weightsLR1'
print("OK")
Now use the trained model to predict the label for a single sample, passing the features attribute of a LabeledPoint as the parameter.
In [78]:
# EXERCISE
samplePoint = parsedTrainData.take(1)[0]
samplePrediction = firstModel.predict(samplePoint.features)
print (samplePrediction)
In [ ]:
# TEST Predict (4b)
assert np.allclose(samplePrediction, 56.4065674104), 'incorrect value for samplePrediction'
In [81]:
# EXERCISE
labelsAndPreds = parsedValData.map(lambda lp: (lp.label, firstModel.predict(lp.features)))
rmseValLR1 = calcRMSE(labelsAndPreds)
print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}'.format(rmseValBase, rmseValLR0, rmseValLR1))
In [82]:
# TEST Evaluate RMSE (4c)
assert np.allclose(rmseValLR1, 19.025), 'incorrect value for rmseValLR1'
In [84]:
# EXERCISE
bestRMSE = rmseValLR1
bestRegParam = reg
bestModel = firstModel
numIters = 500
alpha = 1.0
miniBatchFrac = 1.0
for reg in [1e-10, 1e-5, 1.]:
    model = LinearRegressionWithSGD.train(parsedTrainData, numIters, alpha,
                                          miniBatchFrac, regParam=reg,
                                          regType='l2', intercept=True)
    labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features)))
    rmseValGrid = calcRMSE(labelsAndPreds)
    print (rmseValGrid)
    if rmseValGrid < bestRMSE:
        bestRMSE = rmseValGrid
        bestRegParam = reg
        bestModel = model
rmseValLRGrid = bestRMSE
print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}\n\tLRGrid = {3:.3f}'.format(rmseValBase, rmseValLR0, rmseValLR1, rmseValLRGrid))
In [88]:
# TEST Grid search (4d)
assert np.allclose(16.6813542516, rmseValLRGrid), 'incorrect value for rmseValLRGrid'
In [90]:
predictions = np.asarray(parsedValData
                         .map(lambda lp: bestModel.predict(lp.features))
                         .collect())
actual = np.asarray(parsedValData
                    .map(lambda lp: lp.label)
                    .collect())
error = np.asarray(parsedValData
                   .map(lambda lp: (lp.label, bestModel.predict(lp.features)))
                   .map(lambda lp: squaredError(lp[0], lp[1]))
                   .collect())
norm = Normalize()
clrs = cmap(np.asarray(norm(error)))[:,0:3]
fig, ax = preparePlot(np.arange(0, 120, 20), np.arange(0, 120, 20))
ax.set_xlim(15, 82), ax.set_ylim(-5, 105)
plt.scatter(predictions, actual, s=14**2, c=clrs, edgecolors='#888888', alpha=0.75, linewidths=.5)
ax.set_xlabel('Predicted'), ax.set_ylabel(r'Actual')
pass
Now evaluate different values for alpha and for the number of iterations: use the values 1e-5 and 10 for alpha, and the values 500 and 5 for the number of iterations. Evaluate all of the models on the validation set. Note that with a very small alpha the algorithm needs many more iterations to converge to the optimum, while an overly large alpha can keep the algorithm from finding a solution at all.
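A one-dimensional toy illustration of this trade-off, using plain gradient descent on f(w) = w**2 (an illustration only, not MLlib's algorithm):
# with a small step the iterate shrinks toward the optimum at 0;
# with an oversized step it flips sign and grows without bound
wSmall, wLarge = 1.0, 1.0
for _ in range(5):
    wSmall -= 0.1 * 2 * wSmall   # multiplied by 0.8 each step -> ~0.33
    wLarge -= 1.1 * 2 * wLarge   # multiplied by -1.2 each step -> ~-2.5 and growing
print (wSmall, wLarge)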
In [92]:
# EXERCISE
reg = bestRegParam
modelRMSEs = []
for alpha in [1e-5, 10]:
    for numIters in [500, 5]:
        model = LinearRegressionWithSGD.train(parsedTrainData, numIters, alpha,
                                              miniBatchFrac, regParam=reg,
                                              regType='l2', intercept=True)
        labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features)))
        rmseVal = calcRMSE(labelsAndPreds)
        print ('alpha = {0:.0e}, numIters = {1}, RMSE = {2:.3f}'.format(alpha, numIters, rmseVal))
        modelRMSEs.append(rmseVal)
In [94]:
# TEST Vary alpha and the number of iterations (4e)
expectedResults = sorted([57.487692757541318, 57.487692757541318, 352324534.65684682])
assert np.allclose(sorted(modelRMSEs)[:3], expectedResults), 'incorrect value for modelRMSEs'