In [9]:
import numpy as np
import pyspark
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import SQLContext,Row
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.tree import DecisionTree
In [ ]:
sc = pyspark.SparkContext('local[*]')
In [13]:
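#classic "play tennis" weather dataset:
#outlook, temperature, humidity, windy, play (the 0/1 label), plus a constant dummy feature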
rawdata=[
['sunny',85,85,'FALSE',0, 1],
['sunny',80,90,'TRUE',0, 1],
['overcast',83,86,'FALSE',1, 1],
['rainy',70,96,'FALSE',1, 1],
['rainy',68,80,'FALSE',1, 1],
['rainy',65,70,'TRUE',0, 1],
['overcast',64,65,'TRUE',1, 1],
['sunny',72,95,'FALSE',0, 1],
['sunny',69,70,'FALSE',1, 1],
['rainy',75,80,'FALSE',1, 1],
['sunny',75,70,'TRUE',1, 1],
['overcast',72,90,'TRUE',1, 1],
['overcast',81,75,'FALSE',1, 1],
['rainy',71,91,'TRUE',0, 1]
]
In [20]:
sqlContext = SQLContext(sc)
data_df=sqlContext.createDataFrame(rawdata,
['outlook','temp','humid','windy','play','mydummy'])
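#optional quick look at the assembled DataFrame
data_df.show(3)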
#transform categoricals into indicator variables
out2index={'sunny':[1,0,0],'overcast':[0,1,0],'rainy':[0,0,1]}
#make RDD of labeled vectors
def newrow(dfrow):
    outrow = list(out2index[dfrow[0]])  #indicator vector for outlook
    outrow.append(dfrow[1])             #temp
    outrow.append(dfrow[2])             #humidity
    if dfrow[3] == 'TRUE':              #windy
        outrow.append(1)
    else:
        outrow.append(0)
    outrow.append(dfrow[5])             #mydummy
    return LabeledPoint(dfrow[4], outrow)
datax_rdd = data_df.rdd.map(newrow)  #use .rdd: DataFrames have no .map in Spark 2+
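#sanity check (a minimal sketch): inspect the first two LabeledPoints
#to confirm the encoding came out as intended
print(datax_rdd.take(2))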
In [24]:
#train the model; Naive Bayes needs only a single pass over the data
nb_model = NaiveBayes.train(datax_rdd)
#some info on the model
print(nb_model)
#some checks: take some of the training data and test on it
datax_col = datax_rdd.collect()  #if datax_rdd were big, use sample() or take()
trainset_pred = []
for x in datax_col:
    trainset_pred.append(nb_model.predict(x.features))
print(trainset_pred)
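#compare against the true labels (a quick sketch using the collected points):
print([int(x.label) for x in datax_col])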
#to see the class conditionals you could also use scipy
#(you might have to install it first):
#import scipy
#print('Class Cond Probabilities, ie p(attr|class= 0 or 1)')
#print(scipy.exp(nb_model.theta))
#print(scipy.exp(nb_model.pi))
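#numpy (already imported) works just as well as scipy here; nb_model.theta and
#nb_model.pi hold the log conditionals and log priors, so exponentiate to view them:
print('Class Cond Probabilities, ie p(attr|class= 0 or 1)')
print(np.exp(nb_model.theta))
print(np.exp(nb_model.pi))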
#get a confusion matrix
#the row is the true class label 0 or 1, columns are predicted label
#
nb_cf_mat = np.zeros([2,2])  #2 = number of classes
for pnt in datax_col:
    predctn = nb_model.predict(np.array(pnt.features))
    nb_cf_mat[int(pnt.label)][int(predctn)] += 1  #cast to int: label/prediction are floats
corrcnt = 0
for i in range(2):
    corrcnt += nb_cf_mat[i][i]
nb_per_corr = corrcnt/nb_cf_mat.sum()
print('Naive Bayes: confusion matrix and fraction correct')
print(nb_cf_mat)
print(nb_per_corr)
In [22]:
# Decision tree model
dt_model = DecisionTree.trainClassifier(datax_rdd, 2, {}, impurity='entropy',
                                        maxDepth=5, maxBins=32, minInstancesPerNode=2)
#the {} is the categoricalFeaturesInfo map (empty means all features are treated as continuous)
#for regression, drop the numClasses argument and use DecisionTree.trainRegressor
print(dt_model.toDebugString())
#results in this:
#DecisionTreeModel classifier of depth 3 with 9 nodes
# If (feature 1 <= 0.0)
# If (feature 4 <= 80.0)
# If (feature 3 <= 68.0)
# Predict: 0.0
# Else (feature 3 > 68.0)
# Predict: 1.0
# Else (feature 4 > 80.0)
# If (feature 0 <= 0.0)
# Predict: 0.0
# Else (feature 0 > 0.0)
# Predict: 0.0
# Else (feature 1 > 0.0)
# Predict: 1.0
#note that the node count includes both the If (internal) nodes and the Predict (leaf) nodes
#some checks: take some of the training data and test on it
datax_col = datax_rdd.collect()  #if datax_rdd were big, use sample() or take()
#redo the confusion-matrix code (it would be cleaner to wrap it in a function that takes a model)
dt_cf_mat = np.zeros([2,2])  #2 = number of classes
for pnt in datax_col:
    predctn = dt_model.predict(np.array(pnt.features))
    dt_cf_mat[int(pnt.label)][int(predctn)] += 1
corrcnt = 0
for i in range(2):
    corrcnt += dt_cf_mat[i][i]
dt_per_corr = corrcnt/dt_cf_mat.sum()
print('Decision Tree: confusion matrix and fraction correct')
print(dt_cf_mat)
print(dt_per_corr)
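#as the comment above suggests, the duplicated confusion-matrix code can be
#wrapped in a reusable helper; a minimal sketch (the name conf_matrix is my own):
def conf_matrix(model, points, nclasses=2):
    cf = np.zeros([nclasses, nclasses])
    for pnt in points:
        pred = int(model.predict(np.array(pnt.features)))
        cf[int(pnt.label)][pred] += 1
    return cf, np.trace(cf)/cf.sum()
#e.g. dt_cf_mat, dt_per_corr = conf_matrix(dt_model, datax_col)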
In [25]:
#score a brand-new point; the layout matches the training encoding:
#[sunny, overcast, rainy indicators, temp, humidity, windy, mydummy]
newpoint = np.array([1,0,0,68,79,0,1])
print(nb_model.predict(newpoint))
print(dt_model.predict(newpoint))
In [ ]: