In [9]:
import numpy as np
import pyspark
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import SQLContext,Row
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.tree import DecisionTree

In [ ]:
# Reuse an already-running SparkContext if there is one (only a single
# context may be active at a time, so constructing a second one on a cell
# re-run fails); otherwise start a local context using all available cores.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[*]'))

In [13]:
# Toy "play" dataset, one row per observation.
# Columns (in order): outlook, temp, humid, windy, play, mydummy
#   - windy is the string 'TRUE' / 'FALSE'
#   - play is the class label (0 or 1)
#   - mydummy is the constant 1 on every row
rawdata = [
    ['sunny',    85, 85, 'FALSE', 0, 1],
    ['sunny',    80, 90, 'TRUE',  0, 1],
    ['overcast', 83, 86, 'FALSE', 1, 1],
    ['rainy',    70, 96, 'FALSE', 1, 1],
    ['rainy',    68, 80, 'FALSE', 1, 1],
    ['rainy',    65, 70, 'TRUE',  0, 1],
    ['overcast', 64, 65, 'TRUE',  1, 1],
    ['sunny',    72, 95, 'FALSE', 0, 1],
    ['sunny',    69, 70, 'FALSE', 1, 1],
    ['rainy',    75, 80, 'FALSE', 1, 1],
    ['sunny',    75, 70, 'TRUE',  1, 1],
    ['overcast', 72, 90, 'TRUE',  1, 1],
    ['overcast', 81, 75, 'FALSE', 1, 1],
    ['rainy',    71, 91, 'TRUE',  0, 1],
]

In [20]:
# SQL entry point built on top of the SparkContext `sc`
sqlContext = SQLContext(sc)

# Turn the raw rows into a DataFrame, naming the six columns
data_df = sqlContext.createDataFrame(
    rawdata,
    ['outlook', 'temp', 'humid', 'windy', 'play', 'mydummy'],
)

# Indicator (one-hot) encoding for the categorical 'outlook' column
out2index = {
    'sunny':    [1, 0, 0],
    'overcast': [0, 1, 0],
    'rainy':    [0, 0, 1],
}

#make RDD of labeled vectors
def newrow(dfrow):
    """Convert one DataFrame row into a LabeledPoint.

    The row is read positionally: 0=outlook, 1=temp, 2=humid, 3=windy,
    4=play (used as the label), 5=mydummy.

    The resulting feature list is
    [<3 outlook indicators>, temp, humid, windy(0/1), mydummy].
    """
    # Copy the indicator list so the shared out2index entry is never mutated
    outlook_bits = list(out2index.get(dfrow[0]))
    # Only the exact string 'TRUE' counts as windy; anything else maps to 0
    windy_flag = 1 if dfrow[3] == 'TRUE' else 0
    features = outlook_bits + [dfrow[1], dfrow[2], windy_flag, dfrow[5]]
    return LabeledPoint(dfrow[4], features)

# Map over the DataFrame's underlying RDD of Rows: DataFrame.map() is not
# available in PySpark 2.0+, while .rdd.map() works on old and new versions.
datax_rdd = data_df.rdd.map(newrow)

In [24]:
# Train the Naive Bayes model; it can go in a single pass
nb_model = NaiveBayes.train(datax_rdd)

# Some info on the model (prints the model object's default repr)
print(nb_model)

# Some checks: pull the training data back to the driver and predict on it
datax_col = datax_rdd.collect()   # if datax_rdd was big, use sample or take

trainset_pred = [nb_model.predict(pnt.features) for pnt in datax_col]
print(trainset_pred)

# To see the class conditionals (the model stores them as logs):
#print('Class Cond Probabilities, ie p(attr|class= 0 or 1) ')
#print(np.exp(nb_model.theta))
#print(np.exp(nb_model.pi))

# Confusion matrix:
# the row is the true class label 0 or 1, the column is the predicted label
nb_cf_mat = np.zeros([2, 2])  # num of classes
for pnt, predctn in zip(datax_col, trainset_pred):
    # Labels and predictions come back as floats (0.0 / 1.0); NumPy requires
    # integer indices, so cast explicitly.
    nb_cf_mat[int(pnt.label)][int(predctn)] += 1

# Correct predictions sit on the diagonal
corrcnt = np.trace(nb_cf_mat)
nb_per_corr = corrcnt / nb_cf_mat.sum()
print('Naive Bayes: Conf.Mat. and Per Corr')
print(nb_cf_mat)
print(nb_per_corr)


<pyspark.mllib.classification.NaiveBayesModel object at 0x7f3c81a75a90>
[1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0]
Naive Bayes: Conf.Mat. and Per Corr
[[ 3.  2.]
 [ 0.  9.]]
0.857142857143
/opt/conda/lib/python3.5/site-packages/ipykernel/__main__.py:28: DeprecationWarning: using a non-integer number instead of an integer will result in an error in the future

In [22]:
# Decision tree model
dt_model = DecisionTree.trainClassifier(datax_rdd, 2, {}, impurity='entropy',
                                        maxDepth=5, maxBins=32,
                                        minInstancesPerNode=2)

# maxDepth and maxBins bound the tree depth and the number of split bins.
# {} is the categorical-feature map (empty here: every feature is treated
# as continuous).
# To do regression, drop the number of classes and use
# DecisionTree.trainRegressor instead.
print(dt_model.toDebugString())

#results in this:
#DecisionTreeModel classifier of depth 3 with 9 nodes
#  If (feature 1 <= 0.0)
#   If (feature 4 <= 80.0)
#    If (feature 3 <= 68.0)
#     Predict: 0.0
#    Else (feature 3 > 68.0)
#     Predict: 1.0
#   Else (feature 4 > 80.0)
#    If (feature 0 <= 0.0)
#     Predict: 0.0
#    Else (feature 0 > 0.0)
#     Predict: 0.0
#  Else (feature 1 > 0.0)
#   Predict: 1.0

# Notice the node count includes both the decision nodes (the 4 "If"s) and
# the leaf nodes (the 5 "Predict"s): 4 + 5 = 9.

# Some checks: pull the training data back to the driver and predict on it
datax_col = datax_rdd.collect()   # if datax_rdd was big, use sample or take

# Redo the conf. matrix code (it would be more efficient to pass a model)
# The row is the true class label 0 or 1, the column is the predicted label.
dt_cf_mat = np.zeros([2, 2])  # num of classes
for pnt in datax_col:
    predctn = dt_model.predict(pnt.features)
    # Labels and predictions come back as floats (0.0 / 1.0); NumPy requires
    # integer indices, so cast explicitly.
    dt_cf_mat[int(pnt.label)][int(predctn)] += 1

# Correct predictions sit on the diagonal
corrcnt = np.trace(dt_cf_mat)
dt_per_corr = corrcnt / dt_cf_mat.sum()
print('Decision Tree: Conf.Mat. and Per Corr')
print(dt_cf_mat)
print(dt_per_corr)


DecisionTreeModel classifier of depth 3 with 9 nodes
  If (feature 1 <= 0.0)
   If (feature 4 <= 80.0)
    If (feature 3 <= 68.0)
     Predict: 0.0
    Else (feature 3 > 68.0)
     Predict: 1.0
   Else (feature 4 > 80.0)
    If (feature 0 <= 0.0)
     Predict: 0.0
    Else (feature 0 > 0.0)
     Predict: 0.0
  Else (feature 1 > 0.0)
   Predict: 1.0

Decision Tree: Conf.Mat. and Per Corr
[[ 5.  0.]
 [ 2.  7.]]
0.857142857143
/opt/conda/lib/python3.5/site-packages/ipykernel/__main__.py:35: DeprecationWarning: using a non-integer number instead of an integer will result in an error in the future

In [25]:
# Score one unseen observation with both models.
# Feature layout: [sunny, overcast, rainy, temp, humid, windy, mydummy]
newpoint = np.array([1, 0, 0, 68, 79, 0, 1])
for model in (nb_model, dt_model):
    print(model.predict(newpoint))


1.0
0.0

In [ ]: