In [1]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
In [2]:
def mapLibSVM(row):
    # row[5] is the labelIndex added by StringIndexer; row[:4] are the four iris features
    return (row[5], Vectors.dense(row[:4]))
In [3]:
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("datasets/iris.data")
In [4]:
indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
In [5]:
dfLabeled = indexed.rdd.map(mapLibSVM).toDF(["label", "features"])
dfLabeled.show()
train, test = dfLabeled.randomSplit([0.9, 0.1], seed=12345)
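As an aside, the same (label, features) frame could be built without dropping to the RDD API. Here is a minimal sketch using VectorAssembler; the feature column names are assumptions about the header of iris.data, not taken from the file:

from pyspark.ml.feature import VectorAssembler

# Hypothetical column names; adjust to the actual header of iris.data
assembler = VectorAssembler(
    inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
    outputCol="features")
dfAssembled = assembler.transform(indexed) \
    .select(indexed.labelIndex.alias("label"), "features")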
In [6]:
lr = LogisticRegression(labelCol="label", maxIter=15)
In [7]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.001]) \
    .build()
In [8]:
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(),
                           trainRatio=0.8)
In [9]:
cval = CrossValidator(estimator=lr,
                      estimatorParamMaps=paramGrid,
                      evaluator=MulticlassClassificationEvaluator(),
                      numFolds=10)
In [10]:
result_tvs = tvs.fit(train).transform(test)
result_cval = cval.fit(train).transform(test)
In [11]:
preds_tvs = result_tvs.select(["prediction", "label"])
preds_cval = result_cval.select(["prediction", "label"])
In [12]:
# Instantiate the metrics objects
metrics_tvs = MulticlassMetrics(preds_tvs.rdd)
metrics_cval = MulticlassMetrics(preds_cval.rdd)
In [13]:
# Summary statistics for the TrainValidationSplit model
print("Summary Stats")
print("F1 Score = %s" % metrics_tvs.fMeasure())
print("Accuracy = %s" % metrics_tvs.accuracy)
print("Weighted recall = %s" % metrics_tvs.weightedRecall)
print("Weighted precision = %s" % metrics_tvs.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics_tvs.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics_tvs.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics_tvs.weightedFalsePositiveRate)
In [14]:
# Summary statistics for the CrossValidator model
print("Summary Stats")
print("F1 Score = %s" % metrics_cval.fMeasure())
print("Accuracy = %s" % metrics_cval.accuracy)
print("Weighted recall = %s" % metrics_cval.weightedRecall)
print("Weighted precision = %s" % metrics_cval.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics_cval.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics_cval.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics_cval.weightedFalsePositiveRate)
Since both tuning strategies wrap the same underlying estimator (logistic regression), and given that the dataset is relatively small, it is natural for TrainValidationSplit and CrossValidator to converge to the same (or nearly the same) optimal value for the hyperparameters tested.
For that reason, once that hyperparameter value has been found, the two models produce very similar results when evaluated on the test set (which is also the same for both models).
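One way to check this claim directly is to keep the fitted tuning models and compare the validation metric at each grid point. A minimal sketch, assuming a PySpark version that exposes validationMetrics and avgMetrics on the fitted models (note it refits both tuners, duplicating the work done in In [10]):

tvs_model = tvs.fit(train)
cv_model = cval.fit(train)

# Metrics are listed in the same order as the entries of paramGrid
for params, tvs_metric, cv_metric in zip(paramGrid,
                                         tvs_model.validationMetrics,
                                         cv_model.avgMetrics):
    print("regParam=%s  TVS metric=%s  CV metric=%s"
          % (params[lr.regParam], tvs_metric, cv_metric))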
In [15]:
from pyspark.ml.classification import RandomForestClassifier
In [16]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features")
In [17]:
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [1, 100]) \
    .build()
In [18]:
cval = CrossValidator(estimator=rf,
                      estimatorParamMaps=paramGrid,
                      evaluator=MulticlassClassificationEvaluator(),
                      numFolds=10)
In [19]:
results = cval.fit(train).transform(test)
In [20]:
predictions = results.select(["prediction", "label"])
In [21]:
# Instantiate the metrics object
metrics = MulticlassMetrics(predictions.rdd)
# Summary statistics for the cross-validated random forest
print("Summary Stats")
print("F1 Score = %s" % metrics.fMeasure())
print("Accuracy = %s" % metrics.accuracy)
print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted precision = %s" % metrics.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)