Regressão Linear

Este notebook mostra uma implementação básica de Regressão Linear e o uso da biblioteca MLlib do PySpark para a tarefa de regressão na base de dados Million Song Dataset do repositório UCI Machine Learning Repository. Nosso objetivo é predizer o ano de uma música através dos seus atributos de áudio.

Neste notebook:

  • ####Parte 1: Leitura e parsing da base de dados
    • #### Visualização 1: Atributos
    • #### Visualização 2: Deslocamento das variáveis de interesse
  • ####Parte 2: Criar um preditor de referência
    • #### Visualização 3: Valores Preditos vs. Verdadeiros
  • ####Parte 3: Treinar e avaliar um modelo de regressão linear
    • #### Visualização 4: Erro de Treino
  • ####Parte 4: Treinar usando MLlib e ajustar os hiperparâmetros
    • #### Visualização 5: Predições do Melhor modelo
    • #### Visualização 6: Mapa de calor dos hiperparâmetros
  • ####Parte 5: Adicionando interações entre atributos
  • ####Parte 6: Aplicando na base de dados de Crimes de São Francisco

Para referência, consulte os métodos relevantes do PySpark em Spark's Python API e do NumPy em NumPy Reference

Parte 1: Leitura e parsing da base de dados

(1a) Verificando os dados disponíveis

Os dados da base que iremos utilizar estão armazenados em um arquivo texto. No primeiro passo vamos transformar os dados textuais em uma RDD e verificar a formatação dos mesmos. Altere a segunda célula para verificar quantas amostras existem nessa base de dados utilizando o método count method.

Reparem que o rótulo dessa base é o primeiro registro, representando o ano.


In [ ]:
# carregar base de dados
import os.path
fileName = os.path.join('Data', 'millionsong.txt')

numPartitions = 2
rawData = sc.textFile(fileName, numPartitions)

In [ ]:
# EXERCICIO
numPoints = rawData.<COMPLETAR>
print (numPoints)
samplePoints = rawData.take(5)
print (samplePoints)

In [ ]:
# TEST Load and check the data (1a)
assert numPoints==6724, 'incorrect value for numPoints'
print("OK")
assert len(samplePoints)==5, 'incorrect length for samplePoints'
print("OK")

(1b) Usando LabeledPoint

Na MLlib, bases de dados rotuladas devem ser armazenadas usando o objeto LabeledPoint. Escreva a função parsePoint que recebe como entrada uma amostra de dados, transforma os dados usandoo comando unicode.split, em seguida mapeando para float e retorna um LabeledPoint.

Aplique essa função na variável samplePoints da célula anterior e imprima os atributos e rótulo utilizando os atributos LabeledPoint.features e LabeledPoint.label. Finalmente, calcule o número de atributos nessa base de dados.


In [ ]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np

# Here is a sample raw data point:
# '2001.0,0.884,0.610,0.600,0.474,0.247,0.357,0.344,0.33,0.600,0.425,0.60,0.419'
# In this raw data point, 2001.0 is the label, and the remaining values are features

In [ ]:
# EXERCICIO
def parsePoint(line):
    """Converts a comma separated unicode string into a `LabeledPoint`.

    Args:
        line (unicode): Comma separated unicode string where the first element is the label and the
            remaining elements are features.

    Returns:
        LabeledPoint: The line is converted into a `LabeledPoint`, which consists of a label and
            features.
    """
    Point = <COMPLETAR>
    return LabeledPoint(<COMPLETAR>)

parsedSamplePoints = list(map(parsePoint,samplePoints))
firstPointFeatures = parsedSamplePoints[0].features
firstPointLabel = parsedSamplePoints[0].label
print (firstPointFeatures, firstPointLabel)

d = len(firstPointFeatures)
print (d)

In [ ]:
# TEST Using LabeledPoint (1b)
assert isinstance(firstPointLabel, float), 'label must be a float'
expectedX0 = [0.8841,0.6105,0.6005,0.4747,0.2472,0.3573,0.3441,0.3396,0.6009,0.4257,0.6049,0.4192]
assert np.allclose(expectedX0, firstPointFeatures, 1e-4, 1e-4), 'incorrect features for firstPointFeatures'
assert np.allclose(2001.0, firstPointLabel), 'incorrect label for firstPointLabel'
assert d == 12, 'incorrect number of features'
print("OK")

Visualização 1: Atributos

A próxima célula mostra uma forma de visualizar os atributos através de um mapa de calor. Nesse mapa mostramos os 50 primeiros objetos e seus atributos representados por tons de cinza, sendo o branco representando o valor 0 e o preto representando o valor 1.

Esse tipo de visualização ajuda a perceber a variação dos valores dos atributos. Pouca mudança de tons significa que os valores daquele atributo apresenta uma variância baixa.


In [ ]:
import matplotlib.pyplot as plt
import matplotlib.cm as cm
%matplotlib inline

sampleMorePoints = rawData.take(50)

parsedSampleMorePoints = map(parsePoint, sampleMorePoints)
dataValues = list(map(lambda lp: lp.features.toArray(), parsedSampleMorePoints))

def preparePlot(xticks, yticks, figsize=(10.5, 6), hideLabels=False, gridColor='#999999',
                gridWidth=1.0):
    """Template for generating the plot layout."""
    plt.close()
    fig, ax = plt.subplots(figsize=figsize, facecolor='white', edgecolor='white')
    ax.axes.tick_params(labelcolor='#999999', labelsize='10')
    for axis, ticks in [(ax.get_xaxis(), xticks), (ax.get_yaxis(), yticks)]:
        axis.set_ticks_position('none')
        axis.set_ticks(ticks)
        axis.label.set_color('#999999')
        if hideLabels: axis.set_ticklabels([])
    plt.grid(color=gridColor, linewidth=gridWidth, linestyle='-')
    map(lambda position: ax.spines[position].set_visible(False), ['bottom', 'top', 'left', 'right'])
    return fig, ax

# generate layout and plot
fig, ax = preparePlot(np.arange(.5, 11, 1), np.arange(.5, 49, 1), figsize=(8,7), hideLabels=True,
                      gridColor='#eeeeee', gridWidth=1.1)
image = plt.imshow(dataValues,interpolation='nearest', aspect='auto', cmap=cm.Greys)
for x, y, s in zip(np.arange(-.125, 12, 1), np.repeat(-.75, 12), [str(x) for x in range(12)]):
    plt.text(x, y, s, color='#999999', size='10')
plt.text(4.7, -3, 'Feature', color='#999999', size='11'), ax.set_ylabel('Observation')
pass

(1c) Deslocando os rótulos

Para melhor visualizar as soluções obtidas, calcular o erro de predição e visualizar a relação dos atributos com os rótulos, costuma-se deslocar os rótulos para iniciarem em zero.

Como primeiro passo, aplique a função parsePoint no RDD criado anteriormente, em seguida, crie uma RDD apenas com o .label de cada amostra. Finalmente, calcule os valores mínimos e máximos.


In [ ]:
# EXERCICIO
parsedDataInit = rawData.<COMPLETAR>
onlyLabels = parsedDataInit.<COMPLETAR>
minYear = onlyLabels.<COMPLETAR>
maxYear = onlyLabels.<COMPLETAR>
print maxYear, minYear

In [ ]:
# TEST Find the range (1c)
assert len(parsedDataInit.take(1)[0].features)==12, 'unexpected number of features in sample point'
sumFeatTwo = parsedDataInit.map(lambda lp: lp.features[2]).sum()
assert np.allclose(sumFeatTwo, 3158.96224351), 'parsedDataInit has unexpected values'
yearRange = maxYear - minYear
assert yearRange == 89, 'incorrect range for minYear to maxYear'
print("OK")

In [ ]:
# EXERCICIO: subtraia os labels do valor mínimo
parsedData = parsedDataInit.<COMPLETAR>

# Should be a LabeledPoint
print type(parsedData.take(1)[0])
# View the first point
print ('\n{0}'.format(parsedData.take(1)))

In [ ]:
# TEST Shift labels (1d)
oldSampleFeatures = parsedDataInit.take(1)[0].features
newSampleFeatures = parsedData.take(1)[0].features
assert np.allclose(oldSampleFeatures, newSampleFeatures), 'new features do not match old features'
sumFeatTwo = parsedData.map(lambda lp: lp.features[2]).sum()
assert np.allclose(sumFeatTwo, 3158.96224351), 'parsedData has unexpected values'
minYearNew = parsedData.map(lambda lp: lp.label).min()
maxYearNew = parsedData.map(lambda lp: lp.label).max()
assert minYearNew == 0, 'incorrect min year in shifted data'
assert maxYearNew == 89, 'incorrect max year in shifted data'
print("OK")

(1d) Conjuntos de treino, validação e teste

Como próximo passo, vamos dividir nossa base de dados em conjunto de treino, validação e teste conforme discutido em sala de aula. Use o método randomSplit method com os pesos (weights) e a semente aleatória (seed) especificados na célula abaixo parar criar a divisão das bases. Em seguida, utilizando o método cache() faça o pré-armazenamento da base processada.

Esse comando faz o processamento da base através das transformações e armazena em um novo RDD que pode ficar armazenado em memória, se couber, ou em um arquivo temporário.


In [ ]:
# EXERCICIO
weights = [.8, .1, .1]
seed = 42
parsedTrainData, parsedValData, parsedTestData = parsedData.<COMPLETAR>
parsedTrainData.<COMPLETAR>
parsedValData.<COMPLETAR>
parsedTestData.<COMPLETAR>
nTrain = parsedTrainData.count()
nVal = parsedValData.count()
nTest = parsedTestData.count()

print (nTrain, nVal, nTest, nTrain + nVal + nTest)
print (parsedData.count())

In [ ]:
# TEST Training, validation, and test sets (1e)
assert parsedTrainData.getNumPartitions() == numPartitions, 'parsedTrainData has wrong number of partitions'
assert parsedValData.getNumPartitions() == numPartitions, 'parsedValData has wrong number of partitions'
assert parsedTestData.getNumPartitions() == numPartitions,'parsedTestData has wrong number of partitions'
assert len(parsedTrainData.take(1)[0].features) == 12, 'parsedTrainData has wrong number of features'
sumFeatTwo = (parsedTrainData
              .map(lambda lp: lp.features[2])
              .sum())
sumFeatThree = (parsedValData
                .map(lambda lp: lp.features[3])
                .reduce(lambda x, y: x + y))
sumFeatFour = (parsedTestData
               .map(lambda lp: lp.features[4])
               .reduce(lambda x, y: x + y))
assert np.allclose([sumFeatTwo, sumFeatThree, sumFeatFour],2526.87757656, 297.340394298, 184.235876654), 'parsed Train, Val, Test data has unexpected values'
assert nTrain + nVal + nTest == 6724, 'unexpected Train, Val, Test data set size'
assert nTrain == 5359, 'unexpected value for nTrain'
assert nVal == 678, 'unexpected value for nVal'
assert nTest == 687, 'unexpected value for nTest'
print("OK")

Part 2: Criando o modelo de baseline

(2a) Rótulo médio

O baseline é útil para verificarmos que nosso modelo de regressão está funcionando. Ele deve ser um modelo bem simples que qualquer algoritmo possa fazer melhor.

Um baseline muito utilizado é fazer a mesma predição independente dos dados analisados utilizando o rótulo médio do conjunto de treino. Calcule a média dos rótulos deslocados para a base de treino, utilizaremos esse valor posteriormente para comparar o erro de predição. Use um método apropriado para essa tarefa, consulte o RDD API.


In [ ]:
# EXERCICIO
averageTrainYear = (parsedTrainData
                    .<COMPLETAR>
                    .<COMPLETAR>
                   )
print averageTrainYear

In [ ]:
# TEST Average label (2a)
assert np.allclose(averageTrainYear, 53.6792311), 'incorrect value for averageTrainYear'
print("OK")

(2b) Erro quadrático médio

Para comparar a performance em problemas de regressão, geralmente é utilizado o Erro Quadrático Médio (RMSE). Implemente uma função que calcula o RMSE a partir de um RDD de tuplas (rótulo, predição).


In [ ]:
# EXERCICIO
def squaredError(label, prediction):
    """Calculates the the squared error for a single prediction.

    Args:
        label (float): The correct value for this observation.
        prediction (float): The predicted value for this observation.

    Returns:
        float: The difference between the `label` and `prediction` squared.
    """
    return <COMPLETAR>

def calcRMSE(labelsAndPreds):
    """Calculates the root mean squared error for an `RDD` of (label, prediction) tuples.

    Args:
        labelsAndPred (RDD of (float, float)): An `RDD` consisting of (label, prediction) tuples.

    Returns:
        float: The square root of the mean of the squared errors.
    """
    return <COMPLETAR>

labelsAndPreds = sc.parallelize([(3., 1.), (1., 2.), (2., 2.)])
# RMSE = sqrt[((3-1)^2 + (1-2)^2 + (2-2)^2) / 3] = 1.291
exampleRMSE = calcRMSE(labelsAndPreds)
print (exampleRMSE)

In [ ]:
# TEST Root mean squared error (2b)
assert np.allclose(squaredError(3, 1), 4.), 'incorrect definition of squaredError'
assert np.allclose(exampleRMSE, 1.29099444874), 'incorrect value for exampleRMSE'
print("OK")

(2c) RMSE do baseline para os conjuntos de treino, validação e teste

Vamos calcular o RMSE para nossa baseline. Primeiro crie uma RDD de (rótulo, predição) para cada conjunto, e então chame a função calcRMSE.


In [ ]:
# EXERCICIO
labelsAndPredsTrain = parsedTrainData.<COMPLETAR>
rmseTrainBase = calcRMSE(labelsAndPredsTrain)

labelsAndPredsVal = parsedValData.<COMPLETAR>
rmseValBase = calcRMSE(labelsAndPredsVal)

labelsAndPredsTest = parsedTestData.<COMPLETAR>
rmseTestBase = calcRMSE(labelsAndPredsTest)

print ('Baseline Train RMSE = {0:.3f}'.format(rmseTrainBase))
print ('Baseline Validation RMSE = {0:.3f}'.format(rmseValBase))
print ('Baseline Test RMSE = {0:.3f}'.format(rmseTestBase))

In [ ]:
# TEST Training, validation and test RMSE (2c)
assert np.allclose([rmseTrainBase, rmseValBase, rmseTestBase],[21.506125957738682, 20.877445428452468, 21.260493955081916]), 'incorrect RMSE value'
print("OK")

Visualização 2: Predição vs. real

Vamos visualizar as predições no conjunto de validação. Os gráficos de dispersão abaixo plotam os pontos com a coordenada X sendo o valor predito pelo modelo e a coordenada Y o valor real do rótulo.

O primeiro gráfico mostra a situação ideal, um modelo que acerta todos os rótulos. O segundo gráfico mostra o desempenho do modelo baseline. As cores dos pontos representam o erro quadrático daquela predição, quanto mais próxima do laranja, maior o erro.


In [ ]:
from matplotlib.colors import ListedColormap, Normalize
from matplotlib.cm import get_cmap
cmap = get_cmap('YlOrRd')
norm = Normalize()

actual = np.asarray(parsedValData
                    .map(lambda lp: lp.label)
                    .collect())
error = np.asarray(parsedValData
                   .map(lambda lp: (lp.label, lp.label))
                   .map(lambda lp: squaredError(lp[0], lp[1]))
                   .collect())
clrs = cmap(np.asarray(norm(error)))[:,0:3]

fig, ax = preparePlot(np.arange(0, 100, 20), np.arange(0, 100, 20))
plt.scatter(actual, actual, s=14**2, c=clrs, edgecolors='#888888', alpha=0.75, linewidths=0.5)
ax.set_xlabel('Predicted'), ax.set_ylabel('Actual')
pass

In [ ]:
predictions = np.asarray(parsedValData
                         .map(lambda lp: averageTrainYear)
                         .collect())
error = np.asarray(parsedValData
                   .map(lambda lp: (lp.label, averageTrainYear))
                   .map(lambda lp: squaredError(lp[0], lp[1]))
                   .collect())
norm = Normalize()
clrs = cmap(np.asarray(norm(error)))[:,0:3]

fig, ax = preparePlot(np.arange(53.0, 55.0, 0.5), np.arange(0, 100, 20))
ax.set_xlim(53, 55)
plt.scatter(predictions, actual, s=14**2, c=clrs, edgecolors='#888888', alpha=0.75, linewidths=0.3)
ax.set_xlabel('Predicted'), ax.set_ylabel('Actual')

Parte 3: Treinando e avaliando o modelo de regressão linear

(3a) Gradiente do erro

Vamos implementar a regressão linear através do gradiente descendente.

Lembrando que para atualizar o peso da regressão linear fazemos: $$ \scriptsize \mathbf{w}_{i+1} = \mathbf{w}_i - \alpha_i \sum_j (\mathbf{w}_i^\top\mathbf{x}_j - y_j) \mathbf{x}_j \,.$$ onde $ \scriptsize i $ é a iteração do algoritmo, e $ \scriptsize j $ é o objeto sendo observado no momento.

Primeiro, implemente uma função que calcula esse gradiente do erro para certo objeto: $ \scriptsize (\mathbf{w}^\top \mathbf{x} - y) \mathbf{x} \, ,$ e teste a função em dois exemplos. Use o método DenseVector dot para representar a lista de atributos (ele tem funcionalidade parecida com o np.array()).


In [ ]:
from pyspark.mllib.linalg import DenseVector

In [ ]:
# EXERCICIO
def gradientSummand(weights, lp):
    """Calculates the gradient summand for a given weight and `LabeledPoint`.

    Note:
        `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably
        within this function.  For example, they both implement the `dot` method.

    Args:
        weights (DenseVector): An array of model weights (betas).
        lp (LabeledPoint): The `LabeledPoint` for a single observation.

    Returns:
        DenseVector: An array of values the same length as `weights`.  The gradient summand.
    """
    return <COMPLETAR>

exampleW = DenseVector([1, 1, 1])
exampleLP = LabeledPoint(2.0, [3, 1, 4])

summandOne = gradientSummand(exampleW, exampleLP)
print (summandOne)

exampleW = DenseVector([.24, 1.2, -1.4])
exampleLP = LabeledPoint(3.0, [-1.4, 4.2, 2.1])
summandTwo = gradientSummand(exampleW, exampleLP)
print (summandTwo)

In [ ]:
# TEST Gradient summand (3a)
assert np.allclose(summandOne, [18., 6., 24.]), 'incorrect value for summandOne'
assert np.allclose(summandTwo, [1.7304,-5.1912,-2.5956]), 'incorrect value for summandTwo'
print("OK")

(3b) Use os pesos para fazer a predição

Agora, implemente a função getLabeledPredictions que recebe como parâmetro o conjunto de pesos e um LabeledPoint e retorna uma tupla (rótulo, predição). Lembre-se que podemos predizer um rótulo calculando o produto interno dos pesos com os atributos.


In [ ]:
# EXERCICIO
def getLabeledPrediction(weights, observation):
    """Calculates predictions and returns a (label, prediction) tuple.

    Note:
        The labels should remain unchanged as we'll use this information to calculate prediction
        error later.

    Args:
        weights (np.ndarray): An array with one weight for each features in `trainData`.
        observation (LabeledPoint): A `LabeledPoint` that contain the correct label and the
            features for the data point.

    Returns:
        tuple: A (label, prediction) tuple.
    """
    return <COMPLETAR>

weights = np.array([1.0, 1.5])
predictionExample = sc.parallelize([LabeledPoint(2, np.array([1.0, .5])),
                                    LabeledPoint(1.5, np.array([.5, .5]))])
labelsAndPredsExample = predictionExample.map(lambda lp: getLabeledPrediction(weights, lp))
print (labelsAndPredsExample.collect())

In [ ]:
# TEST Use weights to make predictions (3b)
assert labelsAndPredsExample.collect() == [(2.0, 1.75), (1.5, 1.25)], 'incorrect definition for getLabeledPredictions'
print("OK")

(3c) Gradiente descendente

Finalmente, implemente o algoritmo gradiente descendente para regressão linear e teste a função em um exemplo.


In [ ]:
# EXERCICIO
def linregGradientDescent(trainData, numIters):
    """Calculates the weights and error for a linear regression model trained with gradient descent.

    Note:
        `DenseVector` behaves similarly to a `numpy.ndarray` and they can be used interchangably
        within this function.  For example, they both implement the `dot` method.

    Args:
        trainData (RDD of LabeledPoint): The labeled data for use in training the model.
        numIters (int): The number of iterations of gradient descent to perform.

    Returns:
        (np.ndarray, np.ndarray): A tuple of (weights, training errors).  Weights will be the
            final weights (one weight per feature) for the model, and training errors will contain
            an error (RMSE) for each iteration of the algorithm.
    """
    # The length of the training data
    n = trainData.<COMPLETAR>
    # The number of features in the training data
    d = <COMPLETAR>
    w = np.zeros(d)
    alpha = 1.0
    # We will compute and store the training error after each iteration
    errorTrain = np.zeros(numIters)
    for i in range(numIters):
        # Use getLabeledPrediction from (3b) with trainData to obtain an RDD of (label, prediction)
        # tuples.  Note that the weights all equal 0 for the first iteration, so the predictions will
        # have large errors to start.
        labelsAndPredsTrain = trainData.<COMPLETAR>
        errorTrain[i] = <COMPLETAR>

        # Calculate the `gradient`.  Make use of the `gradientSummand` function you wrote in (3a).
        # Note that `gradient` sould be a `DenseVector` of length `d`.
        gradient = trainData.<COMPLETAR>

        # Update the weights
        alpha_i = alpha / (n * np.sqrt(i+1))
        w -= alpha_i*gradient
    return w, errorTrain

# create a toy dataset with n = 10, d = 3, and then run 5 iterations of gradient descent
# note: the resulting model will not be useful; the goal here is to verify that
# linregGradientDescent is working properly
exampleN = 10
exampleD = 3
exampleData = (sc
               .parallelize(parsedTrainData.take(exampleN))
               .map(lambda lp: LabeledPoint(lp.label, lp.features[0:exampleD])))
print (exampleData.take(2))
exampleNumIters = 5
exampleWeights, exampleErrorTrain = linregGradientDescent(exampleData, exampleNumIters)
print (exampleWeights)

In [ ]:
# TEST Gradient descent (3c)
expectedOutput = [48.20389904,  34.53243006, 30.60284959]
assert np.allclose(exampleWeights, expectedOutput), 'value of exampleWeights is incorrect'
expectedError = [79.72766145,  33.64762907,   9.46281696,   9.45486926,   9.44889147]
assert np.allclose(exampleErrorTrain, expectedError),'value of exampleErrorTrain is incorrect'
print("OK")

(3d) Treinando o modelo na base de dados

Agora iremos treinar o modelo de regressão linear na nossa base de dados de treino e calcular o RMSE na base de validação. Lembrem-se que não devemos utilizar a base de teste até que o melhor parâmetro do modelo seja escolhido.

Para essa tarefa vamos utilizar as funções linregGradientDescent, getLabeledPrediction e calcRMSE já implementadas.


In [ ]:
# EXERCICIO
numIters = 50
weightsLR0, errorTrainLR0 = <COMPLETAR>

labelsAndPreds = parsedValData.<COMPLETAR>
rmseValLR0 = calcRMSE(labelsAndPreds)

print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}'.format(rmseValBase, rmseValLR0))

In [ ]:
# TEST Train the model (3d)
expectedOutput = [ 22.64370481,  20.1815662,   -0.21620107,   8.53259099,   5.94821844,
  -4.50349235,  15.51511703,   3.88802901,   9.79146177,   5.74357056,
  11.19512589,   3.60554264]
assert np.allclose(weightsLR0, expectedOutput), 'incorrect value for weightsLR0'
print("OK")

Visualização 3: Erro de Treino

Vamos verificar o comportamento do algoritmo durante as iterações. Para isso vamos plotar um gráfico em que o eixo x representa a iteração e o eixo y o log do RMSE. O primeiro gráfico mostra as primeiras 50 iterações enquanto o segundo mostra as últimas 44 iterações. Note que inicialmente o erro cai rapidamente, quando então o gradiente descendente passa a fazer apenas pequenos ajustes.


In [ ]:
norm = Normalize()
clrs = cmap(np.asarray(norm(np.log(errorTrainLR0))))[:,0:3]

fig, ax = preparePlot(np.arange(0, 60, 10), np.arange(2, 6, 1))
ax.set_ylim(2, 6)
plt.scatter(list(range(0, numIters)), np.log(errorTrainLR0), s=14**2, c=clrs, edgecolors='#888888', alpha=0.75)
ax.set_xlabel('Iteration'), ax.set_ylabel(r'$\log_e(errorTrainLR0)$')
pass

In [ ]:
norm = Normalize()
clrs = cmap(np.asarray(norm(errorTrainLR0[6:])))[:,0:3]

fig, ax = preparePlot(np.arange(0, 60, 10), np.arange(17, 22, 1))
ax.set_ylim(17.8, 21.2)
plt.scatter(range(0, numIters-6), errorTrainLR0[6:], s=14**2, c=clrs, edgecolors='#888888', alpha=0.75)
ax.set_xticklabels(map(str, range(6, 66, 10)))
ax.set_xlabel('Iteration'), ax.set_ylabel(r'Training Error')
pass

Part 4: Treino utilizando MLlib e Busca em Grade (Grid Search)

(4a) LinearRegressionWithSGD

Nosso teste inicial já conseguiu obter um desempenho melhor que o baseline, mas vamos ver se conseguimos fazer melhor introduzindo a ordenada de origem da reta além de outros ajustes no algoritmo. MLlib LinearRegressionWithSGD implementa o mesmo algoritmo da parte (3b), mas de forma mais eficiente para o contexto distribuído e com várias funcionalidades adicionais.

Primeiro utilize a função LinearRegressionWithSGD para treinar um modelo com regularização L2 (Ridge) e com a ordenada de origem. Esse método retorna um LinearRegressionModel.

Em seguida, use os atributos weights e intercept para imprimir o modelo encontrado.


In [ ]:
from pyspark.mllib.regression import LinearRegressionWithSGD
# Values to use when training the linear regression model
numIters = 500  # iterations
alpha = 1.0  # step
miniBatchFrac = 1.0  # miniBatchFraction
reg = 1e-1  # regParam
regType = 'l2'  # regType
useIntercept = True  # intercept

In [ ]:
# EXERCICIO
firstModel = LinearRegressionWithSGD.train(parsedTrainData, iterations = numIters, step = alpha, miniBatchFraction = 1.0,
                                          regParam=reg,regType=regType, intercept=useIntercept)

# weightsLR1 stores the model weights; interceptLR1 stores the model intercept
weightsLR1 = firstModel.<COMPLETAR>
interceptLR1 = firstModel.<COMPLETAR>
print( weightsLR1, interceptLR1)

In [ ]:
# TEST LinearRegressionWithSGD (4a)
expectedIntercept = 13.332056210482524
expectedWeights = [15.9694010246,13.9897244172,0.669349383773,6.24618402989,4.00932179503,-2.30176663131,10.478805422,3.06385145385,7.14414111075,4.49826819526,7.87702565069,3.00732146613]
assert np.allclose(interceptLR1, expectedIntercept), 'incorrect value for interceptLR1'
assert np.allclose(weightsLR1, expectedWeights), 'incorrect value for weightsLR1'
print("OK")

(4b) Predição

Agora use o método LinearRegressionModel.predict() para fazer a predição de um objeto. Passe o atributo features de um LabeledPoint comp parâmetro.


In [ ]:
# EXERCICIO
samplePoint = parsedTrainData.take(1)[0]
samplePrediction = firstModel.<COMPLETAR>
print (samplePrediction)

In [ ]:
# TEST Predict (4b)
assert np.allclose(samplePrediction, 56.4065674104), 'incorrect value for samplePrediction'

(4c) Avaliar RMSE

Agora avalie o desempenho desse modelo no teste de validação. Use o método predict() para criar o RDD labelsAndPreds RDD, e então use a função calcRMSE() da Parte (2b) para calcular o RMSE.


In [ ]:
# EXERCICIO
labelsAndPreds = parsedValData.<COMPLETAR>
rmseValLR1 = calcRMSE(labelsAndPreds)

print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}'.format(rmseValBase, rmseValLR0, rmseValLR1))

In [ ]:
# TEST Evaluate RMSE (4c)
assert np.allclose(rmseValLR1, 19.025), 'incorrect value for rmseValLR1'

(4d) Grid search

Já estamos superando o baseline em pelo menos dois anos na média, vamos ver se encontramos um conjunto de parâmetros melhor. Faça um grid search para encontrar um bom parâmetro de regularização. Tente valores para regParam dentro do conjunto 1e-10, 1e-5, e 1.


In [ ]:
# EXERCICIO
bestRMSE = rmseValLR1
bestRegParam = reg
bestModel = firstModel

numIters = 500
alpha = 1.0
miniBatchFrac = 1.0
for reg in <COMPLETAR>:
    model = LinearRegressionWithSGD.train(parsedTrainData, numIters, alpha,
                                          miniBatchFrac, regParam=reg,
                                          regType='l2', intercept=True)
    labelsAndPreds = parsedValData.<COMPLETAR>
    rmseValGrid = calcRMSE(labelsAndPreds)
    print (rmseValGrid)

    if rmseValGrid < bestRMSE:
        bestRMSE = rmseValGrid
        bestRegParam = reg
        bestModel = model
rmseValLRGrid = bestRMSE

print ('Validation RMSE:\n\tBaseline = {0:.3f}\n\tLR0 = {1:.3f}\n\tLR1 = {2:.3f}\n\tLRGrid = {3:.3f}'.format(rmseValBase, rmseValLR0, rmseValLR1, rmseValLRGrid))

In [ ]:
# TEST Grid search (4d)
assert np.allclose(16.6813542516, rmseValLRGrid), 'incorrect value for rmseValLRGrid'

Visualização 5: Predições do melhor modelo

Agora, vamos criar um gráfico para verificar o desempenho do melhor modelo. Reparem nesse gráfico que a quantidade de pontos mais escuros reduziu bastante em relação ao baseline.


In [ ]:
predictions = np.asarray(parsedValData
                         .map(lambda lp: bestModel.predict(lp.features))
                         .collect())
actual = np.asarray(parsedValData
                    .map(lambda lp: lp.label)
                    .collect())
error = np.asarray(parsedValData
                   .map(lambda lp: (lp.label, bestModel.predict(lp.features)))
                   .map(lambda lp: squaredError(lp[0], lp[1]))
                   .collect())

norm = Normalize()
clrs = cmap(np.asarray(norm(error)))[:,0:3]

fig, ax = preparePlot(np.arange(0, 120, 20), np.arange(0, 120, 20))
ax.set_xlim(15, 82), ax.set_ylim(-5, 105)
plt.scatter(predictions, actual, s=14**2, c=clrs, edgecolors='#888888', alpha=0.75, linewidths=.5)
ax.set_xlabel('Predicted'), ax.set_ylabel(r'Actual')
pass

(4e) Grid Search para o valor de alfa e número de iterações

Agora, vamos verificar diferentes valores para alfa e número de iterações para perceber o impacto desses parâmetros em nosso modelo. Especificamente tente os valores 1e-5 e 10 para alpha e os valores 500 e 5 para número de iterações. Avalie todos os modelos no conjunto de valdação. Reparem que com um valor baixo de alpha, o algoritmo necessita de muito mais iterações para convergir ao ótimo, enquanto um valor muito alto para alpha, pode fazer com que o algoritmo não encontre uma solução.


In [ ]:
# EXERCICIO
reg = bestRegParam
modelRMSEs = []

for alpha in <COMPLETAR>:
    for numIters in <COMPLETAR>:
        model = LinearRegressionWithSGD.train(parsedTrainData, numIters, alpha,
                                              miniBatchFrac, regParam=reg,
                                              regType='l2', intercept=True)
        labelsAndPreds = parsedValData.map(lambda lp: (lp.label, model.predict(lp.features)))
        rmseVal = calcRMSE(labelsAndPreds)
        print ('alpha = {0:.0e}, numIters = {1}, RMSE = {2:.3f}'.format(alpha, numIters, rmseVal))
        modelRMSEs.append(rmseVal)

In [ ]:
# TEST Vary alpha and the number of iterations (4e)
expectedResults = sorted([57.487692757541318, 57.487692757541318, 352324534.65684682])
assert np.allclose(sorted(modelRMSEs)[:3], expectedResults), 'incorrect value for modelRMSEs'