In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark Decision Tree Classification") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
In [3]:
df = spark.read.format('csv') \
    .options(header='true', inferSchema='true') \
    .load("../data/WineData.csv")
In [4]:
from pyspark.ml.linalg import Vectors  # caution: use pyspark.ml.linalg, not pyspark.mllib.linalg
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
In [5]:
def transData(data):
    # Pack every column except the last into a DenseVector of features;
    # the last column becomes the label.
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]), r[-1]]).toDF(['features', 'label'])
In [6]:
data = transData(df)
In [7]:
data.show(3)
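To verify that transData produced the expected two-column layout, it can also help to print the schema:
In [ ]:
# features should be a vector column and label a numeric column.
data.printSchema()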
In [8]:
# Index labels, adding metadata to the label column.
labelIndexer = StringIndexer(inputCol='label',
                             outputCol='indexedLabel').fit(data)
labelIndexer.transform(data).show(6)
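The fitted StringIndexerModel exposes the learned mapping; a quick sanity check (labels are ordered by frequency, so index 0.0 corresponds to the most common quality value):
In [ ]:
# Inspect the label ordering used by the indexer.
print(labelIndexer.labels)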
In [9]:
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features",
                               outputCol="indexedFeatures",
                               maxCategories=4).fit(data)
featureIndexer.transform(data).show(6)
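To confirm which features VectorIndexer actually treated as categorical under maxCategories=4, the fitted model exposes categoryMaps:
In [ ]:
# Maps feature index -> {original value: category index} for each feature indexed as categorical.
print(featureIndexer.categoryMaps)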
In [10]:
# Define a DecisionTree classifier (training happens when the pipeline is fit).
dTree = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')
In [48]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
In [49]:
# Chain indexers and tree in a Pipeline.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dTree, labelConverter])
In [54]:
# Split the data into training and test sets (30% held out for testing).
# Pass a seed, e.g. randomSplit([0.7, 0.3], seed=100), for a reproducible split.
(trainingData, testData) = data.randomSplit([0.7, 0.3])
In [56]:
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
In [58]:
# Make predictions.
predictions = model.transform(testData)
In [59]:
# Select example rows to display.
predictions.select("features", "label", "predictedLabel").show(5)
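Before tuning, it is worth recording a baseline score for this single tree. A minimal sketch (accuracy_eval is a name introduced here) reusing the evaluator class imported above, configured for accuracy:
In [ ]:
# Baseline accuracy of the untuned tree on the held-out test set.
accuracy_eval = MulticlassClassificationEvaluator(labelCol='indexedLabel',
                                                  predictionCol='prediction',
                                                  metricName='accuracy')
print("Test accuracy: %g" % accuracy_eval.evaluate(predictions))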
In [66]:
# Search over the decision tree's maxDepth parameter for the best model.
paramGrid = ParamGridBuilder().addGrid(dTree.maxDepth, [2, 3, 4, 5, 6, 7]).build()
In [67]:
# Use the F1 score as the evaluation metric for best-model selection.
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',
                                              predictionCol='prediction',
                                              metricName='f1')
In [68]:
# Set up 3-fold cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)
In [69]:
# Run cross-validation and pick the best maxDepth. This also runs the indexers.
# Fit on the training split so the test set stays untouched for the final check.
dTree_model = crossval.fit(trainingData)
In [74]:
# Fetch best model
tree_model = dTree_model.bestModel
tree_model
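The best model is the full fitted PipelineModel; the tuned tree itself is its third stage, following the stage order defined above. A short sketch to inspect what the grid search settled on (best_tree is a name introduced here):
In [ ]:
# Average F1 per maxDepth candidate, in the same order as the param grid.
print(dTree_model.avgMetrics)
# Stage order follows the pipeline: [labelIndexer, featureIndexer, dTree, labelConverter].
best_tree = tree_model.stages[2]
print(best_tree)        # repr includes depth and number of nodes
print(best_tree.depth)  # the winning maxDepth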
In [75]:
# Make predictions with the tuned model on the held-out test set.
predictions = tree_model.transform(testData)
In [76]:
# Select example rows to display.
predictions.select("features", "label", "predictedLabel").show(5)
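Finally, the evaluator defined for model selection can score the tuned tree on the same test predictions:
In [ ]:
# F1 score of the cross-validated tree on the held-out test set.
print("Test F1: %g" % evaluator.evaluate(predictions))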