In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.regression import LabeledPoint
Utility function to create the appropriate data frame for classification algorithms in MLlib
In [ ]:
def mapLibSVM(row):
    """Convert an indexed Row into a (label, features) tuple for MLlib.

    Expected row layout (after StringIndexer): columns 0-3 are the four
    iris measurements, column 4 is the original string label, and
    column 5 is the numeric ``labelIndex`` added by the indexer.

    Returns a ``(numeric label, DenseVector of 4 features)`` pair.
    """
    # FIX: the original used row[:3], silently dropping the fourth iris
    # feature column; take all four feature columns instead.
    return (row[5], Vectors.dense(row[:4]))
Create the DataFrame from a CSV file.
In [ ]:
# Load the iris CSV into a DataFrame, letting Spark infer column types
# from the data and treat the first line as a header.
df = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("datasets/iris.data")
)
Classification algorithms require numeric values for labels.
In [ ]:
# Encode the string class label into a numeric "labelIndex" column,
# which the classification algorithms below require.
indexer = (
    StringIndexer(inputCol="label", outputCol="labelIndex")
    .fit(df)
    .transform(df)
)
indexer.show()
In [ ]:
# Reshape the indexed rows into the (label, features) layout MLlib
# expects, then hold out 10% of the rows for testing (fixed seed for
# reproducibility).
dfLabeled = indexer.rdd.map(mapLibSVM).toDF(["label", "features"])
dfLabeled.show()
splits = dfLabeled.randomSplit([0.9, 0.1], seed=12345)
train, test = splits
Schema verification.
In [ ]:
train.printSchema()
Instantiate the logistic regression estimator.
In [ ]:
lr = LogisticRegression(labelCol="label", maxIter=10)
We use a ParamGridBuilder to construct a grid of parameters to search over.
TrainValidationSplit will try all combinations of values and determine best model using the evaluator.
In [ ]:
# Hyper-parameter grid to search over: two regularization strengths.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.001])
    .build()
)
In this case the estimator is simply the logistic regression.
A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
In [ ]:
# TrainValidationSplit requires an estimator, a grid of estimator
# ParamMaps, and an evaluator.
# FIX: the original passed RegressionEvaluator (RMSE) to select the
# best model of a *classification* task; use a multiclass
# classification evaluator so model selection optimizes a
# classification metric (its default labelCol="label" and
# predictionCol="prediction" match this pipeline's columns).
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(),
                           # 90% of the data is used for training,
                           # 10% for validation (trainRatio=0.9).
                           trainRatio=0.9)
Fit the model to the training data.
In [ ]:
model = tvs.fit(train)
Compute the predictions from the model
In [ ]:
# Score the held-out test rows, then keep only the prediction/label
# columns needed for metric computation.
result = model.transform(test)
predictions = result.select("prediction", "label")
predictions.show()
In [ ]:
# Instantiate metrics object
# NOTE(review): MulticlassMetrics canonically takes an RDD of
# (prediction, label) pairs; this passes the DataFrame's RDD of Rows
# directly — confirm this works on the Spark version in use (an
# explicit predictions.rdd.map(tuple) would be version-safe).
metrics = MulticlassMetrics(predictions.rdd)
# Overall statistics
# NOTE(review): the no-argument precision()/recall()/fMeasure() forms
# were deprecated in Spark 2.x and removed in 3.x — verify the target
# Spark version; metrics.accuracy is the stable equivalent.
print("Summary Stats")
print("Precision = %s" % metrics.precision())
print("Recall = %s" % metrics.recall())
print("F1 Score = %s" % metrics.fMeasure())
print("Accuracy = %s" % metrics.accuracy)
In [ ]:
In [ ]:
# Label-weighted aggregate metrics across all classes.
print(f"Weighted recall = {metrics.weightedRecall}")
print(f"Weighted precision = {metrics.weightedPrecision}")
print(f"Weighted F(1) Score = {metrics.weightedFMeasure()}")
print(f"Weighted F(0.5) Score = {metrics.weightedFMeasure(beta=0.5)}")
print(f"Weighted false positive rate = {metrics.weightedFalsePositiveRate}")
In [ ]: