PySpark Random Forest Regression

1. Set up the Spark context and SparkSession


In [1]:
from pyspark.sql import SparkSession

# Build the SparkSession (or pick up an existing one) that later cells use
# to read data.
# NOTE(review): "spark.some.config.option" looks like a placeholder setting —
# confirm whether it can be dropped.
spark = (
    SparkSession.builder
    .appName("Python Spark Random Forest Regression")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

2. Load the dataset


In [14]:
# Read the wine CSV with Spark's built-in CSV reader: the first row supplies
# the column names and the column types are inferred from the data.
# (The header option is now given once instead of in both options() and load().)
df = spark.read.csv("./data/WineData.csv", header=True, inferSchema=True)

In [15]:
# Print the inferred column names and types to check how the CSV was parsed.
df.printSchema()


root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)


In [16]:
def transData(data):
    """Reshape a DataFrame into the two-column ('features', 'label') layout.

    Every column except the last is packed, in its original order, into one
    dense vector; the last column is passed through unchanged as the label.

    ``Vectors`` is looked up at call time, so it must already be imported
    when this function is invoked.
    """
    def to_features_and_label(row):
        # Leading columns become the feature vector, trailing column the label.
        return [Vectors.dense(row[:-1]), row[-1]]

    pairs = data.rdd.map(to_features_and_label)
    return pairs.toDF(['features', 'label'])

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [18]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# Build the (features, label) DataFrame used for modelling and preview six rows.
transformed= transData(df)
transformed.show(6)


+--------------------+-----+
|            features|label|
+--------------------+-----+
|[7.4,0.7,0.0,1.9,...|    5|
|[7.8,0.88,0.0,2.6...|    5|
|[7.8,0.76,0.04,2....|    5|
|[11.2,0.28,0.56,1...|    6|
|[7.4,0.7,0.0,1.9,...|    5|
|[7.4,0.66,0.0,1.8...|    5|
+--------------------+-----+
only showing top 6 rows


In [19]:
# Split the data into training and test sets (30% held out for testing).
# A fixed seed makes the split repeatable from run to run.
(trainingData, testData) = transformed.randomSplit([0.7, 0.3], seed=42)

In [20]:
# Train a RandomForest regression model with otherwise default settings.
# The seed pins the forest's random sampling so repeated runs are comparable.
rf = RandomForestRegressor(seed=42)
model = rf.fit(trainingData)

In [21]:
# Show how many trees the fitted forest contains.
model.getNumTrees


Out[21]:
20

In [22]:
# Make predictions: score the held-out test rows with the fitted forest.
predictions = model.transform(testData)

In [23]:
# Preview the first ten scored rows.
predictions.show(10)


+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|[4.9,0.42,0.0,2.1...|    7| 6.489667556875804|
|[5.1,0.42,0.0,1.8...|    7| 6.267301910170284|
|[5.1,0.585,0.0,1....|    7|6.0526786505470245|
|[5.2,0.32,0.25,1....|    5| 5.257985010985523|
|[5.2,0.48,0.04,1....|    7| 5.943264423589821|
|[5.2,0.645,0.0,2....|    6| 5.909094836353475|
|[5.3,0.47,0.11,2....|    7|  6.41491729478567|
|[5.4,0.74,0.0,1.2...|    6| 5.855394156577842|
|[5.4,0.835,0.08,1...|    7| 5.989289110209313|
|[5.5,0.49,0.03,1....|    8| 6.245232161181737|
+--------------------+-----+------------------+
only showing top 10 rows


In [24]:
# Select example rows to display, with the prediction column listed first.
predictions.select("prediction", "label", "features").show(5)


+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
| 6.489667556875804|    7|[4.9,0.42,0.0,2.1...|
| 6.267301910170284|    7|[5.1,0.42,0.0,1.8...|
|6.0526786505470245|    7|[5.1,0.585,0.0,1....|
| 5.257985010985523|    5|[5.2,0.32,0.25,1....|
| 5.943264423589821|    7|[5.2,0.48,0.04,1....|
+------------------+-----+--------------------+
only showing top 5 rows


In [25]:
# Select (prediction, true label) and compute test error:
# the root-mean-squared error between the two columns on the held-out rows.
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)


Root Mean Squared Error (RMSE) on test data = 0.659148

PySpark Random Forest Classifier


In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [27]:
# Read the wine CSV with Spark's built-in CSV reader: the first row supplies
# the column names and the column types are inferred from the data.
# (The header option is now given once instead of in both options() and load().)
df = spark.read.csv("./data/WineData.csv", header=True, inferSchema=True)

In [28]:
# Print the inferred column names and types to check how the CSV was parsed.
df.printSchema()


root
 |-- fixed acidity: double (nullable = true)
 |-- volatile acidity: double (nullable = true)
 |-- citric acid: double (nullable = true)
 |-- residual sugar: double (nullable = true)
 |-- chlorides: double (nullable = true)
 |-- free sulfur dioxide: double (nullable = true)
 |-- total sulfur dioxide: double (nullable = true)
 |-- density: double (nullable = true)
 |-- pH: double (nullable = true)
 |-- sulphates: double (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- quality: integer (nullable = true)


In [33]:
# NOTE(review): identical redefinition of the transData declared earlier in
# this notebook.
def transData(data):
    """Reshape a DataFrame into the two-column ('features', 'label') layout.

    Every column except the last is packed, in its original order, into one
    dense vector; the last column is passed through unchanged as the label.

    ``Vectors`` is looked up at call time, so it must already be imported
    when this function is invoked.
    """
    def to_features_and_label(row):
        # Leading columns become the feature vector, trailing column the label.
        return [Vectors.dense(row[:-1]), row[-1]]

    pairs = data.rdd.map(to_features_and_label)
    return pairs.toDF(['features', 'label'])

In [34]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# Build the (features, label) DataFrame for the classifier and preview six rows.
data= transData(df)
data.show(6)


+--------------------+-----+
|            features|label|
+--------------------+-----+
|[7.4,0.7,0.0,1.9,...|    5|
|[7.8,0.88,0.0,2.6...|    5|
|[7.8,0.76,0.04,2....|    5|
|[11.2,0.28,0.56,1...|    6|
|[7.4,0.7,0.0,1.9,...|    5|
|[7.4,0.66,0.0,1.8...|    5|
+--------------------+-----+
only showing top 6 rows


In [35]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
# labelIndexer is the already-fitted indexer model; the preview below shows
# the indexedLabel column it adds next to the original label.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
labelIndexer.transform(data).show(6)


+--------------------+-----+------------+
|            features|label|indexedLabel|
+--------------------+-----+------------+
|[7.4,0.7,0.0,1.9,...|    5|         0.0|
|[7.8,0.88,0.0,2.6...|    5|         0.0|
|[7.8,0.76,0.04,2....|    5|         0.0|
|[11.2,0.28,0.56,1...|    6|         1.0|
|[7.4,0.7,0.0,1.9,...|    5|         0.0|
|[7.4,0.66,0.0,1.8...|    5|         0.0|
+--------------------+-----+------------+
only showing top 6 rows


In [36]:
# Detect categorical features automatically and index them.
# With maxCategories=4, any feature with more than 4 distinct values is
# treated as continuous.
featureIndexer = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
).fit(data)

featureIndexer.transform(data).show(6)


+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|[7.4,0.7,0.0,1.9,...|    5|[7.4,0.7,0.0,1.9,...|
|[7.8,0.88,0.0,2.6...|    5|[7.8,0.88,0.0,2.6...|
|[7.8,0.76,0.04,2....|    5|[7.8,0.76,0.04,2....|
|[11.2,0.28,0.56,1...|    6|[11.2,0.28,0.56,1...|
|[7.4,0.7,0.0,1.9,...|    5|[7.4,0.7,0.0,1.9,...|
|[7.4,0.66,0.0,1.8...|    5|[7.4,0.66,0.0,1.8...|
+--------------------+-----+--------------------+
only showing top 6 rows


In [37]:
# Split the data into training and test sets (30% held out for testing).
# Split `data` — the DataFrame built in this section and used to fit the
# indexers — rather than the regression section's `transformed`, so this
# section does not depend on leftover state. A fixed seed makes the split
# repeatable from run to run.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)

In [38]:
# Configure the RandomForest classifier; it is only fitted later, as a stage
# of the pipeline. It reads the indexed label/feature columns produced by the
# indexers, and the seed pins the forest's random sampling for repeatable runs.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                            numTrees=10, seed=42)

In [39]:
# Convert indexed labels back to original labels.
# Uses the label list learned by labelIndexer to map the numeric "prediction"
# column to a "predictedLabel" column.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

In [40]:
# Chain indexers and forest in a Pipeline.
# Stage order: label indexing, feature indexing, the classifier, then the
# conversion of predicted indices back to labels.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

In [41]:
# Train model. The two indexers were already fitted on `data` above, so
# fitting the pipeline only applies them to the training rows; the random
# forest is the stage that is actually trained here.
model = pipeline.fit(trainingData)

In [42]:
# Make predictions: run the held-out test rows through every pipeline stage.
predictions = model.transform(testData)

In [43]:
# Select example rows to display: features, true label and predicted label.
predictions.select("features","label","predictedLabel").show(5)


+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|[4.7,0.6,0.17,2.3...|    6|             5|
|[5.0,1.02,0.04,1....|    4|             5|
|[5.1,0.42,0.0,1.8...|    7|             6|
|[5.1,0.47,0.02,1....|    6|             6|
|[5.2,0.32,0.25,1....|    5|             5|
+--------------------+-----+--------------+
only showing top 5 rows


In [44]:
# Select (prediction, true label) and compute test error.
# Accuracy compares the indexed label with the numeric prediction column;
# the printed test error is 1 - accuracy.
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))


Test Error = 0.402083

In [45]:
# Pull the fitted random forest out of the pipeline model: it is stage index 2
# (after labelIndexer and featureIndexer, before labelConverter).
rfModel = model.stages[2]
rfModel


Out[45]:
RandomForestClassificationModel (uid=rfc_d3482d5f110a) with 10 trees

In [46]:
# List the individual decision trees that make up the fitted forest.
rfModel.trees


Out[46]:
[DecisionTreeClassificationModel (uid=dtc_83a486d1ea3f) of depth 5 with 59 nodes,
 DecisionTreeClassificationModel (uid=dtc_ecc86f56fd85) of depth 5 with 57 nodes,
 DecisionTreeClassificationModel (uid=dtc_c808860908b2) of depth 5 with 53 nodes,
 DecisionTreeClassificationModel (uid=dtc_9cc34e2a2a6f) of depth 5 with 61 nodes,
 DecisionTreeClassificationModel (uid=dtc_67e03b4e5ce4) of depth 5 with 61 nodes,
 DecisionTreeClassificationModel (uid=dtc_1d972cb63e4e) of depth 5 with 55 nodes,
 DecisionTreeClassificationModel (uid=dtc_8d0e0c19f6fa) of depth 5 with 59 nodes,
 DecisionTreeClassificationModel (uid=dtc_650db857e494) of depth 5 with 61 nodes,
 DecisionTreeClassificationModel (uid=dtc_cc2e16db50a1) of depth 5 with 61 nodes,
 DecisionTreeClassificationModel (uid=dtc_14ac6d89880a) of depth 5 with 59 nodes]

In [ ]: