In [1]:
import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext()
In [2]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from string import split,strip
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel, RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
Classify geographical locations according to their predicted tree cover:
In [3]:
#define a dictionary of cover types
CoverTypes={1.0: 'Spruce/Fir',
            2.0: 'Lodgepole Pine',
            3.0: 'Ponderosa Pine',
            4.0: 'Cottonwood/Willow',
            5.0: 'Aspen',
            6.0: 'Douglas-fir',
            7.0: 'Krummholz' }
print 'Tree Cover Types:'
CoverTypes
Out[3]:
In [4]:
# create a directory called covtype, then download and decompress covtype.data.gz into it
from os.path import exists
if not exists('covtype'):
    print "creating directory covtype"
    !mkdir covtype
%cd covtype
if not exists('covtype.data'):
    if not exists('covtype.data.gz'):
        print 'downloading covtype.data.gz'
        !curl -O http://archive.ics.uci.edu/ml/machine-learning-databases/covtype/covtype.data.gz
    print 'decompressing covtype.data.gz'
    !gunzip -f covtype.data.gz
!ls -l
%cd ..
In [5]:
# Define the feature names
cols_txt="""
Elevation, Aspect, Slope, Horizontal_Distance_To_Hydrology,
Vertical_Distance_To_Hydrology, Horizontal_Distance_To_Roadways,
Hillshade_9am, Hillshade_Noon, Hillshade_3pm,
Horizontal_Distance_To_Fire_Points, Wilderness_Area (4 binary columns),
Soil_Type (40 binary columns), Cover_Type
"""
In [6]:
# Break up features that are made out of several binary features.
from string import split,strip
cols=[strip(a) for a in split(cols_txt,',')]
colDict={a:[a] for a in cols}
colDict['Soil_Type (40 binary columns)'] = ['ST_'+str(i) for i in range(40)]
colDict['Wilderness_Area (4 binary columns)'] = ['WA_'+str(i) for i in range(4)]
Columns=[]
for item in cols:
    Columns=Columns+colDict[item]
print Columns
In [7]:
# Have a look at the first two lines of the data file
!head -2 covtype/covtype.data
In [8]:
# Read the file into an RDD
# If doing this on a real cluster, you need the file to be available on all nodes, ideally in HDFS.
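# e.g. on a cluster one might point at an HDFS URI instead (hypothetical path, adjust to your setup):
# path='hdfs:///user/<yourname>/covtype/covtype.data'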
path='covtype/covtype.data'
inputRDD=sc.textFile(path)
In [9]:
# Transform the text RDD into an RDD of LabeledPoints
Data=inputRDD.map(lambda line: [float(strip(x)) for x in line.split(',')])\
.map(lambda a: LabeledPoint(a[-1], a[0:-1]))
Out[9]:
In [11]:
# count the number of examples of each type
total=Data.cache().count()
print 'total data size=',total
counts=Data.map(lambda sample: (sample.label, 1)).reduceByKey(lambda x,y: x+y).collect()
counts.sort(key=lambda x:x[1], reverse=True)
print ' type (label): percent of total'
print '---------------------------------------------------------'
print '\n'.join(['%20s (%3.1f):\t%4.2f'%(CoverTypes[a[0]],a[0],100.0*a[1]/float(total)) for a in counts])
total data size= 581012
type (label): percent of total
---------------------------------------------------------
Lodgepole Pine (2.0): 48.76
Spruce/Fir (1.0): 36.46
Ponderosa Pine (3.0): 6.15
Krummholz (7.0): 3.53
Douglas-fir (6.0): 2.99
Aspen (5.0): 1.63
Cottonwood/Willow (4.0): 0.47
The implementation of GradientBoostedTrees in MLlib supports only binary classification, while the CovType problem has 7 classes. To make the problem binary we single out the Lodgepole Pine class (label = 2.0) and transform the dataset into a new dataset in which the label is 1.0 if the class is Lodgepole Pine and 0.0 otherwise.
In [12]:
Label=2.0
# relabel: 1.0 for Lodgepole Pine (original label 2.0), 0.0 for every other cover type
Data=inputRDD.map(lambda line: [float(x) for x in line.split(',')])\
    .map(lambda a: LabeledPoint(1.0 if a[-1]==Label else 0.0, a[0:-1]))
Out[12]:
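As a quick sanity check (a sketch that is not part of the original notebook), the relabeled RDD should contain roughly the 48.76% of positives reported above for Lodgepole Pine:

# sanity-check sketch: count positive (Lodgepole Pine) vs. negative examples after relabeling
pos=Data.filter(lambda lp: lp.label==1.0).count()
print 'positives:',pos,' negatives:',Data.count()-pos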
In [13]:
Data1=Data.sample(False,0.1).cache()
(trainingData,testData)=Data1.randomSplit([0.7,0.3])
print 'Sizes: Data1=%d, trainingData=%d, testData=%d'%(Data1.count(),trainingData.cache().count(),testData.cache().count())
In [14]:
counts=testData.map(lambda lp:(lp.label,1)).reduceByKey(lambda x,y:x+y).collect()
counts.sort(key=lambda x:x[1],reverse=True)
counts
Out[14]:
Following this example from the MLlib documentation:

GradientBoostedTrees is the class that implements the learning. Its method trainClassifier(trainingData, ...) takes a training set as input and produces an instance of GradientBoostedTreesModel.

GradientBoostedTreesModel represents the output of the boosting process: a linear combination of classification trees. The methods supported by this class are:
- save(sc, path): save the model under the given path; sc is the SparkContext.
- load(sc, path): the counterpart to save - load a classifier from a file.
- predict(X): predict on a single datapoint (the .features field of a LabeledPoint) or on an RDD of datapoints.
- toDebugString(): print the classifier in a human-readable format.
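A minimal sketch of these methods, assuming a model already trained as in the next cell (the path 'gbt_model' is just an example location, not one used elsewhere in this notebook):

# sketch: persist a trained model, reload it, and query it
model.save(sc, 'gbt_model')                                  # write the model under ./gbt_model
sameModel = GradientBoostedTreesModel.load(sc, 'gbt_model')  # load it back
print sameModel.predict(testData.first().features)           # predict on a single datapoint
print sameModel.toDebugString()[:500]                        # human-readable dump (truncated)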
In [ ]:
from time import time
errors={}
for depth in [1,3,6,10]:
    start=time()
    # mark the 44 binary columns (indices 10-53) as 2-category features
    # (prepared here but not passed below; all features are treated as numeric)
    catInfo = {}
    for i in range(10,54):
        catInfo[i] = 2
    model=GradientBoostedTrees.trainClassifier(trainingData, categoricalFeaturesInfo={},
                                               numIterations=10, maxDepth=depth, learningRate=0.25,
                                               maxBins=54)
    errors[depth]={}
    dataSets={'train':trainingData,'test':testData}
    for name in dataSets.keys(): # Calculate errors on train and test sets
        data=dataSets[name]
        Predicted=model.predict(data.map(lambda x: x.features))
        LabelsAndPredictions = data.map(lambda lp: lp.label).zip(Predicted)
        Err = LabelsAndPredictions.filter(lambda (v,p): v != p).count()/float(data.count())
        errors[depth][name]=Err
    print depth,errors[depth],int(time()-start),'seconds'
print errors
#.167, .148
In [16]:
B10 = errors
In [17]:
# Plot Train/test accuracy vs Depth of trees graph
%pylab inline
from plot_utils import *
make_figure([B10],['10Trees'],Title='Boosting using 10% of data')
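plot_utils is a local helper that is not included in this notebook. If it is unavailable, a hypothetical stand-in with the same calling convention (an assumption based purely on how make_figure is called here) could look like this:

# hypothetical stand-in for plot_utils.make_figure,
# assuming error dicts of the form {depth: {'train': err, 'test': err}}
import matplotlib.pyplot as plt
def make_figure(error_dicts, legends, Title=''):
    for errs, legend in zip(error_dicts, legends):
        depths = sorted(errs.keys())
        plt.plot(depths, [errs[d]['train'] for d in depths], label=legend+' train')
        plt.plot(depths, [errs[d]['test'] for d in depths], label=legend+' test')
    plt.xlabel('maxDepth'); plt.ylabel('error rate'); plt.title(Title)
    plt.legend(); plt.grid(True); plt.show()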
Following this example from the MLlib documentation, RandomForest.trainClassifier has the signature:

trainClassifier(data, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy='auto', impurity='gini', maxDepth=4, maxBins=32, seed=None)

It trains a random forest model for binary or multiclass classification and returns a RandomForestModel that can be used for prediction.
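For illustration only (a sketch that the notebook does not actually run), the 44 binary wilderness/soil columns at feature indices 10 through 53 could be declared categorical via categoricalFeaturesInfo:

# sketch: declare the binary columns (indices 10-53) as 2-category features
catInfo = {i: 2 for i in range(10, 54)}
rf_cat_model = RandomForest.trainClassifier(trainingData, numClasses=2,
                                            categoricalFeaturesInfo=catInfo,
                                            numTrees=10, maxDepth=6)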
In [18]:
from time import time
errors={}
for depth in [1,3,6,10,15,20]:
    start=time()
    model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                         numTrees=10, featureSubsetStrategy="auto",
                                         impurity='gini', maxDepth=depth)
    #print model.toDebugString()
    errors[depth]={}
    dataSets={'train':trainingData,'test':testData}
    for name in dataSets.keys(): # Calculate errors on train and test sets
        data=dataSets[name]
        Predicted=model.predict(data.map(lambda x: x.features))
        LabelsAndPredictions = data.map(lambda lp: lp.label).zip(Predicted)
        Err = LabelsAndPredictions.filter(lambda (v,p): v != p).count()/float(data.count())
        errors[depth][name]=Err
    print depth,errors[depth],int(time()-start),'seconds'
print errors
In [19]:
RF_10trees = errors
# Plot Train/test accuracy vs Depth of trees graph
make_figure([RF_10trees],['10Trees'],Title='Random Forests using 10% of data')
In [20]:
make_figure([RF_10trees, B10],['10Trees', 'GB'],Title='GBT vs RF')
In [ ]: