In [97]:
from pyspark import SparkContext
sc = SparkContext(master='local')
In [95]:
from pyspark.sql import SparkSession
# getOrCreate() reuses the SparkContext created above
spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
In [5]:
iris = spark.read.csv('data/iris.csv', header=True, inferSchema=True)
In [7]:
iris.show(5)
In [8]:
iris.dtypes
Out[8]:
In [12]:
iris.describe().show()
In [15]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
In [19]:
# pack the four numeric measurements into a dense feature vector, keeping the species string
iris2 = iris.rdd.map(lambda x: Row(features=Vectors.dense(x[:-1]), species=x[-1])).toDF()
iris2.show(5)
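Equivalently, VectorAssembler can build the same features column without dropping to the RDD API. This is a minimal sketch (assembler and iris2_alt are illustrative names), assuming the species column is the last column of the DataFrame:
In [ ]:
from pyspark.ml.feature import VectorAssembler

# assumes every column except the last (species) is a numeric feature
assembler = VectorAssembler(inputCols=iris.columns[:-1], outputCol='features')
iris2_alt = assembler.transform(iris).select('features', iris.columns[-1])
iris2_alt.show(5)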
In [21]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
In [22]:
# encode the string species column as a numeric label column
stringindexer = StringIndexer(inputCol='species', outputCol='label')
stages = [stringindexer]
pipeline = Pipeline(stages=stages)
In [24]:
iris_df = pipeline.fit(iris2).transform(iris2)
iris_df.show(5)
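For reference, the indexing can be reversed later (e.g., to turn numeric predictions back into species names) with IndexToString. A minimal sketch, where species_decoded is an illustrative column name; StringIndexer stores its mapping in the column metadata, so no explicit labels list should be needed:
In [ ]:
from pyspark.ml.feature import IndexToString

# reads the index-to-string mapping from the metadata written by StringIndexer
to_species = IndexToString(inputCol='label', outputCol='species_decoded')
to_species.transform(iris_df).show(5)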
In [25]:
iris_df.describe().show(5)
In [26]:
iris_df.dtypes
Out[26]:
In [27]:
# 80/20 train/test split; the fixed seed makes the split reproducible
train, test = iris_df.randomSplit([0.8, 0.2], seed=1234)
In [29]:
from pyspark.ml.classification import NaiveBayes
# multinomial Naive Bayes by default; the smoothing parameter is tuned below
naivebayes = NaiveBayes(featuresCol="features", labelCol="label")
In [31]:
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder() \
    .addGrid(naivebayes.smoothing, [0, 1, 2, 4, 8]) \
    .build()
In [67]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# defaults to labelCol='label', predictionCol='prediction' and the f1 metric
evaluator = MulticlassClassificationEvaluator()
In [68]:
from pyspark.ml.tuning import CrossValidator
crossvalidator = CrossValidator(estimator=naivebayes, estimatorParamMaps=param_grid, evaluator=evaluator)
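CrossValidator performs 3-fold cross-validation by default; the fold count, and on Spark 2.3+ the number of models fit in parallel, can be set explicitly. A sketch of the same cross-validator with these parameters spelled out:
In [ ]:
crossvalidator = CrossValidator(estimator=naivebayes,
                                estimatorParamMaps=param_grid,
                                evaluator=evaluator,
                                numFolds=3,      # the default
                                parallelism=2)   # parallel model fitting (Spark 2.3+)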
In [69]:
crossvalidation_model = crossvalidator.fit(train)
In [70]:
pred_train = crossvalidation_model.transform(train)
pred_train.show(5)
In [71]:
pred_test = crossvalidation_model.transform(test)
pred_test.show(5)
In [72]:
print("The parameter smoothing has best value:",
crossvalidation_mode.bestModel._java_obj.getSmoothing())
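The average cross-validated metric for every grid point is also recorded on the fitted model, which avoids reaching into the private _java_obj. A minimal sketch:
In [ ]:
# avgMetrics is aligned with the order of param_grid
for params, metric in zip(param_grid, crossvalidation_model.avgMetrics):
    print({p.name: v for p, v in params.items()}, '->', metric)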
In [90]:
print('training data (f1):', evaluator.setMetricName('f1').evaluate(pred_train), '\n',
      'training data (weightedPrecision):', evaluator.setMetricName('weightedPrecision').evaluate(pred_train), '\n',
      'training data (weightedRecall):', evaluator.setMetricName('weightedRecall').evaluate(pred_train), '\n',
      'training data (accuracy):', evaluator.setMetricName('accuracy').evaluate(pred_train))
In [89]:
print('test data (f1):', evaluator.setMetricName('f1').evaluate(pred_test), '\n',
      'test data (weightedPrecision):', evaluator.setMetricName('weightedPrecision').evaluate(pred_test), '\n',
      'test data (weightedRecall):', evaluator.setMetricName('weightedRecall').evaluate(pred_test), '\n',
      'test data (accuracy):', evaluator.setMetricName('accuracy').evaluate(pred_test))
In [92]:
train_conf_mat = pred_train.select('label', 'prediction')
# each (label, prediction) Row becomes a key, so countByKey tallies
# how often every label/prediction combination occurs
train_conf_mat.rdd.zipWithIndex().countByKey()
Out[92]:
In [93]:
test_conf_mat = pred_test.select('label', 'prediction')
test_conf_mat.rdd.zipWithIndex().countByKey()
Out[93]:
From the confusion matrices on both the training and test data, we can see that there are only a few mismatches between the predicted and actual labels.
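A more readable confusion matrix can also be produced directly with the DataFrame API; a minimal sketch:
In [ ]:
# cross-tabulate actual labels against predictions
pred_test.crosstab('label', 'prediction').show()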
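When the analysis is finished, it is good practice to release the cluster resources:
In [ ]:
spark.stop()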