Hands-on!

Nessa prática, sugerimos alguns pequenos exemplos para você implementar sobre o Spark.

Logistic Regression com Cross-Validation

No exercício LogisticRegression foi utilizado TrainValidationSplit como abordagem de avaliação do modelo gerado. Atualize o exercício consideram CrossValidator e compare os resultados. Não esqueça de utilizar Pipeline.

Bibliotecas


In [1]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator

Funções


In [2]:
def mapLibSVM(row): 
    return (row[5],Vectors.dense(row[:3]))

In [3]:
df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("datasets/iris.data")

Convertendo a saída de categórica para numérica


In [4]:
indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
indexer = indexer.fit(df).transform(df)
indexer.show()


+------------+-----------+------------+-----------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|      label|labelIndex|
+------------+-----------+------------+-----------+-----------+----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|       0.0|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|       0.0|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|       0.0|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|       0.0|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|       0.0|
|         5.4|        3.9|         1.7|        0.4|Iris-setosa|       0.0|
|         4.6|        3.4|         1.4|        0.3|Iris-setosa|       0.0|
|         5.0|        3.4|         1.5|        0.2|Iris-setosa|       0.0|
|         4.4|        2.9|         1.4|        0.2|Iris-setosa|       0.0|
|         4.9|        3.1|         1.5|        0.1|Iris-setosa|       0.0|
|         5.4|        3.7|         1.5|        0.2|Iris-setosa|       0.0|
|         4.8|        3.4|         1.6|        0.2|Iris-setosa|       0.0|
|         4.8|        3.0|         1.4|        0.1|Iris-setosa|       0.0|
|         4.3|        3.0|         1.1|        0.1|Iris-setosa|       0.0|
|         5.8|        4.0|         1.2|        0.2|Iris-setosa|       0.0|
|         5.7|        4.4|         1.5|        0.4|Iris-setosa|       0.0|
|         5.4|        3.9|         1.3|        0.4|Iris-setosa|       0.0|
|         5.1|        3.5|         1.4|        0.3|Iris-setosa|       0.0|
|         5.7|        3.8|         1.7|        0.3|Iris-setosa|       0.0|
|         5.1|        3.8|         1.5|        0.3|Iris-setosa|       0.0|
+------------+-----------+------------+-----------+-----------+----------+
only showing top 20 rows


In [5]:
dfLabeled = indexer.rdd.map(mapLibSVM).toDF(["label", "features"])
dfLabeled.show()

train, test = dfLabeled.randomSplit([0.9, 0.1], seed=12345)


+-----+-------------+
|label|     features|
+-----+-------------+
|  0.0|[5.1,3.5,1.4]|
|  0.0|[4.9,3.0,1.4]|
|  0.0|[4.7,3.2,1.3]|
|  0.0|[4.6,3.1,1.5]|
|  0.0|[5.0,3.6,1.4]|
|  0.0|[5.4,3.9,1.7]|
|  0.0|[4.6,3.4,1.4]|
|  0.0|[5.0,3.4,1.5]|
|  0.0|[4.4,2.9,1.4]|
|  0.0|[4.9,3.1,1.5]|
|  0.0|[5.4,3.7,1.5]|
|  0.0|[4.8,3.4,1.6]|
|  0.0|[4.8,3.0,1.4]|
|  0.0|[4.3,3.0,1.1]|
|  0.0|[5.8,4.0,1.2]|
|  0.0|[5.7,4.4,1.5]|
|  0.0|[5.4,3.9,1.3]|
|  0.0|[5.1,3.5,1.4]|
|  0.0|[5.7,3.8,1.7]|
|  0.0|[5.1,3.8,1.5]|
+-----+-------------+
only showing top 20 rows

Definição do Modelo Logístico


In [6]:
lr = LogisticRegression(labelCol="label", maxIter=15)

Cross-Validation - TrainValidationSplit e CrossValidator


In [7]:
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.001]) \
    .build()

In [8]:
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(),
                           trainRatio=0.8)

In [9]:
cval = CrossValidator(estimator=lr,
                      estimatorParamMaps=paramGrid,
                      evaluator=MulticlassClassificationEvaluator(),
                      numFolds=10)

Treino do Modelo e Predição do Teste


In [10]:
result_tvs = tvs.fit(train).transform(test)
result_cval = cval.fit(train).transform(test)

In [11]:
preds_tvs = result_tvs.select(["prediction", "label"])
preds_cval = result_cval.select(["prediction", "label"])

Avaliação dos Modelos


In [12]:
# Instânciação dos Objetos de Métrics
metrics_tvs = MulticlassMetrics(preds_tvs.rdd)
metrics_cval = MulticlassMetrics(preds_cval.rdd)

In [13]:
# Estatísticas Gerais para o Método TrainValidationSplit
print("Summary Stats")
print("F1 Score = %s" % metrics_tvs.fMeasure())
print("Accuracy = %s" % metrics_tvs.accuracy)
print("Weighted recall = %s" % metrics_tvs.weightedRecall)
print("Weighted precision = %s" % metrics_tvs.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics_tvs.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics_tvs.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics_tvs.weightedFalsePositiveRate)


Summary Stats
/home/minhotmog/spark-2.2.0-bin-hadoop2.7/python/pyspark/mllib/evaluation.py:262: UserWarning: Deprecated in 2.0.0. Use accuracy.
  warnings.warn("Deprecated in 2.0.0. Use accuracy.")
F1 Score = 0.9090909090909091
Accuracy = 0.9090909090909091
Weighted recall = 0.9090909090909092
Weighted precision = 0.9242424242424243
Weighted F(1) Score = 0.8980716253443526
Weighted F(0.5) Score = 0.9070010449320796
Weighted false positive rate = 0.07575757575757575

In [14]:
# Estatísticas Gerais para o Método TrainValidationSplit
print("Summary Stats")
print("F1 Score = %s" % metrics_cval.fMeasure())
print("Accuracy = %s" % metrics_cval.accuracy)
print("Weighted recall = %s" % metrics_cval.weightedRecall)
print("Weighted precision = %s" % metrics_cval.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics_cval.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics_cval.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics_cval.weightedFalsePositiveRate)


Summary Stats
F1 Score = 0.9090909090909091
Accuracy = 0.9090909090909091
Weighted recall = 0.9090909090909092
Weighted precision = 0.9242424242424243
Weighted F(1) Score = 0.8980716253443526
Weighted F(0.5) Score = 0.9070010449320796
Weighted false positive rate = 0.07575757575757575

Conclusão:

Uma vez que ambos os modelos de CrossValidation usam o mesmo modelo de predição (a Regressão Logística), e contando com o fato de que o dataset é relativamente pequeno, é natural que ambos os métodos de CrossValidation encontrem o mesmo (ou aproximadamente igual) valor ótimo para os hyperparâmetros testados.

Por esse motivo, após descobrirem esse valor de hiperparâmetros, os dois modelos irão demonstrar resultados bastante similiares quando avaliados sobre o Conjunto de Treino (que também é o mesmo para os dois modelos).


Random Forest

Use o exercício anterior como base, mas agora utilizando pyspark.ml.classification.RandomForestClassifier. Use Pipeline e CrossValidator para avaliar o modelo gerado.

Bibliotecas


In [15]:
from pyspark.ml.classification import RandomForestClassifier

Definição do Modelo de Árvores Randômicas


In [16]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

Cross-Validation - CrossValidator


In [17]:
paramGrid = ParamGridBuilder()\
    .addGrid(rf.numTrees, [1, 100]) \
    .build()

In [18]:
cval = CrossValidator(estimator=rf,
                      estimatorParamMaps=paramGrid,
                      evaluator=MulticlassClassificationEvaluator(),
                      numFolds=10)

Treino do Modelo e Predição do Teste


In [19]:
results = cval.fit(train).transform(test)

In [20]:
predictions = results.select(["prediction", "label"])

Avaliação do Modelo


In [21]:
# Instânciação dos Objetos de Métrics
metrics = MulticlassMetrics(predictions.rdd)

# Estatísticas Gerais para o Método TrainValidationSplit
print("Summary Stats")
print("F1 Score = %s" % metrics.fMeasure())
print("Accuracy = %s" % metrics.accuracy)
print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted precision = %s" % metrics.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)


Summary Stats
F1 Score = 1.0
Accuracy = 1.0
Weighted recall = 1.0
Weighted precision = 1.0
Weighted F(1) Score = 1.0
Weighted F(0.5) Score = 1.0
Weighted false positive rate = 0.0

Conclusão:

Uma vez que o RandomForest é um classificador relatiamente robusto, e o Iris é um problema relativamente simples, é notável que esse modelo é suficientemente capaz de perfeitamente predizer observações desse dataset.