In [ ]:
import matplotlib.pyplot as plt
import numpy as np
def preparePlot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',
gridWidth=1.0):
"""Template for generating the plot layout."""
plt.close()
fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
ax.axes.tick_params(labelcolor='#999999', labelsize='10')
for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
axis.set_ticks_position('none')
axis.set_ticks(ticks)
axis.label.set_color('#999999')
if hideLabels: axis.set_ticklabels([])
plt.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
map(lambda position: ax.spines[position].set_visible(False), ['bottom', 'top', 'left', 'right'])
return fig, ax
def create2DGaussian(mn, sigma, cov, n):
"""Randomly sample points from a two-dimensional Gaussian distribution"""
np.random.seed(142)
return np.random.multivariate_normal(np.array([mn, mn]), np.array([[sigma, cov], [cov, sigma]]), n)
In [ ]:
dataRandom = create2DGaussian(mn=50, sigma=1, cov=0, n=100)
# generate layout and plot data
fig, ax = preparePlot(np.arange(46, 55, 2), np.arange(46, 55, 2))
ax.set_xlabel(r'Simulated $x_1$ values'), ax.set_ylabel(r'Simulated $x_2$ values')
ax.set_xlim(45, 54.5), ax.set_ylim(45, 54.5)
plt.scatter(dataRandom[:,0], dataRandom[:,1], s=14**2, c='#d6ebf2', edgecolors='#8cbfd0', alpha=0.75)
pass
In [ ]:
dataCorrelated = create2DGaussian(mn=50, sigma=1, cov=.9, n=100)
# generate layout and plot data
fig, ax = preparePlot(np.arange(46, 55, 2), np.arange(46, 55, 2))
ax.set_xlabel(r'Simulated $x_1$ values'), ax.set_ylabel(r'Simulated $x_2$ values')
ax.set_xlim(45.5, 54.5), ax.set_ylim(45.5, 54.5)
plt.scatter(dataCorrelated[:,0], dataCorrelated[:,1], s=14**2, c='#d6ebf2',
edgecolors='#8cbfd0', alpha=0.75)
pass
correlatedData
é uma RDD de NumPy arrays. Isso permite que façamos algumas operações mais facilmente. Por exemplo, se utilizarmos a transformação sum()
o PySpark irá fazer a soma dos valores de cada coluna.
In [ ]:
# EXERCICIO
correlatedData = sc.parallelize(dataCorrelated)
meanCorrelated = correlatedData.<COMPLETAR>
correlatedDataZeroMean = correlatedData.<COMPLETAR>
print meanCorrelated
print correlatedData.take(1)
print correlatedDataZeroMean.take(1)
In [ ]:
# TEST Interpreting PCA (1a)
from test_helper import Test
Test.assertTrue(np.allclose(meanCorrelated, [49.95739037, 49.97180477]),
'incorrect value for meanCorrelated')
Test.assertTrue(np.allclose(correlatedDataZeroMean.take(1)[0], [-0.28561917, 0.10351492]),
'incorrect value for correlatedDataZeroMean')
In [ ]:
# EXERCICIO
# Compute the covariance matrix using outer products and correlatedDataZeroMean
correlatedCov = (correlatedDataZeroMean
.<COMPLETAR>
.<COMPLETAR>
)/correlatedDataZeroMean.count()
print correlatedCov
In [ ]:
# TEST Sample covariance matrix (1b)
covResult = [[ 0.99558386, 0.90148989], [0.90148989, 1.08607497]]
Test.assertTrue(np.allclose(covResult, correlatedCov), 'incorrect value for correlatedCov')
In [ ]:
# EXERCICIO
def estimateCovariance(data):
"""Compute the covariance matrix for a given rdd.
Note:
The multi-dimensional covariance array should be calculated using outer products. Don't
forget to normalize the data by first subtracting the mean.
Args:
data (RDD of np.ndarray): An `RDD` consisting of NumPy arrays.
Returns:
np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
length of the arrays in the input `RDD`.
"""
meanVal = data.<COMPLETAR>
return (data
.<COMPLETAR>
.<COMPLETAR>
.<COMPLETAR>
)/data.count()
correlatedCovAuto= estimateCovariance(correlatedData)
print correlatedCovAuto
In [ ]:
# TEST Covariance function (1c)
correctCov = [[ 0.99558386, 0.90148989], [0.90148989, 1.08607497]]
Test.assertTrue(np.allclose(correctCov, correlatedCovAuto),
'incorrect value for correlatedCovAuto')
numpy.linalg
para calcular os autovalores e autovetores. Em seguida, ordene os autovetores baseado nos autovalores, do maior para o menor, gerando uma matriz em que os autovetores são as colunas (e a primeira coluna é o maior autovetor).topComponent
.
In [ ]:
# EXERCICIO
from numpy.linalg import eigh
# Calculate the eigenvalues and eigenvectors from correlatedCovAuto
eigVals, eigVecs = <COMPLETAR>
print 'eigenvalues: {0}'.format(eigVals)
print '\neigenvectors: \n{0}'.format(eigVecs)
# Use np.argsort to find the top eigenvector based on the largest eigenvalue
inds = <COMPLETAR>
topComponent = <COMPLETAR>
print '\ntop principal component: {0}'.format(topComponent)
In [ ]:
# TEST Eigendecomposition (1d)
def checkBasis(vectors, correct):
return np.allclose(vectors, correct) or np.allclose(np.negative(vectors), correct)
Test.assertTrue(checkBasis(topComponent, [0.68915649, 0.72461254]),
'incorrect value for topComponent')
In [ ]:
# EXERCICIO
# Use the topComponent and the data from correlatedData to generate PCA scores
correlatedDataScores = correlatedData.<COMPLETAR>
print 'one-dimensional data (first three):\n{0}'.format(np.asarray(correlatedDataScores.take(3)))
In [ ]:
# TEST PCA Scores (1e)
firstThree = [70.51682806, 69.30622356, 71.13588168]
Test.assertTrue(checkBasis(correlatedDataScores.take(3), firstThree),
'incorrect value for correlatedDataScores')
pca
e teste utilizando correlatedData
e $\scriptsize k = 2$. Dica: Use os resultados da Parte (1c), Parte (1d), e Parte (1e).
In [ ]:
# EXERCICIO
def pca(data, k=2):
"""Computes the top `k` principal components, corresponding scores, and all eigenvalues.
Note:
All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
each eigenvectors as a column. This function should also return eigenvectors as columns.
Args:
data (RDD of np.ndarray): An `RDD` consisting of NumPy arrays.
k (int): The number of principal components to return.
Returns:
tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
scores, eigenvalues). Eigenvectors is a multi-dimensional array where the number of
rows equals the length of the arrays in the input `RDD` and the number of columns equals
`k`. The `RDD` of scores has the same number of rows as `data` and consists of arrays
of length `k`. Eigenvalues is an array of length d (the number of features).
"""
<COMPLETAR>
# Return the `k` principal components, `k` scores, and all eigenvalues
return (vecs, data.map(lambda x: x.dot(vecs)), vals)
# Run pca on correlatedData with k = 2
topComponentsCorrelated, correlatedDataScoresAuto, eigenvaluesCorrelated = pca(correlatedData, 2)
# Note that the 1st principal component is in the first column
print 'topComponentsCorrelated: \n{0}'.format(topComponentsCorrelated)
print ('\ncorrelatedDataScoresAuto (first three): \n{0}'
.format('\n'.join(map(str, correlatedDataScoresAuto.take(3)))))
print '\neigenvaluesCorrelated: \n{0}'.format(eigenvaluesCorrelated)
# Create a higher dimensional test set
pcaTestData = sc.parallelize([np.arange(x, x + 4) for x in np.arange(0, 20, 4)])
componentsTest, testScores, eigenvaluesTest = pca(pcaTestData, 3)
print '\npcaTestData: \n{0}'.format(np.array(pcaTestData.collect()))
print '\ncomponentsTest: \n{0}'.format(componentsTest)
print ('\ntestScores (first three): \n{0}'
.format('\n'.join(map(str, testScores.take(3)))))
print '\neigenvaluesTest: \n{0}'.format(eigenvaluesTest)
In [ ]:
# TEST PCA Function (2a)
Test.assertTrue(checkBasis(topComponentsCorrelated.T,
[[0.68915649, 0.72461254], [-0.72461254, 0.68915649]]),
'incorrect value for topComponentsCorrelated')
firstThreeCorrelated = [[70.51682806, 69.30622356, 71.13588168], [1.48305648, 1.5888655, 1.86710679]]
Test.assertTrue(np.allclose(firstThreeCorrelated,
np.vstack(np.abs(correlatedDataScoresAuto.take(3))).T),
'incorrect value for firstThreeCorrelated')
Test.assertTrue(np.allclose(eigenvaluesCorrelated, [1.94345403, 0.13820481]),
'incorrect values for eigenvaluesCorrelated')
topComponentsCorrelatedK1, correlatedDataScoresK1, eigenvaluesCorrelatedK1 = pca(correlatedData, 1)
Test.assertTrue(checkBasis(topComponentsCorrelatedK1.T, [0.68915649, 0.72461254]),
'incorrect value for components when k=1')
Test.assertTrue(np.allclose([70.51682806, 69.30622356, 71.13588168],
np.vstack(np.abs(correlatedDataScoresK1.take(3))).T),
'incorrect value for scores when k=1')
Test.assertTrue(np.allclose(eigenvaluesCorrelatedK1, [1.94345403, 0.13820481]),
'incorrect values for eigenvalues when k=1')
Test.assertTrue(checkBasis(componentsTest.T[0], [ .5, .5, .5, .5]),
'incorrect value for componentsTest')
Test.assertTrue(np.allclose(np.abs(testScores.first()[0]), 3.),
'incorrect value for testScores')
Test.assertTrue(np.allclose(eigenvaluesTest, [ 128, 0, 0, 0 ]), 'incorrect value for eigenvaluesTest')
In [ ]:
# EXERCICIO
randomData = sc.parallelize(dataRandom)
# Use pca on randomData
topComponentsRandom, randomDataScoresAuto, eigenvaluesRandom = <COMPLETAR>
print 'topComponentsRandom: \n{0}'.format(topComponentsRandom)
print ('\nrandomDataScoresAuto (first three): \n{0}'
.format('\n'.join(map(str, randomDataScoresAuto.take(3)))))
print '\neigenvaluesRandom: \n{0}'.format(eigenvaluesRandom)
In [ ]:
# TEST PCA on `dataRandom` (2b)
Test.assertTrue(checkBasis(topComponentsRandom.T,
[[-0.2522559 , 0.96766056], [-0.96766056, -0.2522559]]),
'incorrect value for topComponentsRandom')
firstThreeRandom = [[36.61068572, 35.97314295, 35.59836628],
[61.3489929 , 62.08813671, 60.61390415]]
Test.assertTrue(np.allclose(firstThreeRandom, np.vstack(np.abs(randomDataScoresAuto.take(3))).T),
'incorrect value for randomDataScoresAuto')
Test.assertTrue(np.allclose(eigenvaluesRandom, [1.4204546, 0.99521397]),
'incorrect value for eigenvaluesRandom')
In [ ]:
def projectPointsAndGetLines(data, components, xRange):
"""Project original data onto first component and get line details for top two components."""
topComponent= components[:, 0]
slope1, slope2 = components[1, :2] / components[0, :2]
means = data.mean()[:2]
demeaned = data.map(lambda v: v - means)
projected = demeaned.map(lambda v: (v.dot(topComponent) /
topComponent.dot(topComponent)) * topComponent)
remeaned = projected.map(lambda v: v + means)
x1,x2 = zip(*remeaned.collect())
lineStartP1X1, lineStartP1X2 = means - np.asarray([xRange, xRange * slope1])
lineEndP1X1, lineEndP1X2 = means + np.asarray([xRange, xRange * slope1])
lineStartP2X1, lineStartP2X2 = means - np.asarray([xRange, xRange * slope2])
lineEndP2X1, lineEndP2X2 = means + np.asarray([xRange, xRange * slope2])
return ((x1, x2), ([lineStartP1X1, lineEndP1X1], [lineStartP1X2, lineEndP1X2]),
([lineStartP2X1, lineEndP2X1], [lineStartP2X2, lineEndP2X2]))
In [ ]:
((x1, x2), (line1X1, line1X2), (line2X1, line2X2)) = \
projectPointsAndGetLines(correlatedData, topComponentsCorrelated, 5)
# generate layout and plot data
fig, ax = preparePlot(np.arange(46, 55, 2), np.arange(46, 55, 2), figsize=(7, 7))
ax.set_xlabel(r'Simulated $x_1$ values'), ax.set_ylabel(r'Simulated $x_2$ values')
ax.set_xlim(45.5, 54.5), ax.set_ylim(45.5, 54.5)
plt.plot(line1X1, line1X2, linewidth=3.0, c='#8cbfd0', linestyle='--')
plt.plot(line2X1, line2X2, linewidth=3.0, c='#d6ebf2', linestyle='--')
plt.scatter(dataCorrelated[:,0], dataCorrelated[:,1], s=14**2, c='#d6ebf2',
edgecolors='#8cbfd0', alpha=0.75)
plt.scatter(x1, x2, s=14**2, c='#62c162', alpha=.75)
pass
In [ ]:
((x1, x2), (line1X1, line1X2), (line2X1, line2X2)) = \
projectPointsAndGetLines(randomData, topComponentsRandom, 5)
# generate layout and plot data
fig, ax = preparePlot(np.arange(46, 55, 2), np.arange(46, 55, 2), figsize=(7, 7))
ax.set_xlabel(r'Simulated $x_1$ values'), ax.set_ylabel(r'Simulated $x_2$ values')
ax.set_xlim(45.5, 54.5), ax.set_ylim(45.5, 54.5)
plt.plot(line1X1, line1X2, linewidth=3.0, c='#8cbfd0', linestyle='--')
plt.plot(line2X1, line2X2, linewidth=3.0, c='#d6ebf2', linestyle='--')
plt.scatter(dataRandom[:,0], dataRandom[:,1], s=14**2, c='#d6ebf2',
edgecolors='#8cbfd0', alpha=0.75)
plt.scatter(x1, x2, s=14**2, c='#62c162', alpha=.75)
pass
In [ ]:
# EXERCICIO
def varianceExplained(data, k=1):
"""Calculate the fraction of variance explained by the top `k` eigenvectors.
Args:
data (RDD of np.ndarray): An RDD that contains NumPy arrays which store the
features for an observation.
k: The number of principal components to consider.
Returns:
float: A number between 0 and 1 representing the percentage of variance explained
by the top `k` eigenvectors.
"""
<COMPLETAR>
return eigenvalues[:k].sum()/eigenvalues.sum()
varianceRandom1 = varianceExplained(randomData, 1)
varianceCorrelated1 = varianceExplained(correlatedData, 1)
varianceRandom2 = varianceExplained(randomData, 2)
varianceCorrelated2 = varianceExplained(correlatedData, 2)
print ('Percentage of variance explained by the first component of randomData: {0:.1f}%'
.format(varianceRandom1 * 100))
print ('Percentage of variance explained by both components of randomData: {0:.1f}%'
.format(varianceRandom2 * 100))
print ('\nPercentage of variance explained by the first component of correlatedData: {0:.1f}%'.
format(varianceCorrelated1 * 100))
print ('Percentage of variance explained by both components of correlatedData: {0:.1f}%'
.format(varianceCorrelated2 * 100))
In [ ]:
# TEST Variance explained (2d)
Test.assertTrue(np.allclose(varianceRandom1, 0.588017172066), 'incorrect value for varianceRandom1')
Test.assertTrue(np.allclose(varianceCorrelated1, 0.933608329586),
'incorrect value for varianceCorrelated1')
Test.assertTrue(np.allclose(varianceRandom2, 1.0), 'incorrect value for varianceRandom2')
Test.assertTrue(np.allclose(varianceCorrelated2, 1.0), 'incorrect value for varianceCorrelated2')
In [ ]:
baseDir = os.path.join('Data')
inputPath = os.path.join('Aula04', 'millionsong.txt')
fileName = os.path.join(baseDir, inputPath)
numPartitions = 2
rawData = sc.textFile(fileName, numPartitions)
print rawData.first()
In [ ]:
from pyspark.mllib.regression import LabeledPoint
def parsePoint(line):
"""Converts a comma separated unicode string into a `LabeledPoint`.
Args:
line (unicode): Comma separated unicode string where the first element is the label and the
remaining elements are features.
Returns:
LabeledPoint: The line is converted into a `LabeledPoint`, which consists of a label and
features.
"""
Point = map(float,line.split(','))
return LabeledPoint(Point[0]-1922,Point[1:])
baselineRL = 17.017
baselineInteract = 15.690
millionRDD = rawData.map(parsePoint)
print millionRDD.take(1)
In [ ]:
def squaredError(label, prediction):
"""Calculates the the squared error for a single prediction.
Args:
label (float): The correct value for this observation.
prediction (float): The predicted value for this observation.
Returns:
float: The difference between the `label` and `prediction` squared.
"""
return np.square(label-prediction)
def calcRMSE(labelsAndPreds):
"""Calculates the root mean squared error for an `RDD` of (label, prediction) tuples.
Args:
labelsAndPred (RDD of (float, float)): An `RDD` consisting of (label, prediction) tuples.
Returns:
float: The square root of the mean of the squared errors.
"""
return np.sqrt(labelsAndPreds.map(lambda rec: squaredError(rec[0],rec[1])).mean())
pca
, agora denominada pcaLP
para receber um RDD de LabeledPoints. Será necessário realizar duas alterações: o RDD a ser passado para estimateCovariance
deve ser uma transformação de data
para conter apenas os features
do LabeledPoint. O retorno da função deve transformar apenas o features
utilizando os autovetores.
In [ ]:
def pcaLP(data, k=2):
"""Computes the top `k` principal components, corresponding scores, and all eigenvalues.
Note:
All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
each eigenvectors as a column. This function should also return eigenvectors as columns.
Args:
data (RDD of np.ndarray): An `RDD` consisting of NumPy arrays.
k (int): The number of principal components to return.
Returns:
tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
scores, eigenvalues). Eigenvectors is a multi-dimensional array where the number of
rows equals the length of the arrays in the input `RDD` and the number of columns equals
`k`. The `RDD` of scores has the same number of rows as `data` and consists of arrays
of length `k`. Eigenvalues is an array of length d (the number of features).
"""
cov = estimateCovariance(data.map(lambda x: x.features))
eigVals, eigVecs = eigh(cov)
inds = np.argsort(-eigVals)
vecs = eigVecs[:,inds[:k]]
vals = eigVals[inds[:cov.shape[0]]]
# Return the `k` principal components, `k` scores, and all eigenvalues
return data.map(lambda x: LabeledPoint(x.label,x.features.dot(vecs)))
In [ ]:
for k in range(1,10):
varexp = varianceExplained(millionRDD.map(lambda x: x.features), k)
print 'Variation explained by {} components is {}'.format(k,varexp)
In [ ]:
from pyspark.mllib.regression import LinearRegressionWithSGD
# Values to use when training the linear regression model
numIters = 2000 # iterations
alpha = 1.0 # step
miniBatchFrac = 1.0 # miniBatchFraction
reg = 1e-1 # regParam
regType = None#'l2' # regType
useIntercept = True # intercept
In [ ]:
# TODO: Replace <FILL IN> with appropriate code
# Run pca using scaledData
pcaMillionRDD = pcaLP(millionRDD, 2)
weights = [.8, .1, .1]
seed = 42
parsedTrainData, parsedValData, parsedTestData = pcaMillionRDD.randomSplit(weights, seed)
pcaModel = LinearRegressionWithSGD.train(parsedTrainData, iterations = numIters, step = alpha, miniBatchFraction = 1.0,
regParam=reg,regType=regType, intercept=useIntercept)
labelsAndPreds = parsedValData.map(lambda lp: (lp.label, pcaModel.predict(lp.features)))
rmseValLPCA2 = calcRMSE(labelsAndPreds)
In [ ]:
pcaMillionRDD = pcaLP(millionRDD, 6)
weights = [.8, .1, .1]
seed = 42
parsedTrainData, parsedValData, parsedTestData = pcaMillionRDD.randomSplit(weights, seed)
pcaModel = LinearRegressionWithSGD.train(parsedTrainData, iterations = numIters, step = alpha, miniBatchFraction = 1.0,
regParam=reg,regType=regType, intercept=useIntercept)
labelsAndPreds = parsedValData.map(lambda lp: (lp.label, pcaModel.predict(lp.features)))
rmseValLPCA6 = calcRMSE(labelsAndPreds)
In [ ]:
pcaMillionRDD = pcaLP(millionRDD, 8)
weights = [.8, .1, .1]
seed = 42
parsedTrainData, parsedValData, parsedTestData = pcaMillionRDD.randomSplit(weights, seed)
pcaModel = LinearRegressionWithSGD.train(parsedTrainData, iterations = numIters, step = alpha, miniBatchFraction = 1.0,
regParam=reg,regType=regType, intercept=useIntercept)
labelsAndPreds = parsedValData.map(lambda lp: (lp.label, pcaModel.predict(lp.features)))
rmseValLPCA8 = calcRMSE(labelsAndPreds)
In [ ]:
print ('Validation RMSE:\n\tLinear Regression (orig.) = {0:.3f}\n\tLinear Regression Interact = {1:.3f}' +
'\n\tLR PCA (k=2) = {2:.3f}\n\tLR PCA (k=6) = {3:.3f}\n\tLR PCA (k=8) = {4:.3f}'
).format(baselineRL, baselineInteract, rmseValLPCA2, rmseValLPCA6, rmseValLPCA8)
In [ ]: