Example: Covertype Data Set

The following example uses the (processed) Covertype dataset from the UCI Machine Learning Repository.

The dataset has both categorical features (wilderness_area and soil_type) and continuous ones (the rest). The target is the cover_type column:


In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

In [16]:
# The spark-tree-plotting jar must be on the classpath for the JVM call used further below,
# e.g. via spark.jars:
conf = SparkConf()  # .set("spark.jars", "/Users/per0/wa/spark_wa/spark-tree-plotting/target/scala-2.11/spark-tree-plotting_0.2.jar")

sc = SparkContext.getOrCreate(conf)

In [4]:
spark = SparkSession.builder.getOrCreate()

In [8]:
covertype_dataset = spark.read.parquet("covertype_dataset.snappy.parquet")

covertype_dataset.printSchema()


root
 |-- elevation: long (nullable = true)
 |-- aspect: long (nullable = true)
 |-- slope: long (nullable = true)
 |-- horizontal_distance_to_hydrology: long (nullable = true)
 |-- vertical_distance_to_hydrology: long (nullable = true)
 |-- horizontal_distance_to_roadways: long (nullable = true)
 |-- hillshade_9am: long (nullable = true)
 |-- hillshade_noon: long (nullable = true)
 |-- hillshade_3pm: long (nullable = true)
 |-- horizontal_distance_to_fire_points: long (nullable = true)
 |-- wilderness_area: string (nullable = true)
 |-- soil_type: string (nullable = true)
 |-- cover_type: string (nullable = true)
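
To get a quick feel for the label distribution, a groupBy count works; a minimal sketch (not an executed cell from the original notebook):

# Count rows per cover type, most frequent first
covertype_dataset.groupBy("cover_type").count().orderBy("count", ascending=False).show()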

The first 10 rows:


In [20]:
covertype_dataset.limit(10).toPandas()


Out[20]:
elevation aspect slope horizontal_distance_to_hydrology vertical_distance_to_hydrology horizontal_distance_to_roadways hillshade_9am hillshade_noon hillshade_3pm horizontal_distance_to_fire_points wilderness_area soil_type cover_type
0 2596 51 3 258 0 510 221 232 148 6279 rawah wilderness area Soil_Type_7745 Aspen
1 2590 56 2 212 -6 390 220 235 151 6225 rawah wilderness area Soil_Type_7745 Aspen
2 2804 139 9 268 65 3180 234 238 135 6121 rawah wilderness area Soil_Type_4744 Lodgepole Pine
3 2785 155 18 242 118 3090 238 238 122 6211 rawah wilderness area Soil_Type_7746 Lodgepole Pine
4 2595 45 2 153 -1 391 220 234 150 6172 rawah wilderness area Soil_Type_7745 Aspen
5 2579 132 6 300 -15 67 230 237 140 6031 rawah wilderness area Soil_Type_7745 Lodgepole Pine
6 2606 45 7 270 5 633 222 225 138 6256 rawah wilderness area Soil_Type_7745 Aspen
7 2605 49 4 234 7 573 222 230 144 6228 rawah wilderness area Soil_Type_7745 Aspen
8 2617 45 9 240 56 666 223 221 133 6244 rawah wilderness area Soil_Type_7745 Aspen
9 2612 59 10 247 11 636 228 219 124 6230 rawah wilderness area Soil_Type_7745 Aspen

In order for Spark's DecisionTreeClassifier to work with the categorical features (as well as the target), we first need pyspark.ml.feature.StringIndexer to generate a numeric representation for each of those columns:


In [9]:
from pyspark.ml.feature import StringIndexer

string_indexer_wilderness = StringIndexer(inputCol="wilderness_area", outputCol="wilderness_area_indexed")

string_indexer_soil = StringIndexer(inputCol="soil_type", outputCol="soil_type_indexed")
                                    
string_indexer_cover = StringIndexer(inputCol="cover_type", outputCol="cover_type_indexed")

To generate the new StringIndexerModels, we call .fit() on each StringIndexer instance:


In [10]:
string_indexer_wilderness_model = string_indexer_wilderness.fit(covertype_dataset)

string_indexer_soil_model = string_indexer_soil.fit(covertype_dataset)

string_indexer_cover_model = string_indexer_cover.fit(covertype_dataset)
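
Each fitted StringIndexerModel exposes its string-to-index mapping through its .labels attribute (the position in the list is the assigned index, most frequent label first); for example:

print(string_indexer_cover_model.labels)  # the label at position 0 is the one encoded as 0.0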

And we create the new columns:


In [11]:
covertype_dataset_indexed_features = (
    string_indexer_cover_model.transform(
        string_indexer_soil_model.transform(
            string_indexer_wilderness_model.transform(covertype_dataset)
        )
    )
)

The new columns can be seen on the right:


In [9]:
covertype_dataset_indexed_features.limit(10).toPandas()


Out[9]:
elevation aspect slope horizontal_distance_to_hydrology vertical_distance_to_hydrology horizontal_distance_to_roadways hillshade_9am hillshade_noon hillshade_3pm horizontal_distance_to_fire_points wilderness_area soil_type cover_type wilderness_area_indexed soil_type_indexed cover_type_indexed
0 2596 51 3 258 0 510 221 232 148 6279 rawah wilderness area Soil_Type_7745 Aspen 0.0 0.0 5.0
1 2590 56 2 212 -6 390 220 235 151 6225 rawah wilderness area Soil_Type_7745 Aspen 0.0 0.0 5.0
2 2804 139 9 268 65 3180 234 238 135 6121 rawah wilderness area Soil_Type_4744 Lodgepole Pine 0.0 7.0 0.0
3 2785 155 18 242 118 3090 238 238 122 6211 rawah wilderness area Soil_Type_7746 Lodgepole Pine 0.0 6.0 0.0
4 2595 45 2 153 -1 391 220 234 150 6172 rawah wilderness area Soil_Type_7745 Aspen 0.0 0.0 5.0
5 2579 132 6 300 -15 67 230 237 140 6031 rawah wilderness area Soil_Type_7745 Lodgepole Pine 0.0 0.0 0.0
6 2606 45 7 270 5 633 222 225 138 6256 rawah wilderness area Soil_Type_7745 Aspen 0.0 0.0 5.0
7 2605 49 4 234 7 573 222 230 144 6228 rawah wilderness area Soil_Type_7745 Aspen 0.0 0.0 5.0
8 2617 45 9 240 56 666 223 221 133 6244 rawah wilderness area Soil_Type_7745 Aspen 0.0 0.0 5.0
9 2612 59 10 247 11 636 228 219 124 6230 rawah wilderness area Soil_Type_7745 Aspen 0.0 0.0 5.0

Now we just have to assemble the features into a single vector column with pyspark.ml.feature.VectorAssembler:


In [12]:
from pyspark.ml.feature import VectorAssembler

feature_columns = ["elevation",
                   "aspect",
                   "slope",
                   "horizontal_distance_to_hydrology",
                   "vertical_distance_to_hydrology",
                   "horizontal_distance_to_roadways",
                   "hillshade_9am",
                   "hillshade_noon",
                   "hillshade_3pm",
                   "horizontal_distance_to_fire_points",
                   "wilderness_area_indexed",
                   "soil_type_indexed"]

feature_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

And we have our dataset prepared for ML:


In [11]:
covertype_dataset_prepared = feature_assembler.transform(covertype_dataset_indexed_features)

In [12]:
covertype_dataset_prepared.printSchema()


root
 |-- elevation: long (nullable = true)
 |-- aspect: long (nullable = true)
 |-- slope: long (nullable = true)
 |-- horizontal_distance_to_hydrology: long (nullable = true)
 |-- vertical_distance_to_hydrology: long (nullable = true)
 |-- horizontal_distance_to_roadways: long (nullable = true)
 |-- hillshade_9am: long (nullable = true)
 |-- hillshade_noon: long (nullable = true)
 |-- hillshade_3pm: long (nullable = true)
 |-- horizontal_distance_to_fire_points: long (nullable = true)
 |-- wilderness_area: string (nullable = true)
 |-- soil_type: string (nullable = true)
 |-- cover_type: string (nullable = true)
 |-- wilderness_area_indexed: double (nullable = false)
 |-- soil_type_indexed: double (nullable = false)
 |-- cover_type_indexed: double (nullable = false)
 |-- features: vector (nullable = true)
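
As an aside, the indexing and assembling steps could equally be chained into a single pyspark.ml.Pipeline; a minimal, equivalent sketch:

from pyspark.ml import Pipeline

# All four preparation stages, fitted and applied in one go
prep_pipeline = Pipeline(stages=[string_indexer_wilderness,
                                 string_indexer_soil,
                                 string_indexer_cover,
                                 feature_assembler])
covertype_dataset_prepared = prep_pipeline.fit(covertype_dataset).transform(covertype_dataset)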

Let's build a simple pyspark.ml.classification.DecisionTreeClassifier:


In [13]:
from pyspark.ml.classification import DecisionTreeClassifier

dtree = DecisionTreeClassifier(featuresCol="features",
                               labelCol="cover_type_indexed",
                               maxDepth=3,
                               maxBins=50)  # must be >= the number of categories in any categorical feature

We fit it, and we get our DecisionTreeClassificationModel:


In [14]:
dtree_model = dtree.fit(covertype_dataset_prepared)

dtree_model


Out[14]:
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_99684a674979) of depth 3 with 11 nodes
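
A fitted model can be persisted with its .save() method; presumably the 'tree_model' directory loaded below was written along these lines (the different uid and node count suggest it came from an earlier run):

# Persist the fitted model for later sessions (sketch)
dtree_model.save('tree_model')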

In [1]:
from pyspark.ml.classification import DecisionTreeClassificationModel

In [5]:
dtree_model = DecisionTreeClassificationModel.load('tree_model')

The .toDebugString attribute holds the decision rules for the tree, but it is not very user-friendly:


In [6]:
print(dtree_model.toDebugString)


DecisionTreeClassificationModel (uid=DecisionTreeClassifier_f84d275537cd) of depth 3 with 7 nodes
  If (feature 0 <= 3050.5)
   If (feature 0 <= 2540.5)
    If (feature 10 in {0.0})
     Predict: 0.0
    Else (feature 10 not in {0.0})
     Predict: 2.0
   Else (feature 0 > 2540.5)
    Predict: 0.0
  Else (feature 0 > 3050.5)
   Predict: 1.0
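
The rules refer to features by their index in the assembled vector, which maps straight back to feature_columns (so feature 0 is elevation and feature 10 is wilderness_area_indexed):

for index, name in enumerate(feature_columns):
    print(index, name)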

Perhaps spark_tree_plotting can be helpful here ;)


In [17]:
import sys
sys.path.insert(0, '/Users/per0/wa/spark_wa/spark-tree-plotting/python')
from spark_tree_plotting import plot_tree

tree_plot = plot_tree(dtree_model,
                      featureNames=feature_columns,
                      categoryNames={"wilderness_area_indexed":string_indexer_wilderness_model.labels,
                                     "soil_type_indexed":string_indexer_soil_model.labels},
                      classNames=string_indexer_cover_model.labels,
                      filled=True,          # With color!
                      roundedCorners=True,  # Rounded corners in the nodes
                      roundLeaves=True      # Leaves will be ellipses instead of rectangles
                     )


---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-17-89ad5e632136> in <module>
     10                       filled=True,          # With color!
     11                       roundedCorners=True,  # Rounded corners in the nodes
---> 12                       roundLeaves=True      # Leaves will be ellipses instead of rectangles
     13                      )

~/wa/spark_wa/spark-tree-plotting/python/spark_tree_plotting.py in plot_tree(DecisionTreeClassificationModel, featureNames, categoryNames, classNames, filled, roundedCorners, roundLeaves)
    434                                                filled=filled,
    435                                                roundedCorners=roundedCorners,
--> 436                                                roundLeaves=roundLeaves
    437                                                )
    438                                )

/usr/local/anaconda3/envs/an37/lib/python3.7/site-packages/pydot/__init__.py in graph_from_dot_data(data)
    216     """
    217 
--> 218     return dot_parser.parse_dot_data(data)
    219 
    220 

/usr/local/anaconda3/envs/an37/lib/python3.7/site-packages/pydot/dot_parser.py in parse_dot_data(data)
    520 
    521         tokens = graphparser.parseString(data)
--> 522 
    523         if len(tokens) == 1:
    524             return tokens[0]

/usr/local/anaconda3/envs/an37/lib/python3.7/site-packages/pyparsing.py in parseString(self, instring, parseAll)
   1816             instring = instring.expandtabs()
   1817         try:
-> 1818             loc, tokens = self._parse( instring, 0 )
   1819             if parseAll:
   1820                 loc = self.preParse( instring, loc )

/usr/local/anaconda3/envs/an37/lib/python3.7/site-packages/pyparsing.py in _parseNoCache(self, instring, loc, doActions, callPreParse)
   1593                 for fn in self.parseAction:
   1594                     try:
-> 1595                         tokens = fn( instring, tokensStart, retTokens )
   1596                     except IndexError as parse_action_exc:
   1597                         exc = ParseException("exception raised in parse action")

/usr/local/anaconda3/envs/an37/lib/python3.7/site-packages/pyparsing.py in wrapper(*args)
   1215         while 1:
   1216             try:
-> 1217                 ret = func(*args[limit[0]:])
   1218                 foundArity[0] = True
   1219                 return ret

/usr/local/anaconda3/envs/an37/lib/python3.7/site-packages/pydot/dot_parser.py in push_top_graph_stmt(str, loc, toks)
     79         if( isinstance(element, (ParseResults, tuple, list)) and
     80             len(element) == 1 and isinstance(element[0], str) ):
---> 81 
     82             element = element[0]
     83 

TypeError: isinstance() arg 2 must be a type or tuple of types

The failure appears to come from an incompatibility between the installed pydot and pyparsing versions rather than from the tree itself. As a workaround, we can instantiate the underlying Scala class directly through the JVM gateway:

In [18]:
json_tree = sc._jvm.com.vfive.spark.ml.SparkMLTree(dtree_model._java_obj)

In [19]:
print(json_tree.toJsonPlotFormat())


{
  "featureIndex":0,
  "gain":0.08681394658400207,
  "impurity":0.6230942824070332,
  "threshold":3050.5,
  "nodeType":"internal",
  "splitType":"continuous",
  "prediction":0.0,
  "leftChild":{
    "featureIndex":0,
    "gain":0.08616165361635758,
    "impurity":0.5539261911259398,
    "threshold":2540.5,
    "nodeType":"internal",
    "splitType":"continuous",
    "prediction":0.0,
    "leftChild":{
      "featureIndex":10,
      "gain":0.04640621444482429,
      "impurity":0.6171371727013576,
      "nodeType":"internal",
      "splitType":"categorical",
      "leftCategories":[
        0.0
      ],
      "rightCategories":[
        1.0,
        2.0,
        3.0
      ],
      "prediction":2.0,
      "leftChild":{
        "impurity":0.18642232564845895,
        "nodeType":"leaf",
        "prediction":0.0
      },
      "rightChild":{
        "impurity":0.5893401621499551,
        "nodeType":"leaf",
        "prediction":2.0
      }
    },
    "rightChild":{
      "impurity":0.4430125702798494,
      "nodeType":"leaf",
      "prediction":0.0
    }
  },
  "rightChild":{
    "impurity":0.5109863148417016,
    "nodeType":"leaf",
    "prediction":1.0
  }
}
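
Since toJsonPlotFormat() returns a plain JSON string, it can also be parsed on the Python side for programmatic inspection; a minimal sketch:

import json

# Turn the JVM-produced JSON into a nested Python dict
tree_dict = json.loads(json_tree.toJsonPlotFormat())
print(tree_dict["threshold"], tree_dict["prediction"])  # 3050.5 0.0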

Once plot_tree succeeds (with compatible pydot and pyparsing versions), the image it returns can be displayed inline:

In [ ]:
from IPython.display import Image

Image(tree_plot)