Decision Tree Classification


Create entry points to spark


In [1]:
from pyspark import SparkContext
sc = SparkContext(master = 'local')

from pyspark.sql import SparkSession
spark = SparkSession.builder \
          .appName("Python Spark SQL basic example") \
          .config("spark.some.config.option", "some-value") \
          .getOrCreate()

Decision tree classification with pyspark

Import data


In [2]:
cuse = spark.read.csv('data/cuse_binary.csv', header=True, inferSchema=True)
cuse.show(5)


+---+---------+---------+---+
|age|education|wantsMore|  y|
+---+---------+---------+---+
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
+---+---------+---------+---+
only showing top 5 rows

Process categorical columns

The following code does three things with pipeline:

  • StringIndexer all categorical columns
  • OneHotEncoder all categorical index columns
  • VectorAssembler all feature columns into one vector column

Categorical columns


In [3]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# categorical columns
categorical_columns = cuse.columns[0:3]

Build StringIndexer stages


In [4]:
stringindexer_stages = [StringIndexer(inputCol=c, outputCol='strindexed_' + c) for c in categorical_columns]
# encode label column and add it to stringindexer_stages
stringindexer_stages += [StringIndexer(inputCol='y', outputCol='label')]

Build OneHotEncoder stages


In [5]:
onehotencoder_stages = [OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c) for c in categorical_columns]

Build VectorAssembler stage


In [6]:
feature_columns = ['onehot_' + c for c in categorical_columns]
vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features')

Build pipeline model


In [7]:
# all stages
all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]
pipeline = Pipeline(stages=all_stages)

Fit pipeline model


In [8]:
pipeline_model = pipeline.fit(cuse)

Transform data


In [9]:
final_columns = feature_columns + ['features', 'label']
cuse_df = pipeline_model.transform(cuse).\
            select(final_columns)
            
cuse_df.show(5)


+-------------+----------------+----------------+-------------------+-----+
|   onehot_age|onehot_education|onehot_wantsMore|           features|label|
+-------------+----------------+----------------+-------------------+-----+
|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|
|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|
|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|
|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|
|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|
+-------------+----------------+----------------+-------------------+-----+
only showing top 5 rows

Split data into training and test datasets


In [10]:
training, test = cuse_df.randomSplit([0.8, 0.2], seed=1234)

Build cross-validation model

Estimator


In [11]:
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

Parameter grid


In [12]:
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder().\
    addGrid(dt.maxDepth, [2,3,4,5]).\
    build()

Evaluator


In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")

Build cross-validation model


In [14]:
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)

Fit cross-validation mode


In [15]:
cv_model = cv.fit(cuse_df)

Prediction


In [16]:
show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']
Prediction on training data

In [17]:
pred_training_cv = cv_model.transform(training)
pred_training_cv.select(show_columns).show(5, truncate=False)


+---------+-----+----------+-------------+----------------------------------------+
|features |label|prediction|rawPrediction|probability                             |
+---------+-----+----------+-------------+----------------------------------------+
|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|
|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|
|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|
|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|
|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|
+---------+-----+----------+-------------+----------------------------------------+
only showing top 5 rows

Prediction on test data

In [18]:
pred_test_cv = cv_model.transform(test)
pred_test_cv.select(show_columns).show(5, truncate=False)


+---------+-----+----------+-------------+----------------------------------------+
|features |label|prediction|rawPrediction|probability                             |
+---------+-----+----------+-------------+----------------------------------------+
|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|
|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|
|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|
|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|
|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|
+---------+-----+----------+-------------+----------------------------------------+
only showing top 5 rows

Confusion matrix

Pyspark doesn’t have a function to calculate the confusion matrix automatically, but we can still easily get a confusion matrix with a combination use of several methods from the RDD class.


In [19]:
label_and_pred = cv_model.transform(cuse_df).select('label', 'prediction')
label_and_pred.rdd.zipWithIndex().countByKey()


Out[19]:
defaultdict(int,
            {Row(label=0.0, prediction=0.0): 897,
             Row(label=0.0, prediction=1.0): 203,
             Row(label=1.0, prediction=0.0): 270,
             Row(label=1.0, prediction=1.0): 237})

Parameters from the best model


In [23]:
print('The best MaxDepth is:', cv_model.bestModel._java_obj.getMaxDepth())


The best MaxDepth is: 3

Decision tree classification with R

Load R libraries

library(rpart)
library(caret)

Import data

cuse_binary = read.csv('data/cuse_binary.csv', header = TRUE)

Encode categorical variables

cuse_binary$age = factor(cuse_binary$age, 
                         levels = names(sort(table(cuse_binary$age), decreasing = TRUE)))
cuse_binary$education = factor(cuse_binary$education,
                               levels = names(sort(table(cuse_binary$education), decreasing = TRUE)))
cuse_binary$wantsMore = factor(cuse_binary$wantsMore,
                               levels = names(sort(table(cuse_binary$wantsMore), decreasing = TRUE)))


cuse_binary$y = factor(cuse_binary$y,
                               levels = names(sort(table(cuse_binary$y))))

Fit decision tree model

dt_fit = rpart(y ~ age + education + wantsMore, 
               data = cuse_binary, method = 'class')

Confusion matrix

pred_y = predict(dt_fit, type = 'class')
confusionMatrix(data = pred_y, reference = cuse_binary$y)

Result

Confusion Matrix and Statistics

          Reference
Prediction   1   0
         1 237 203
         0 270 897

               Accuracy : 0.7057          
                 95% CI : (0.6827, 0.7279)
    No Information Rate : 0.6845          
    P-Value [Acc > NIR] : 0.035460        

                  Kappa : 0.2934          
 Mcnemar's Test P-Value : 0.002408        

            Sensitivity : 0.4675          
            Specificity : 0.8155          
         Pos Pred Value : 0.5386          
         Neg Pred Value : 0.7686          
             Prevalence : 0.3155          
         Detection Rate : 0.1475          
   Detection Prevalence : 0.2738          
      Balanced Accuracy : 0.6415          

       'Positive' Class : 1

In [ ]: