In [1]:
from __future__ import division
import numpy as np

from pyspark import SparkConf
from pyspark import SparkContext

In [2]:
# Connect to the standalone cluster master and give each executor 10 GB.
# NOTE(review): the master host is hardcoded to a specific EC2 internal
# address -- this notebook only runs as-is on that cluster.
conf = SparkConf()
conf.setMaster('spark://ip-172-31-9-200:7077')
conf.setAppName('spark_analytics_chpt_4')
conf.set("spark.executor.memory", "10g")
sc = SparkContext(conf=conf)

In [3]:
# UCI covtype dataset: 581,012 comma-separated records (see Out[4]),
# 54 features + a cover-type label, no header row.
# NOTE(review): relative path -- resolves against the driver's working dir.
raw_data = sc.textFile('covtype.data')

In [4]:
raw_data.count()  # total number of records in covtype.data


Out[4]:
581012

In [5]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

def to_float(s):
    """Parse `s` as a float, returning NaN for unparseable values.

    Catches ValueError (malformed strings) and TypeError (e.g. None)
    explicitly; the original bare `except:` would also have swallowed
    KeyboardInterrupt and SystemExit.
    """
    try:
        return float(s)
    except (ValueError, TypeError):
        return float('nan')
    
def clean(line):
    """Parse one comma-separated covtype record into a LabeledPoint.

    All fields but the last become the dense feature vector; the last
    field is the cover type (1..7), shifted to the 0-based labels that
    MLlib classifiers expect.

    (The original wrapped this body in a `for x in line.split(',')`
    loop that re-split the line on every iteration and returned on the
    first pass -- dead code, removed.)
    """
    values = [to_float(x) for x in line.split(',')]
    featureVector = Vectors.dense(values[:-1])
    label = values[-1] - 1
    return LabeledPoint(label, featureVector)

In [6]:
data = raw_data.map(clean)  # RDD of LabeledPoint, parsed lazily

In [7]:
data.take(5)  # sanity-check the parsed LabeledPoints


Out[7]:
[LabeledPoint(4.0, [2596.0,51.0,3.0,258.0,0.0,510.0,221.0,232.0,148.0,6279.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(4.0, [2590.0,56.0,2.0,212.0,-6.0,390.0,220.0,235.0,151.0,6225.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(1.0, [2804.0,139.0,9.0,268.0,65.0,3180.0,234.0,238.0,135.0,6121.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(1.0, [2785.0,155.0,18.0,242.0,118.0,3090.0,238.0,238.0,122.0,6211.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(4.0, [2595.0,45.0,2.0,153.0,-1.0,391.0,220.0,234.0,150.0,6172.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0])]

In [8]:
# Hold out 10% for hyperparameter tuning (cv) and 10% for final testing.
# A fixed seed makes the split -- and every metric computed below --
# reproducible under Restart & Run All (the original was unseeded).
training_data, cv_data, test_data = data.randomSplit([0.8, 0.1, 0.1], seed=42)

In [9]:
# Cache all three splits: they are re-scanned many times during the
# hyperparameter grid search and evaluations below.
training_data.cache()
cv_data.cache()
test_data.cache()


Out[9]:
PythonRDD[6] at RDD at PythonRDD.scala:43

In [10]:
training_data.count(), cv_data.count(), test_data.count()  # sizes of the three splits


Out[10]:
(464818, 58325, 57869)

Decision Tree


In [11]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

In [12]:
# Baseline tree: numClasses=7, no categorical-feature metadata ({}),
# gini impurity, maxDepth=4, maxBins=100.
model = DecisionTree.trainClassifier(training_data, 7, {}, 'gini', 4, 100)

In [13]:
# NOTE(review): this scores the model on `data` -- the FULL dataset,
# including the rows it was trained on -- so the metrics below are
# optimistic; cv_data or test_data would give an honest estimate.
predictions = model.predict(data.map(lambda x: x.features))
labels_and_predictions = data.map(lambda lp: lp.label).zip(predictions)

In [14]:
metrics = MulticlassMetrics(labels_and_predictions)  # evaluation over (label, prediction) pairs

In [15]:
# Rows are true classes, columns are predicted classes.
# NOTE(review): the matrix is 6x6 rather than 7x7 (Out[15]) --
# apparently only 6 distinct classes occur among these labels and
# predictions; confirm against the data.
metrics.confusionMatrix()


Out[15]:
DenseMatrix(6, 6, [143414.0, 64794.0, 108.0, 10.0, 0.0, 3514.0, 56061.0, 222001.0, ..., 123.0, 0.0, 11315.0, 310.0, 0.0, 0.0, 0.0, 8885.0], 0)

In [16]:
metrics.precision()  # with no argument: overall accuracy across all classes


Out[16]:
0.6997566315325673

In [18]:
# Per-class (precision, recall) pairs.
# NOTE(review): category 5 is missing from this list even though there
# are 7 classes (0-6) -- presumably a typo; confirm and add 5 if so.
map(lambda cat: (metrics.precision(cat), metrics.recall(cat)), [0, 1, 2, 3, 4, 6])


Out[18]:
[(0.6769920694864048, 0.6803643436595664),
 (0.7836223663170974, 0.729323604682105),
 (0.8678749230855289, 0.6241576988836367),
 (0.4055333090644339, 0.3523086654016445),
 (0.012956915622037291, 0.7109826589595376),
 (0.4332033154558752, 0.6952813209171297)]

In [19]:
def classProbabilities(data):
    """Return the prior probability of each class label in `data`.

    Counts labels with countByValue and normalizes to frequencies.
    The labels are sorted before building the array so that the result
    is ordered identically for every dataset.  (The original used
    `dict.values()`, whose ordering is arbitrary, making the
    element-wise train-vs-cv comparison in the next cells unreliable.)
    """
    countsByCategory = data.map(lambda x: x.label).countByValue()
    total = sum(countsByCategory.values())
    return np.array([countsByCategory[k] for k in sorted(countsByCategory)]) / total

In [20]:
trainPriorProbabilities = classProbabilities(training_data)  # class priors in the training split

In [21]:
cvPriorProbabilities = classProbabilities(cv_data)  # class priors in the cv split

In [22]:
# Accuracy a classifier guessing at random (per the priors) would get:
# sum over classes of P(class|train) * P(class|cv).
# NOTE(review): this pairing is only valid if both probability arrays
# are ordered by the same class labels -- verify classProbabilities
# returns a consistently ordered array.
sum([x[0] * x[1] for x in zip(trainPriorProbabilities, cvPriorProbabilities)])


Out[22]:
0.37673722119246689

In [23]:
# Grid search over impurity measure, tree depth and bin count,
# scoring each candidate on the held-out cv split.
for impurity in ('gini', 'entropy'):
    for depth in (1, 20):
        for bins in (10, 300):
            model = DecisionTree.trainClassifier(training_data, 7, {}, impurity, depth, bins)
            predictions = model.predict(cv_data.map(lambda x: x.features))
            labels_and_predictions = cv_data.map(lambda lp: lp.label).zip(predictions)
            metrics = MulticlassMetrics(labels_and_predictions)
            accuracy = metrics.precision()  # overall accuracy on cv_data
            print (impurity, depth, bins), accuracy


('gini', 1, 10) 0.632353193313
('gini', 1, 300) 0.631290184312
('gini', 20, 10) 0.891350192885
('gini', 20, 300) 0.905940848693
('entropy', 1, 10) 0.487509644235
('entropy', 1, 300) 0.487509644235
('entropy', 20, 10) 0.895207886841
('entropy', 20, 300) 0.914582083155

In [24]:
# Best settings from the grid search above (entropy, depth 20, 300 bins);
# retrain on train + cv now that tuning is done.
model = DecisionTree.trainClassifier(training_data.union(cv_data), 7, {}, 'entropy', 20, 300)

In [25]:
predictions = model.predict(data.map(lambda x: x.features))
labels_and_predictions = data.map(lambda lp: lp.label).zip(predictions)
metrics = MulticlassMetrics(labels_and_predictions)
accuracy = metrics.precision()
print accuracy


0.941133745947

Random Forest


In [26]:
from pyspark.mllib.tree import RandomForest

In [27]:
# 20 trees, 'auto' feature-subset strategy, entropy, maxDepth=30, maxBins=300.
# NOTE(review): {10: 4, 11: 40} declares features 10 and 11 as categorical
# with 4 and 40 values, but `data` still carries the raw one-hot encoding
# (features 10-53 are 0/1 flags per Out[7]) -- that metadata only matches
# a dataset where the one-hot columns were converted back to categorical;
# confirm this is intended here.
forest = RandomForest.trainClassifier(training_data, 7, {10:4, 11:40}, 20, 'auto', 'entropy', 30, 300)

In [28]:
predictions = model.predict(data.map(lambda x: x.features))
labels_and_predictions = data.map(lambda lp: lp.label).zip(predictions)
metrics = MulticlassMetrics(labels_and_predictions)
accuracy = metrics.precision()
print accuracy


0.941133745947

In [29]:
from pyspark.mllib.linalg import Vectors

# NOTE(review): this vector has only 12 features, but the forest was
# trained on 54-feature vectors -- predicting with it triggers the
# ArrayIndexOutOfBoundsException in the traceback below.  A full
# 54-element feature vector is required.
input = '2709,125,28,67,23,3224,253,207,61,6094,0,29'
vector = Vectors.dense([to_float(x) for x in input.split(',')])

In [30]:
# NOTE(review): raises Py4JJavaError / ArrayIndexOutOfBoundsException
# (traceback below) because `vector` has 12 elements while the forest
# expects 54 features per record.
result = forest.predict(vector)


---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-30-28f809ae2fc7> in <module>()
----> 1 result = forest.predict(vector)

/opt/anaconda/share/spark/python/pyspark/mllib/tree.py in predict(self, x)
     44 
     45         else:
---> 46             return self.call("predict", _convert_to_vector(x))
     47 
     48     def numTrees(self):

/opt/anaconda/share/spark/python/pyspark/mllib/common.py in call(self, name, *a)
    144     def call(self, name, *a):
    145         """Call method of java_model"""
--> 146         return callJavaFunc(self._sc, getattr(self._java_model, name), *a)
    147 
    148 

/opt/anaconda/share/spark/python/pyspark/mllib/common.py in callJavaFunc(sc, func, *args)
    121     """ Call Java Function """
    122     args = [_py2java(sc, a) for a in args]
--> 123     return _java2py(sc, func(*args))
    124 
    125 

/opt/anaconda/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

/opt/anaconda/share/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     34     def deco(*a, **kw):
     35         try:
---> 36             return f(*a, **kw)
     37         except py4j.protocol.Py4JJavaError as e:
     38             s = e.java_exception.toString()

/opt/anaconda/share/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o725.predict.
: java.lang.ArrayIndexOutOfBoundsException: 35
	at org.apache.spark.mllib.linalg.DenseVector.apply(Vectors.scala:560)
	at org.apache.spark.mllib.tree.model.Node.predict(Node.scala:91)
	at org.apache.spark.mllib.tree.model.DecisionTreeModel.predict(DecisionTreeModel.scala:58)
	at org.apache.spark.mllib.tree.model.TreeEnsembleModel$$anonfun$predictByVoting$1.apply(treeEnsembleModels.scala:311)
	at org.apache.spark.mllib.tree.model.TreeEnsembleModel$$anonfun$predictByVoting$1.apply(treeEnsembleModels.scala:310)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:42)
	at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:43)
	at org.apache.spark.mllib.tree.model.TreeEnsembleModel.predictByVoting(treeEnsembleModels.scala:310)
	at org.apache.spark.mllib.tree.model.TreeEnsembleModel.predict(treeEnsembleModels.scala:334)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:207)
	at java.lang.Thread.run(Thread.java:745)