In [1]:
from pyspark.sql import SparkSession
# Create (or reuse, via getOrCreate) the Spark session for this notebook.
# The app name now matches what the notebook does: decision-tree classification.
# The former `.config("spark.some.config.option", "some-value")` line used a
# placeholder key and was dropped. NOTE(review): nothing in this notebook reads it.
spark = (
    SparkSession.builder
    .appName("Python Spark Decision Tree Classification")
    .getOrCreate()
)
In [2]:
# Load the wine data with Spark's built-in CSV reader.
# The first line is the header, and column types are inferred from the data.
df = spark.read.csv("./data/WineData.csv", header=True, inferSchema=True)
In [5]:
def string_to_float(x):
    """Convert ``x`` to a Python float, for use as a Spark UDF.

    Returns None when ``x`` is None, so null cells stay null instead of the UDF
    raising TypeError (``float(None)`` is not allowed). Any other value goes
    through ``float`` unchanged, so non-numeric strings still raise ValueError.
    """
    if x is None:
        return None
    return float(x)
def condition(r):
    """Bucket a numeric wine quality score into a categorical label.

    Returns:
        "low"    for 0 <= r <= 4
        "medium" for 4 < r <= 6
        "high"   for r > 6
        None     for None, NaN or negative input. Previously negatives and NaN
                 silently became "high", and None raised TypeError. A None
                 result becomes a null label in the Spark column.
    """
    # `r != r` is True only for NaN, which would otherwise fail every comparison.
    if r is None or r != r or r < 0:
        return None
    if r <= 4:
        return "low"
    if r <= 6:
        return "medium"
    return "high"
In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType
# Wrap the plain-Python helpers as Spark UDFs with explicit return types.
# Passing `condition` directly (rather than `lambda x: condition(x)`) keeps the
# function's own name on the UDF.
string_to_float_udf = udf(string_to_float, DoubleType())
quality_udf = udf(condition, StringType())
In [8]:
# Replace the numeric quality score with its categorical label ("low"/"medium"/"high").
# Guarded so that re-running this cell does not push already-converted string
# labels back through condition(), which cannot compare a string with 0.
if dict(df.dtypes)["quality"] != "string":
    df = df.withColumn("quality", quality_udf("quality"))
In [9]:
# Inspect column names and types; "quality" was replaced by the StringType output of quality_udf.
df.printSchema()
In [10]:
# Preview the first four labeled rows.
df.show(n=4)
In [11]:
from pyspark.ml.linalg import Vectors # !!!!caution: not from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
In [12]:
def transData(data, label_col=None):
    """Pack the feature columns of ``data`` into a single vector column.

    Returns a DataFrame with exactly two columns:
    ``features`` (a Vector of every column except the label) and ``label``.

    Parameters
    ----------
    data : pyspark.sql.DataFrame
        Input frame. Every non-label column must be numeric, and the frame must
        not already contain a column named ``features``.
    label_col : str, optional
        Name of the label column. Defaults to the last column, which matches the
        previous positional behaviour (``r[-1]``).

    Notes
    -----
    VectorAssembler may emit sparse vectors for rows with many zeros.
    VectorIndexer downstream accepts both dense and sparse vectors.
    """
    # Local import: VectorAssembler is not part of the notebook's import cell.
    from pyspark.ml.feature import VectorAssembler

    if label_col is None:
        label_col = data.columns[-1]
    feature_cols = [c for c in data.columns if c != label_col]

    # Stay in the DataFrame API rather than round-tripping rows through a Python
    # RDD. handleInvalid="keep" turns null features into NaN instead of failing the job.
    assembler = VectorAssembler(inputCols=feature_cols,
                                outputCol="features",
                                handleInvalid="keep")
    return (assembler.transform(data)
            .withColumnRenamed(label_col, "label")
            .select("features", "label"))
In [13]:
# Reshape the labeled frame into the (features, label) layout Spark ML expects.
data = transData(data=df)
In [14]:
# Preview three (features, label) rows.
data.show(n=3)
In [15]:
# Map each string label to a numeric index (stored with label metadata).
# The indexer is fit on the whole dataset so every label value receives an index.
label_indexer_spec = StringIndexer(inputCol='label', outputCol='indexedLabel')
labelIndexer = label_indexer_spec.fit(data)
labelIndexer.transform(data).show(6)
In [16]:
# Detect categorical features automatically.
# A feature with more than 4 distinct values is treated as continuous;
# one with 4 or fewer is indexed as categorical.
feature_indexer_spec = VectorIndexer(
    inputCol="features",
    outputCol="indexedFeatures",
    maxCategories=4,
)
featureIndexer = feature_indexer_spec.fit(data)
featureIndexer.transform(data).show(6)
In [17]:
# Decision-tree classifier that reads the indexed columns produced above.
dTree = DecisionTreeClassifier(
    featuresCol='indexedFeatures',
    labelCol='indexedLabel',
)
In [18]:
# Turn numeric predictions back into the original string labels, using the
# label order learned by labelIndexer.
labelConverter = IndexToString(
    labels=labelIndexer.labels,
    inputCol="prediction",
    outputCol="predictedLabel",
)
In [19]:
# Stages run in list order: index labels, index features, train the tree,
# then map predictions back to label strings.
pipeline_stages = [labelIndexer, featureIndexer, dTree, labelConverter]
pipeline = Pipeline(stages=pipeline_stages)
In [33]:
# Split the data into training and test sets (40% held out for testing).
# A fixed seed makes the split, and every metric computed from it, reproducible.
(trainingData, testData) = data.randomSplit([0.6, 0.4], seed=42)
In [34]:
# Fit the pipeline on the training split.
# The already-fitted indexers transform the rows, and the tree is trained on the result.
model = pipeline.fit(dataset=trainingData)
In [35]:
# Score the held-out test split with the fitted pipeline.
predictions = model.transform(dataset=testData)
In [36]:
# Show a few test rows next to their true and predicted labels.
preview_cols = ["features", "label", "predictedLabel"]
predictions.select(*preview_cols).show(n=5)
In [37]:
# Accuracy on the held-out split. The test error reported is its complement.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="indexedLabel",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Predictions accuracy = %g, Test Error = %g" % (accuracy, test_error))
In [25]:
# Candidate tree depths 2..7 for the cross-validated search.
depth_candidates = list(range(2, 8))
paramGrid = ParamGridBuilder().addGrid(dTree.maxDepth, depth_candidates).build()
In [26]:
# Model selection metric: F1 score computed on the indexed labels.
evaluator = MulticlassClassificationEvaluator(
    metricName='f1',
    predictionCol='prediction',
    labelCol='indexedLabel',
)
In [27]:
# Set up 3-fold cross-validation over paramGrid, scored with the F1 evaluator above.
# A fixed seed makes the fold assignment reproducible across runs.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=42)
In [28]:
# Run the cross-validated search on the training split only.
# This keeps testData unseen during model selection, so it can give an
# unbiased final estimate. The pipeline stages (including the indexers) run
# inside every fold.
dTree_model = crossval.fit(trainingData)
In [29]:
# Fetch the best model: the pipeline model built with the paramGrid entry that
# scored best under the cross-validator's F1 evaluator.
tree_model = dTree_model.bestModel
# Bare last expression so the notebook displays the model's repr.
tree_model
Out[29]:
In [30]:
# Score the held-out test split with the best cross-validated model.
# Previously this predicted on the full `data`, so the "test error" below
# included rows the model had been trained on. Note: this is only a truly
# held-out estimate if the cross-validator was fit on trainingData.
predictions = tree_model.transform(testData)
In [31]:
# Show a few rows next to their true and predicted labels.
predictions.select(
    "features", "label", "predictedLabel"
).show(n=5)
In [32]:
# Accuracy of the best cross-validated model. The reported error is 1 - accuracy.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="indexedLabel",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
error_rate = 1.0 - accuracy
print("Predictions accuracy = %g, Test Error = %g" % (accuracy, error_rate))
In [ ]: