Decision Tree Classification


In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Random Forest Regression") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [2]:
df = spark.read.format('com.databricks.spark.csv').\
                               options(header='true', \
                               inferschema='true').load("./data/WineData.csv",header=True);

In [5]:
# Convert to float format
def string_to_float(x):
    return float(x)

# 
def condition(r):
    if (0<= r <= 4):
        label = "low" 
    elif(4< r <= 6):
        label = "medium"
    else: 
        label = "high" 
    return label

In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType
string_to_float_udf = udf(string_to_float, DoubleType())
quality_udf = udf(lambda x: condition(x), StringType())

In [8]:
df = df.withColumn("quality", quality_udf("quality"))

In [9]:
df.printSchema()


root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: string (nullable = true)


In [10]:
df.show(4)


+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|   high|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|   high|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|   high|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|                60.0|  0.998|3.16|     0.58|    9.8|   high|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
only showing top 4 rows


In [11]:
from pyspark.ml.linalg import Vectors # !!!!caution: not from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [12]:
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

In [13]:
data = transData(df)

In [14]:
data.show(3)


+--------------------+-----+
|            features|label|
+--------------------+-----+
|[7.4,0.7,0.0,1.9,...| high|
|[7.8,0.88,0.0,2.6...| high|
|[7.8,0.76,0.04,2....| high|
+--------------------+-----+
only showing top 3 rows


In [15]:
# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol='label',
                             outputCol='indexedLabel').fit(data)
labelIndexer.transform(data).show(6)


+--------------------+-----+------------+
|            features|label|indexedLabel|
+--------------------+-----+------------+
|[7.4,0.7,0.0,1.9,...| high|         0.0|
|[7.8,0.88,0.0,2.6...| high|         0.0|
|[7.8,0.76,0.04,2....| high|         0.0|
|[11.2,0.28,0.56,1...| high|         0.0|
|[7.4,0.7,0.0,1.9,...| high|         0.0|
|[7.4,0.66,0.0,1.8...| high|         0.0|
+--------------------+-----+------------+
only showing top 6 rows


In [16]:
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =VectorIndexer(inputCol="features", \
                              outputCol="indexedFeatures", \
                              maxCategories=4).fit(data)

featureIndexer.transform(data).show(6)


+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|[7.4,0.7,0.0,1.9,...| high|[7.4,0.7,0.0,1.9,...|
|[7.8,0.88,0.0,2.6...| high|[7.8,0.88,0.0,2.6...|
|[7.8,0.76,0.04,2....| high|[7.8,0.76,0.04,2....|
|[11.2,0.28,0.56,1...| high|[11.2,0.28,0.56,1...|
|[7.4,0.7,0.0,1.9,...| high|[7.4,0.7,0.0,1.9,...|
|[7.4,0.66,0.0,1.8...| high|[7.4,0.66,0.0,1.8...|
+--------------------+-----+--------------------+
only showing top 6 rows


In [17]:
# Train a DecisionTree model
dTree = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')

In [18]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

In [19]:
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dTree,labelConverter])

In [33]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.6, 0.4])

In [34]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

In [35]:
# Make predictions.
predictions = model.transform(testData)

In [36]:
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)


+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|[4.6,0.52,0.15,2....| high|          high|
|[4.9,0.42,0.0,2.1...| high|          high|
|[5.0,0.4,0.5,4.3,...| high|          high|
|[5.0,0.42,0.24,2....| high|          high|
|[5.0,1.02,0.04,1....| high|          high|
+--------------------+-----+--------------+
only showing top 5 rows


In [37]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Predictions accuracy = %g, Test Error = %g" % (accuracy,(1.0 - accuracy)))


Predictions accuracy = 1, Test Error = 0

Cross-Validation


In [25]:
# Search through decision tree's maxDepth parameter for best model
paramGrid = ParamGridBuilder().addGrid(dTree.maxDepth, [2,3,4,5,6,7]).build()

In [26]:
# Set F-1 score as evaluation metric for best model selection
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',
                                              predictionCol='prediction', metricName='f1')

In [27]:
# Set up 3-fold cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

In [28]:
# Train model.  This also runs the indexers.
dTree_model = crossval.fit(data)

Fetch best model


In [29]:
# Fetch best model
tree_model = dTree_model.bestModel
tree_model


Out[29]:
PipelineModel_42b58bb72a70d7326481

In [30]:
# Make predictions.
predictions = tree_model.transform(data)

In [31]:
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)


+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|[7.4,0.7,0.0,1.9,...| high|          high|
|[7.8,0.88,0.0,2.6...| high|          high|
|[7.8,0.76,0.04,2....| high|          high|
|[11.2,0.28,0.56,1...| high|          high|
|[7.4,0.7,0.0,1.9,...| high|          high|
+--------------------+-----+--------------+
only showing top 5 rows


In [32]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Predictions accuracy = %g, Test Error = %g" % (accuracy,(1.0 - accuracy)))


Predictions accuracy = 1, Test Error = 0

In [ ]: