Machine Learning in Spark

Data Types

MLlib supports RDDs of NumPy arrays, vectors, matrices, and labeled points.

Supported vector types (each vector is local; an RDD of vectors is distributed):

  • dense vectors
  • sparse vectors
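
The difference between the two representations can be sketched in plain Python. This is an illustration only, not MLlib code, and the helper name `sparse_to_dense` is hypothetical. A sparse vector stores its size together with the indices and values of its nonzero entries, which is the same triple that `Vectors.sparse(size, indices, values)` takes; a dense vector stores every entry.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse description into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense

# A length-6 vector whose only nonzero entries sit at positions 1 and 4.
dense = sparse_to_dense(6, [1, 4], [3.0, 7.0])
print(dense)
```

The sparse form pays off when most entries are zero, as with word-count features over a large vocabulary.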

In [1]:
from pyspark import SparkContext 
sc = SparkContext('local','example')

In [2]:
from pyspark.mllib.linalg import Vectors

x = Vectors.dense([1,2,3,4])

x[0]


Out[2]:
1.0

In [6]:
x = [Vectors.dense([1,2,3,4,5]), Vectors.dense([6,7,8,9,10])]

xrdd = sc.parallelize(x, 2)

xrdd.glom().collect()


Out[6]:
[[DenseVector([1.0, 2.0, 3.0, 4.0, 5.0])],
 [DenseVector([6.0, 7.0, 8.0, 9.0, 10.0])]]

Example Labeled Points


In [7]:
from pyspark.mllib.regression import LabeledPoint as LP

pt = LP(1, Vectors.dense(2,-1,4))

print("Label: ", pt.label)
print("Feature Vector: ", pt.features)


Label:  1.0
Feature Vector:  [2.0,-1.0,4.0]

Example Creating a Word-count RDD


In [ ]:

MLlib Classification Package

Naive Bayes

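  • NB.train(Xrdd_train) fits a model on an RDD of labeled points
  • model.predict(features) returns the predicted label for a feature vector, or an RDD of predictions for an RDD of feature vectors
  • model.theta returns $\log P(\text{attribute} \mid \text{class})$
  • model.pi returns the log class priors $\log P(\text{class})$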
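
To make the meaning of the two log-probability tables concrete, here is a minimal plain-Python sketch of a multinomial Naive Bayes fit. It is not MLlib's implementation: the function name `fit_multinomial_nb`, the toy data, and the additive smoothing value of 1.0 applied to both priors and likelihoods are assumptions made for illustration.

```python
import math
from collections import defaultdict

def fit_multinomial_nb(points, smoothing=1.0):
    """points: list of (label, feature_counts). Returns (labels, log_pi, log_theta)."""
    n_features = len(points[0][1])
    class_docs = defaultdict(int)                          # points seen per class
    class_sums = defaultdict(lambda: [0.0] * n_features)   # per-class feature totals
    for label, features in points:
        class_docs[label] += 1
        for j, value in enumerate(features):
            class_sums[label][j] += value

    labels = sorted(class_docs)
    n_points, n_classes = len(points), len(labels)
    log_pi, log_theta = [], []
    for label in labels:
        # Smoothed log prior: log P(class).
        log_pi.append(math.log((class_docs[label] + smoothing)
                               / (n_points + smoothing * n_classes)))
        # Smoothed log likelihoods: log P(attribute | class), one per feature.
        total = sum(class_sums[label]) + smoothing * n_features
        log_theta.append([math.log((s + smoothing) / total)
                          for s in class_sums[label]])
    return labels, log_pi, log_theta

# Hypothetical toy data: (label, feature counts).
toy = [(0, [2, 0]), (0, [1, 1]), (1, [0, 3])]
labels, log_pi, log_theta = fit_multinomial_nb(toy)
print(labels, log_pi, log_theta)
```

Each row of `log_theta` exponentiates to a distribution over attributes for one class, and `log_pi` exponentiates to a distribution over classes.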

In [ ]:
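from pyspark.mllib.classification import NaiveBayes as NB

# Fit the model on an RDD of LabeledPoint.
nbmodel = NB.train(Xrdd_train)

# Predict on the test features; an RDD has no .features attribute, so map over the points.
testpred = nbmodel.predict(Xrdd_test.map(lambda p: p.features))

Confusion matrix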

In [ ]:
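trainpred = nbmodel.predict(Xrdd_train.map(lambda p: p.features))

# Count (true label, prediction) pairs; cf_mat is assumed to be a pre-allocated array of zeros.
counts = Xrdd_train.map(lambda p: p.label).zip(trainpred).countByValue()
for (label, pred), n in counts.items():
    cf_mat[int(label)][int(pred)] += n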
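
The indexing convention in the cell above (rows are true labels, columns are predictions) can be checked on a small example without Spark. This is a plain-Python sketch; the helper `confusion_matrix` and the label lists are hypothetical and only illustrate the bookkeeping.

```python
def confusion_matrix(labels, predictions, num_classes):
    """Rows index the true class, columns index the predicted class."""
    matrix = [[0] * num_classes for _ in range(num_classes)]
    for label, pred in zip(labels, predictions):
        matrix[int(label)][int(pred)] += 1
    return matrix

# Hypothetical true labels and predictions for five points.
true_labels = [0.0, 0.0, 1.0, 1.0, 1.0]
predicted = [0.0, 1.0, 1.0, 1.0, 0.0]

cf_mat = confusion_matrix(true_labels, predicted, num_classes=2)
# Correct predictions lie on the diagonal.
accuracy = sum(cf_mat[i][i] for i in range(2)) / len(true_labels)
print(cf_mat, accuracy)
```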

Decision Tree


In [ ]:
from pyspark.mllib.tree import DecisionTree

dtmodel = DecisionTree.trainClassifier(Xrdd_train,
                                       numClasses = 2,
                                       categoricalFeaturesInfo = {},  ## required; {} treats all features as continuous
                                       impurity = 'entropy', ## options: gini or entropy
                                       maxDepth = 5,
                                       maxBins = 32,
                                       minInstancesPerNode = 2)
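
The two impurity options can be compared with a short plain-Python sketch. The function names and class counts below are hypothetical, and the sketch uses the textbook definitions (entropy in bits) rather than MLlib's internal code. Both measures are zero for a pure node and largest when the classes are evenly mixed.

```python
import math

def gini(counts):
    """Gini impurity: 1 minus the sum of squared class frequencies."""
    total = sum(counts)
    return 1.0 - sum((c / total) ** 2 for c in counts)

def entropy(counts):
    """Entropy in bits: minus the sum of p * log2(p) over nonempty classes."""
    total = sum(counts)
    return -sum((c / total) * math.log2(c / total) for c in counts if c > 0)

# Class counts at three hypothetical tree nodes: mixed, skewed, pure.
for counts in ([5, 5], [9, 1], [10, 0]):
    print(counts, gini(counts), entropy(counts))
```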