In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Random Forest Regression") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
df = spark.read.format('com.databricks.spark.csv').\
                               options(header='true', \
                               inferschema='true').load("../data/WineData.csv",header=True);

In [4]:
from pyspark.ml.linalg import Vectors # !!!!caution: not from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [5]:
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

In [6]:
data = transData(df)


/Users/mingchen/anaconda2/lib/python2.7/site-packages/pytz/__init__.py:29: UserWarning: Module argparse was already imported from /Users/mingchen/anaconda2/lib/python2.7/argparse.pyc, but /Users/mingchen/anaconda2/lib/python2.7/site-packages/argparse-1.4.0-py2.7.egg is being added to sys.path
  from pkg_resources import resource_stream

In [7]:
data.show(3)


+--------------------+-----+
|            features|label|
+--------------------+-----+
|[7.4,0.7,0.0,1.9,...|    5|
|[7.8,0.88,0.0,2.6...|    5|
|[7.8,0.76,0.04,2....|    5|
+--------------------+-----+
only showing top 3 rows


In [8]:
# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol='label',
                             outputCol='indexedLabel').fit(data)
labelIndexer.transform(data).show(6)


+--------------------+-----+------------+
|            features|label|indexedLabel|
+--------------------+-----+------------+
|[7.4,0.7,0.0,1.9,...|    5|         0.0|
|[7.8,0.88,0.0,2.6...|    5|         0.0|
|[7.8,0.76,0.04,2....|    5|         0.0|
|[11.2,0.28,0.56,1...|    6|         1.0|
|[7.4,0.7,0.0,1.9,...|    5|         0.0|
|[7.4,0.66,0.0,1.8...|    5|         0.0|
+--------------------+-----+------------+
only showing top 6 rows


In [9]:
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =VectorIndexer(inputCol="features", \
                              outputCol="indexedFeatures", \
                              maxCategories=4).fit(data)

featureIndexer.transform(data).show(6)


+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|[7.4,0.7,0.0,1.9,...|    5|[7.4,0.7,0.0,1.9,...|
|[7.8,0.88,0.0,2.6...|    5|[7.8,0.88,0.0,2.6...|
|[7.8,0.76,0.04,2....|    5|[7.8,0.76,0.04,2....|
|[11.2,0.28,0.56,1...|    6|[11.2,0.28,0.56,1...|
|[7.4,0.7,0.0,1.9,...|    5|[7.4,0.7,0.0,1.9,...|
|[7.4,0.66,0.0,1.8...|    5|[7.4,0.66,0.0,1.8...|
+--------------------+-----+--------------------+
only showing top 6 rows


In [10]:
# Train a DecisionTree model
dTree = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')

In [48]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

In [49]:
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dTree,labelConverter])

In [54]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

In [56]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

In [58]:
# Make predictions.
predictions = model.transform(testData)

In [59]:
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)


+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|[5.0,0.38,0.01,1....|    6|             6|
|[5.0,0.4,0.5,4.3,...|    6|             6|
|[5.0,0.74,0.0,1.2...|    6|             6|
|[5.0,1.04,0.24,1....|    5|             7|
|[5.1,0.42,0.0,1.8...|    7|             6|
+--------------------+-----+--------------+
only showing top 5 rows

Cross-Validation


In [66]:
# Search through decision tree's maxDepth parameter for best model
paramGrid = ParamGridBuilder().addGrid(dTree.maxDepth, [2,3,4,5,6,7]).build()

In [67]:
# Set F-1 score as evaluation metric for best model selection
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',
                                              predictionCol='prediction', metricName='f1')

In [68]:
# Set up 3-fold cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

In [69]:
# Train model.  This also runs the indexers.
dTree_model = crossval.fit(data)

Fetch best model


In [74]:
# Fetch best model
tree_model = dTree_model.bestModel
tree_model


Out[74]:
PipelineModel_4224923515634da2a63b

In [75]:
# Make predictions.
predictions = tree_model.transform(data)

In [76]:
# Select example rows to display.
predictions.select("features","label","predictedLabel").show(5)


+--------------------+-----+--------------+
|            features|label|predictedLabel|
+--------------------+-----+--------------+
|[7.4,0.7,0.0,1.9,...|    5|             5|
|[7.8,0.88,0.0,2.6...|    5|             5|
|[7.8,0.76,0.04,2....|    5|             5|
|[11.2,0.28,0.56,1...|    6|             6|
|[7.4,0.7,0.0,1.9,...|    5|             5|
+--------------------+-----+--------------+
only showing top 5 rows


In [ ]: