Create entry points to Spark


In [97]:
from pyspark import SparkConf, SparkContext

# Low-level Spark entry point, running against a local master.
# getOrCreate() hands back the already-running context when this cell is
# executed again; calling the SparkContext constructor a second time in the
# same kernel fails with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=SparkConf().setMaster('local'))

In [95]:
from pyspark.sql import SparkSession

# DataFrame/SQL entry point. getOrCreate() reuses an existing session if one
# is already active, otherwise it builds a new one with the settings below.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

Load iris data


In [5]:
# Read the iris CSV: the first line supplies the column names and the column
# types are inferred from the data.
iris = spark.read.options(header=True, inferSchema=True).csv('data/iris.csv')

In [7]:
# Preview the first five rows to confirm the file was parsed as expected.
iris.show(n=5)


+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows


In [8]:
# List each column's name with its Spark SQL type (inferred while reading the CSV).
iris.dtypes


Out[8]:
[('sepal_length', 'double'),
 ('sepal_width', 'double'),
 ('petal_length', 'double'),
 ('petal_width', 'double'),
 ('species', 'string')]

In [12]:
# Summary statistics (count, mean, stddev, min, max) for every column.
iris_summary = iris.describe()
iris_summary.show()


+-------+------------------+-------------------+------------------+------------------+---------+
|summary|      sepal_length|        sepal_width|      petal_length|       petal_width|  species|
+-------+------------------+-------------------+------------------+------------------+---------+
|  count|               150|                150|               150|               150|      150|
|   mean| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672|     null|
| stddev|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414|     null|
|    min|               4.3|                2.0|               1.0|               0.1|   setosa|
|    max|               7.9|                4.4|               6.9|               2.5|virginica|
+-------+------------------+-------------------+------------------+------------------+---------+

Merge features to create a features column


In [15]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row

In [19]:
def _to_features_row(record):
    """Pack every field except the last into a dense vector; keep the last field as `species`."""
    measurements = record[:-1]
    species = record[-1]
    return Row(features=Vectors.dense(measurements), species=species)


# Collapse the four measurement columns into a single `features` vector column.
iris2 = iris.rdd.map(_to_features_row).toDF()
iris2.show(n=5)


+-----------------+-------+
|         features|species|
+-----------------+-------+
|[5.1,3.5,1.4,0.2]| setosa|
|[4.9,3.0,1.4,0.2]| setosa|
|[4.7,3.2,1.3,0.2]| setosa|
|[4.6,3.1,1.5,0.2]| setosa|
|[5.0,3.6,1.4,0.2]| setosa|
+-----------------+-------+
only showing top 5 rows

Index label column with StringIndexer

Import libraries


In [21]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

Build pipeline

Try to use pipeline whenever you can to get used to this format.


In [22]:
# Index the string-valued `species` column into a numeric `label` column.
stringindexer = StringIndexer(
    inputCol='species',
    outputCol='label',
)

# A one-stage pipeline; further stages can be appended to `stages` later.
stages = [stringindexer]
pipeline = Pipeline().setStages(stages)

Transform data


In [24]:
# Fit the pipeline on the data, then apply the fitted pipeline to that same
# data to append the `label` column.
pipeline_model = pipeline.fit(iris2)
iris_df = pipeline_model.transform(iris2)
iris_df.show(n=5)


+-----------------+-------+-----+
|         features|species|label|
+-----------------+-------+-----+
|[5.1,3.5,1.4,0.2]| setosa|  2.0|
|[4.9,3.0,1.4,0.2]| setosa|  2.0|
|[4.7,3.2,1.3,0.2]| setosa|  2.0|
|[4.6,3.1,1.5,0.2]| setosa|  2.0|
|[5.0,3.6,1.4,0.2]| setosa|  2.0|
+-----------------+-------+-----+
only showing top 5 rows

Check the data one more time


In [25]:
# Summary statistics of the transformed data, including the new `label` column.
iris_df_summary = iris_df.describe()
iris_df_summary.show(n=5)


+-------+---------+------------------+
|summary|  species|             label|
+-------+---------+------------------+
|  count|      150|               150|
|   mean|     null|               1.0|
| stddev|     null|0.8192319205190403|
|    min|   setosa|               0.0|
|    max|virginica|               2.0|
+-------+---------+------------------+


In [26]:
# Confirm the column types after the pipeline transform.
iris_df.dtypes


Out[26]:
[('features', 'vector'), ('species', 'string'), ('label', 'double')]

Naive Bayes classification

Split data into training and test sets


In [27]:
# Split with weights 0.8 / 0.2 (train / test), seeding the random split.
splits = iris_df.randomSplit(weights=[0.8, 0.2], seed=1234)
train, test = splits

Build cross-validation model

Estimator


In [29]:
from pyspark.ml.classification import NaiveBayes

# Naive Bayes estimator that reads the `features` vector column and the
# numeric `label` column.
naivebayes = NaiveBayes(
    labelCol="label",
    featuresCol="features",
)

Parameter grid


In [31]:
from pyspark.ml.tuning import ParamGridBuilder

# Candidate smoothing values to try during cross-validation.
# `smoothing` is a float-typed param, so the candidates are written as floats
# rather than relying on int literals being converted along the way.
param_grid = (
    ParamGridBuilder()
    .addGrid(naivebayes.smoothing, [0.0, 1.0, 2.0, 4.0, 8.0])
    .build()
)

Evaluator

There are three categories in the label column. Therefore, we use MulticlassClassificationEvaluator.


In [67]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# 'f1' is this evaluator's documented default metric; it is spelled out here
# so it is visible which metric is in effect.
evaluator = MulticlassClassificationEvaluator(metricName='f1')

Build cross-validation model


In [68]:
from pyspark.ml.tuning import CrossValidator

# Cross-validate the Naive Bayes estimator over the smoothing grid, scoring
# each candidate with `evaluator`.
crossvalidator = CrossValidator(
    estimator=naivebayes,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,  # the library default, made explicit
    seed=1234,   # fixed so the fold assignment is repeatable from run to run
)

Fit cross-validation model


In [69]:
# Run cross-validation on the training split. The fitted result is used below
# for predictions and, through `.bestModel`, to read the selected smoothing.
crossvalidation_mode = crossvalidator.fit(train)

Prediction on training and test sets


In [70]:
# Predict on the training split; transform() appends the rawPrediction,
# probability and prediction columns shown in the preview.
pred_train = crossvalidation_mode.transform(train)
pred_train.show(n=5)


+-----------------+-------+-----+--------------------+--------------------+----------+
|         features|species|label|       rawPrediction|         probability|prediction|
+-----------------+-------+-----+--------------------+--------------------+----------+
|[4.4,3.2,1.3,0.2]| setosa|  2.0|[-12.271684174579...|[0.19444573537358...|       2.0|
|[4.5,2.3,1.3,0.3]| setosa|  2.0|[-11.119812088601...|[0.27298108810790...|       2.0|
|[4.6,3.1,1.5,0.2]| setosa|  2.0|[-12.527795092630...|[0.21626072148339...|       2.0|
|[4.6,3.2,1.4,0.2]| setosa|  2.0|[-12.570525272901...|[0.19958486427181...|       2.0|
|[4.6,3.4,1.4,0.3]| setosa|  2.0|[-13.131900980855...|[0.19913306361405...|       2.0|
+-----------------+-------+-----+--------------------+--------------------+----------+
only showing top 5 rows


In [71]:
# Predict on the held-out test split with the same fitted model.
pred_test = crossvalidation_mode.transform(test)
pred_test.show(n=5)


+-----------------+-------+-----+--------------------+--------------------+----------+
|         features|species|label|       rawPrediction|         probability|prediction|
+-----------------+-------+-----+--------------------+--------------------+----------+
|[4.3,3.0,1.1,0.1]| setosa|  2.0|[-11.379239696581...|[0.17893625340183...|       2.0|
|[4.4,2.9,1.4,0.2]| setosa|  2.0|[-11.901296005920...|[0.22552241734351...|       2.0|
|[4.4,3.0,1.3,0.2]| setosa|  2.0|[-11.944026186191...|[0.20863892696497...|       2.0|
|[4.8,3.1,1.6,0.2]| setosa|  2.0|[-12.826636190952...|[0.22170131985173...|       2.0|
|[5.0,3.3,1.4,0.2]| setosa|  2.0|[-13.089838835897...|[0.18446024811785...|       2.0|
+-----------------+-------+-----+--------------------+--------------------+----------+
only showing top 5 rows

Best model from cross validation


In [72]:
# NOTE(review): the smoothing value is read through the private `_java_obj`
# handle of the best model; confirm whether the installed PySpark version
# offers a public accessor that could be used instead.
best_model = crossvalidation_mode.bestModel
best_smoothing = best_model._java_obj.getSmoothing()
print("The parameter smoothing has best value:", best_smoothing)


The parameter smoothing has best value: 4.0

Prediction accuracy

Four accuracy metrics are available for this evaluator.

  • f1
  • weightedPrecision
  • weightedRecall
  • accuracy
Prediction accuracy on training data

In [90]:
# Score the training predictions under each metric.
# The metric is supplied as a per-call param override rather than through
# evaluator.setMetricName(...): the same `evaluator` object is handed to the
# CrossValidator, and mutating it here would leave it set to 'accuracy' for
# any later (re-)fit.
train_scores = {
    metric: evaluator.evaluate(pred_train, {evaluator.metricName: metric})
    for metric in ('f1', 'weightedPrecision', 'weightedRecall', 'accuracy')
}
print('training data (f1):', train_scores['f1'], "\n",
      'training data (weightedPrecision): ', train_scores['weightedPrecision'], "\n",
      'training data (weightedRecall): ', train_scores['weightedRecall'], "\n",
      'training data (accuracy): ', train_scores['accuracy'])


training data (f1): 0.9682539682539681 
 training data (weightedPrecision):  0.9682539682539681 
 training data (weightedRecall):  0.9682539682539681 
 training data (accuracy):  0.9682539682539683
Prediction accuracy on test data

In [89]:
# Score the test predictions under each metric.
# The metric is supplied as a per-call param override rather than through
# evaluator.setMetricName(...): the same `evaluator` object is handed to the
# CrossValidator, and mutating it here would leave it set to 'accuracy' for
# any later (re-)fit.
test_scores = {
    metric: evaluator.evaluate(pred_test, {evaluator.metricName: metric})
    for metric in ('f1', 'weightedPrecision', 'weightedRecall', 'accuracy')
}
print('test data (f1):', test_scores['f1'], "\n",
      'test data (weightedPrecision): ', test_scores['weightedPrecision'], "\n",
      'test data (weightedRecall): ', test_scores['weightedRecall'], "\n",
      'test data (accuracy): ', test_scores['accuracy'])


test data (f1): 0.958119658119658 
 test data (weightedPrecision):  0.9635416666666667 
 test data (weightedRecall):  0.9583333333333334 
 test data (accuracy):  0.9583333333333334

Confusion matrix

Confusion matrix on training data


In [92]:
# Confusion counts on the training split: how many rows fall into each
# distinct (label, prediction) pair.
# countByValue() tallies the rows directly; pairing every row with an index
# via zipWithIndex() only to count by key gives the same tallies with an
# extra, unneeded indexing pass.
train_conf_mat = pred_train.select('label', 'prediction')
train_conf_mat.rdd.countByValue()


Out[92]:
defaultdict(int,
            {Row(label=0.0, prediction=0.0): 41,
             Row(label=0.0, prediction=1.0): 2,
             Row(label=1.0, prediction=0.0): 2,
             Row(label=1.0, prediction=1.0): 41,
             Row(label=2.0, prediction=2.0): 40})

Confusion matrix on test data


In [93]:
# Confusion counts on the test split: how many rows fall into each distinct
# (label, prediction) pair.
# countByValue() tallies the rows directly; pairing every row with an index
# via zipWithIndex() only to count by key gives the same tallies with an
# extra, unneeded indexing pass.
test_conf_mat = pred_test.select('label', 'prediction')
test_conf_mat.rdd.countByValue()


Out[93]:
defaultdict(int,
            {Row(label=0.0, prediction=0.0): 7,
             Row(label=1.0, prediction=0.0): 1,
             Row(label=1.0, prediction=1.0): 6,
             Row(label=2.0, prediction=2.0): 10})

From the confusion matrices on both training and test data, we can see that there are only a few mismatches between prediction and label values.


In [ ]: