In [1]:
# Create (or reuse) the shared SparkSession -- the entry point for all
# DataFrame operations in this notebook.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Python Spark Random Forest Regression")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
In [14]:
# Load the wine-quality CSV with Spark's built-in csv reader.
# Fixes two issues in the original call: the external
# 'com.databricks.spark.csv' package is only needed on Spark 1.x, and
# `header` was passed twice (once via options(), once via load()).
df = spark.read.csv("./data/WineData.csv", header=True, inferSchema=True)
In [15]:
# Inspect the inferred column names and types before transforming.
df.printSchema()
In [16]:
# Convert each row to (dense feature vector, label).
# NOTE(review): `Vectors` is imported in a later cell (In [18]); on a fresh
# kernel, run that import before calling this function -- ideally move all
# imports to the top of the notebook.
def transData(data):
    """Pack every column except the last into a dense feature vector.

    Assumes the label (wine ``quality``) is the final column of ``data``;
    all preceding columns become entries of the ``features`` vector.
    (Replaces an older commented-out row-by-row version that also listed
    "residual sugar" twice.)

    :param data: Spark DataFrame with the label in the last column
    :return: DataFrame with columns ``features`` (DenseVector) and ``label``
    """
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]), r[-1]]).toDF(['features', 'label'])
In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
In [18]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
# Build the (features, label) DataFrame and preview a few rows.
# (Removed a dead commented-out variant that mapped transData over the RDD;
# transData takes a whole DataFrame, not a Row, so it could never work.)
transformed = transData(df)
transformed.show(6)
In [19]:
# Split the data into training and test sets (30% held out for testing).
# A fixed seed makes the split -- and hence the reported RMSE -- reproducible
# across kernel restarts.
(trainingData, testData) = transformed.randomSplit([0.7, 0.3], seed=42)
In [20]:
# Train a RandomForest regression model.
# Column names are spelled out (the defaults 'features'/'label' happen to
# match our schema) and a fixed seed keeps the fitted forest reproducible.
rf = RandomForestRegressor(featuresCol="features", labelCol="label", seed=42)
model = rf.fit(trainingData)
In [21]:
# NOTE(review): `getNumTrees` is accessed without parentheses; depending on
# the Spark version this is a property (shows an int) or a bound method
# (shows `<bound method ...>`) -- confirm the Out[] below shows a number.
model.getNumTrees
Out[21]:
In [22]:
# Make predictions.
# transform() appends a 'prediction' column to the held-out test split.
predictions = model.transform(testData)
In [23]:
# Preview the predictions DataFrame (first 10 rows).
predictions.show(10)
In [24]:
# Select example rows to display.
# Column order puts the model output first for easy side-by-side comparison
# with the true label.
predictions.select("prediction", "label", "features").show(5)
In [25]:
# Evaluate the regressor: root-mean-squared error between the true and
# predicted quality on the held-out test set (lower is better).
evaluator = RegressionEvaluator(predictionCol="prediction",
                                labelCol="label",
                                metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
In [27]:
# Re-load the wine data for the classification section using the built-in
# Spark csv reader. (The original call passed `header` twice and used the
# legacy 'com.databricks.spark.csv' package, only needed on Spark 1.x.)
df = spark.read.csv("./data/WineData.csv", header=True, inferSchema=True)
In [28]:
# Confirm the reloaded schema matches the regression section's.
df.printSchema()
In [33]:
# convert the data to dense vector
# NOTE(review): this re-definition is identical to transData in cell In [16];
# it is kept so the classification section can run standalone, but consider
# defining the helper once near the top of the notebook instead.
def transData(data):
    """Pack every column but the last into ``features``; the last becomes ``label``."""
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])
In [34]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
# Build the (features, label) DataFrame for classification and preview it.
data= transData(df)
data.show(6)
In [35]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
# StringIndexer maps each distinct quality value to a 0-based numeric index
# (most frequent first), which RandomForestClassifier requires as its label.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
labelIndexer.transform(data).show(6)
In [36]:
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
# (The chemical measurements here are continuous, so this mostly passes through.)
featureIndexer =VectorIndexer(inputCol="features", \
                              outputCol="indexedFeatures", \
                              maxCategories=4).fit(data)
featureIndexer.transform(data).show(6)
In [37]:
# Split the data into training and test sets (30% held out for testing).
# BUG FIX: the original split `transformed` -- the DataFrame built in the
# earlier *regression* section -- instead of `data`, the frame built in this
# section. That silently relied on hidden kernel state and would fail if this
# section were run on its own. A fixed seed makes the split reproducible.
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)
In [38]:
# Train a RandomForest model.
# numTrees=10 matches the original setup; a fixed seed makes the fitted
# forest (and the accuracy reported below) reproducible.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                            numTrees=10, seed=42)
In [39]:
# Convert indexed labels back to original labels.
# IndexToString inverts the StringIndexer mapping, so predictions are
# reported on the original quality scale rather than as 0-based indices.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
In [40]:
# Chain indexers and forest in a Pipeline
# Stage order: label indexing -> feature indexing -> forest -> label decoding.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
In [41]:
# Train model. This also runs the indexers.
# pipeline.fit() fits each stage in order, feeding transformed data forward.
model = pipeline.fit(trainingData)
In [42]:
# Make predictions.
# Runs every fitted stage on the test split, ending with the decoded
# 'predictedLabel' column from the IndexToString stage.
predictions = model.transform(testData)
In [43]:
# Select example rows to display.
# 'predictedLabel' is the decoded (original-scale) class; 'label' is ground truth.
predictions.select("features","label","predictedLabel").show(5)
In [44]:
# Evaluate the classifier: accuracy on the held-out split, reported as its
# complement (test error).
evaluator = MulticlassClassificationEvaluator(metricName="accuracy",
                                              labelCol="indexedLabel",
                                              predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g" % test_error)
In [45]:
# The fitted forest is pipeline stage index 2
# (stages = [labelIndexer, featureIndexer, rf, labelConverter]).
rfModel = model.stages[2]
rfModel
Out[45]:
In [46]:
# Inspect the ensemble's individual decision-tree sub-models (see Out[46]).
rfModel.trees
Out[46]:
In [ ]: