The following example uses the (processed) Covertype dataset from the UCI Machine Learning Repository.
It is a dataset with both categorical (wilderness_area and soil_type) and continuous (the rest) features. The target is the cover_type column:
In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
In [16]:
conf = SparkConf()  # optionally .set("spark.jars", "/Users/per0/wa/spark_wa/spark-tree-plotting/target/scala-2.11/spark-tree-plotting_0.2.jar") to attach the plotting JAR
sc = SparkContext.getOrCreate(conf)
In [4]:
spark = SparkSession.builder.getOrCreate()
In [8]:
covertype_dataset = spark.read.parquet("covertype_dataset.snappy.parquet")
covertype_dataset.printSchema()
The first 10 rows:
In [20]:
covertype_dataset.limit(10).toPandas()
Out[20]:
In order for Spark's DecisionTreeClassifier to work with the categorical features (as well as the target), we first need a pyspark.ml.feature.StringIndexer for each of those columns to generate a numeric representation:
In [9]:
from pyspark.ml.feature import StringIndexer
string_indexer_wilderness = StringIndexer(inputCol="wilderness_area", outputCol="wilderness_area_indexed")
string_indexer_soil = StringIndexer(inputCol="soil_type", outputCol="soil_type_indexed")
string_indexer_cover = StringIndexer(inputCol="cover_type", outputCol="cover_type_indexed")
To generate the new StringIndexerModels, we call .fit() on each StringIndexer instance:
In [10]:
string_indexer_wilderness_model = string_indexer_wilderness.fit(covertype_dataset)
string_indexer_soil_model = string_indexer_soil.fit(covertype_dataset)
string_indexer_cover_model = string_indexer_cover.fit(covertype_dataset)
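Each fitted StringIndexerModel exposes the learned label-to-index mapping through its .labels attribute; we will rely on it later to name categories and classes in the plot. A quick check of the target labels, for example:

# Inspect the label order learned for the target column
print(string_indexer_cover_model.labels)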
And we create the new indexed columns by chaining the three transforms:
In [11]:
covertype_dataset_indexed_features = string_indexer_cover_model.transform(
    string_indexer_soil_model.transform(
        string_indexer_wilderness_model.transform(covertype_dataset)
    )
)
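Equivalently, the three indexers could be chained with a pyspark.ml.Pipeline; a minimal sketch (not one of the original cells):

from pyspark.ml import Pipeline

# Fit and apply all three StringIndexers in a single pipeline
indexing_pipeline = Pipeline(stages=[string_indexer_wilderness,
                                     string_indexer_soil,
                                     string_indexer_cover])
covertype_dataset_indexed_features = indexing_pipeline.fit(covertype_dataset).transform(covertype_dataset)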
The new columns can be seen on the right:
In [9]:
covertype_dataset_indexed_features.limit(10).toPandas()
Out[9]:
Now we just have to use a VectorAssembler to combine our features into a single feature vector:
In [12]:
from pyspark.ml.feature import VectorAssembler
feature_columns = ["elevation",
"aspect",
"slope",
"horizontal_distance_to_hydrology",
"vertical_distance_to_hydrology",
"horizontal_distance_to_roadways",
"hillshade_9am",
"hillshade_noon",
"hillshade_3pm",
"horizontal_distance_to_fire_points",
"wilderness_area_indexed",
"soil_type_indexed"]
feature_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
And we have our dataset prepared for ML:
In [11]:
covertype_dataset_prepared = feature_assembler.transform(covertype_dataset_indexed_features)
In [12]:
covertype_dataset_prepared.printSchema()
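As a quick optional sanity check (not one of the original cells), the assembled vectors can be inspected alongside the indexed target:

# Show a few rows of the assembled feature vector and the indexed label
covertype_dataset_prepared.select("features", "cover_type_indexed").show(5, truncate=False)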
Let's build a simple pyspark.ml.classification.DecisionTreeClassifier:
In [13]:
from pyspark.ml.classification import DecisionTreeClassifier
dtree = DecisionTreeClassifier(featuresCol="features",
                               labelCol="cover_type_indexed",
                               maxDepth=3,
                               maxBins=50)
Fitting it would give us our DecisionTreeClassificationModel. The fit is left commented out here; a previously fitted and saved model is loaded below instead:
In [14]:
# dtree_model = dtree.fit(covertype_dataset_prepared)
# dtree_model
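The 'tree_model' directory loaded in the next cells was presumably produced by fitting and saving the model in an earlier run; a sketch of that step (not executed here):

# Hypothetical earlier run: fit the estimator and persist the fitted model
dtree_model = dtree.fit(covertype_dataset_prepared)
dtree_model.save('tree_model')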
In [1]:
from pyspark.ml.classification import DecisionTreeClassificationModel
In [5]:
dtree_model = DecisionTreeClassificationModel.load('tree_model')
The .toDebugString attribute contains the decision rules for the tree as plain text, but it is not very user-friendly:
In [6]:
print(dtree_model.toDebugString)
A much friendlier alternative is the plot_tree function from spark_tree_plotting, which renders the tree as an image:
In [17]:
import sys
sys.path.insert(0, '/Users/per0/wa/spark_wa/spark-tree-plotting/python')
from spark_tree_plotting import plot_tree
tree_plot = plot_tree(dtree_model,
featureNames=feature_columns,
categoryNames={"wilderness_area_indexed":string_indexer_wilderness_model.labels,
"soil_type_indexed":string_indexer_soil_model.labels},
classNames=string_indexer_cover_model.labels,
filled=True, # With color!
roundedCorners=True, # Rounded corners in the nodes
roundLeaves=True # Leaves will be ellipses instead of rectangles
)
We can also obtain the JSON representation of the tree directly, through the JVM helper class that spark-tree-plotting uses:
In [18]:
json_tree = sc._jvm.com.vfive.spark.ml.SparkMLTree(dtree_model._java_obj)
In [19]:
print(json_tree.toJsonPlotFormat())
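Assuming toJsonPlotFormat() returns a plain JSON string (as the printed output suggests), it can be parsed for programmatic inspection; a minimal sketch:

import json

# Parse the JSON produced by the JVM helper into Python objects
tree_dict = json.loads(json_tree.toJsonPlotFormat())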
Finally, we display the generated plot:
In [ ]:
from IPython.display import Image
Image(tree_plot)
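If you are not working in a notebook, the plot can be written to disk instead; a sketch assuming plot_tree returned raw PNG bytes (as the Image(...) call above suggests):

# Save the rendered tree to a PNG file (the filename is just an example)
with open("covertype_tree.png", "wb") as png_file:
    png_file.write(tree_plot)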