In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Random Forest Regression") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
df = spark.read.format('com.databricks.spark.csv').\
                               options(header='true', \
                               inferschema='true').load("../data/WineData.csv",header=True);

In [4]:
from pyspark.ml.linalg import Vectors # !!!!caution: not from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [5]:
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

In [6]:
data = transData(df)


/Users/mingchen/anaconda2/lib/python2.7/site-packages/pytz/__init__.py:29: UserWarning: Module argparse was already imported from /Users/mingchen/anaconda2/lib/python2.7/argparse.pyc, but /Users/mingchen/anaconda2/lib/python2.7/site-packages/argparse-1.4.0-py2.7.egg is being added to sys.path
  from pkg_resources import resource_stream

In [7]:
data.show(3)


+--------------------+-----+
|            features|label|
+--------------------+-----+
|[7.4,0.7,0.0,1.9,...|    5|
|[7.8,0.88,0.0,2.6...|    5|
|[7.8,0.76,0.04,2....|    5|
+--------------------+-----+
only showing top 3 rows


In [8]:
# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol='label',
                             outputCol='indexedLabel').fit(data)
labelIndexer.transform(data).show(6)


+--------------------+-----+------------+
|            features|label|indexedLabel|
+--------------------+-----+------------+
|[7.4,0.7,0.0,1.9,...|    5|         0.0|
|[7.8,0.88,0.0,2.6...|    5|         0.0|
|[7.8,0.76,0.04,2....|    5|         0.0|
|[11.2,0.28,0.56,1...|    6|         1.0|
|[7.4,0.7,0.0,1.9,...|    5|         0.0|
|[7.4,0.66,0.0,1.8...|    5|         0.0|
+--------------------+-----+------------+
only showing top 6 rows


In [9]:
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =VectorIndexer(inputCol="features", \
                              outputCol="indexedFeatures", \
                              maxCategories=4).fit(data)

featureIndexer.transform(data).show(6)


+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|[7.4,0.7,0.0,1.9,...|    5|[7.4,0.7,0.0,1.9,...|
|[7.8,0.88,0.0,2.6...|    5|[7.8,0.88,0.0,2.6...|
|[7.8,0.76,0.04,2....|    5|[7.8,0.76,0.04,2....|
|[11.2,0.28,0.56,1...|    6|[11.2,0.28,0.56,1...|
|[7.4,0.7,0.0,1.9,...|    5|[7.4,0.7,0.0,1.9,...|
|[7.4,0.66,0.0,1.8...|    5|[7.4,0.66,0.0,1.8...|
+--------------------+-----+--------------------+
only showing top 6 rows


In [12]:
# Train a DecisionTree model
dTree = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')

In [13]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

In [14]:
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dTree,labelConverter])

In [15]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

In [16]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

In [17]:
# Make predictions.
predictions = model.transform(testData)

In [18]:
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)


+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|[4.7,0.6,0.17,2.3...|    6|             6|
|[5.0,0.74,0.0,1.2...|    6|             6|
|[5.0,1.02,0.04,1....|    4|             5|
|[5.1,0.47,0.02,1....|    6|             6|
|[5.1,0.51,0.18,2....|    7|             7|
+--------------------+-----+--------------+
only showing top 5 rows

Cross-Validation


In [19]:
# Search through decision tree's maxDepth parameter for best model
paramGrid = ParamGridBuilder().addGrid(dTree.maxDepth, [2,3,4,5,6,7]).build()

In [27]:
# Set F-1 score as evaluation metric for best model selection
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',
                                              predictionCol='prediction', metricName='f1')

In [28]:
# Set up 3-fold cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

In [29]:
# Train model.  This also runs the indexers.
dTree_model = crossval.fit(data)

Fetch best model


In [30]:
# Fetch best model
tree_model = dTree_model.bestModel
tree_model


Out[30]:
PipelineModel_48d689a41ee635ca8083

In [37]:
# Make predictions.
predictions = tree_model.transform(data)
predictions_test = tree_model.transform(testData)

In [32]:
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)


+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|[7.4,0.7,0.0,1.9,...|    5|             5|
|[7.8,0.88,0.0,2.6...|    5|             5|
|[7.8,0.76,0.04,2....|    5|             5|
|[11.2,0.28,0.56,1...|    6|             6|
|[7.4,0.7,0.0,1.9,...|    5|             5|
+--------------------+-----+--------------+
only showing top 5 rows


In [38]:
evaluator.evaluate(predictions)


Out[38]:
0.7549574920851149

In [39]:
evaluator.evaluate(predictions_test)


Out[39]:
0.7431072434683688

In [ ]: