In [1]:
from __future__ import division
import numpy as np
from pyspark import SparkConf
from pyspark import SparkContext
In [2]:
conf = SparkConf()
conf.setMaster('spark://ip-172-31-9-200:7077')
conf.setAppName('spark_analytics_chpt_4')
conf.set("spark.executor.memory", "10g")
sc = SparkContext(conf=conf)
In [3]:
raw_data = sc.textFile('covtype.data')
In [4]:
raw_data.count()
Out[4]:
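Before parsing, it is worth eyeballing one raw record; in the standard covtype layout each line carries 54 feature columns plus a trailing cover-type label in 1-7. A quick peek:

raw_data.first()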
In [5]:
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
def to_float(s):
    try:
        return float(s)
    except ValueError:
        # flag unparseable fields rather than failing the whole job
        return float('nan')

def clean(line):
    # 54 feature columns followed by the cover-type label (1-7)
    values = [to_float(x) for x in line.split(',')]
    featureVector = Vectors.dense(values[:-1])
    label = values[-1] - 1  # MLlib expects 0-based class labels
    return LabeledPoint(label, featureVector)
In [6]:
data = raw_data.map(clean)
In [7]:
data.take(5)
Out[7]:
In [8]:
training_data, cv_data, test_data = data.randomSplit([0.8, 0.1, 0.1])
In [9]:
training_data.cache()
cv_data.cache()
test_data.cache()
Out[9]:
In [10]:
training_data.count(), cv_data.count(), test_data.count()
Out[10]:
In [11]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
In [12]:
model = DecisionTree.trainClassifier(training_data, 7, {}, 'gini', 4, 100)
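The positional arguments to trainClassifier above are numClasses=7, an empty categorical-feature map, 'gini' impurity, maxDepth=4, and maxBins=100. To see the splits the tree actually learned, MLlib tree models expose toDebugString(); for example:

print model.toDebugString()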
In [13]:
# evaluate on the held-out CV set rather than on data the model was trained with
predictions = model.predict(cv_data.map(lambda x: x.features))
labels_and_predictions = cv_data.map(lambda lp: lp.label).zip(predictions)
In [14]:
metrics = MulticlassMetrics(labels_and_predictions)
In [15]:
metrics.confusionMatrix()
Out[15]:
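The Matrix returned by confusionMatrix() reads more easily as a NumPy array, with rows as true classes and columns as predicted classes:

metrics.confusionMatrix().toArray()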
In [16]:
metrics.precision()
Out[16]:
In [18]:
map(lambda cat: (metrics.precision(cat), metrics.recall(cat)), range(7))
Out[18]:
In [19]:
def classProbabilities(data):
    countsByCategory = data.map(lambda x: x.label).countByValue()
    # sort by category so the probability vectors line up by class across datasets
    counts = np.array([v for k, v in sorted(countsByCategory.items())])
    return counts / counts.sum()
In [20]:
trainPriorProbabilities = classProbabilities(training_data)
In [21]:
cvPriorProbabilities = classProbabilities(cv_data)
In [22]:
sum([x[0] * x[1] for x in zip(trainPriorProbabilities, cvPriorProbabilities)])
Out[22]:
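The value above has a concrete meaning: a baseline "classifier" that guesses class c at random with probability p_train(c) is correct on a CV example of class c with probability p_train(c) * p_cv(c), so summing over classes gives its expected accuracy. Any trained model should clear this bar. The same number as a NumPy dot product:

float(np.dot(trainPriorProbabilities, cvPriorProbabilities))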
In [23]:
for impurity in ('gini', 'entropy'):
    for depth in (1, 20):
        for bins in (10, 300):
            model = DecisionTree.trainClassifier(training_data, 7, {}, impurity, depth, bins)
            predictions = model.predict(cv_data.map(lambda x: x.features))
            labels_and_predictions = cv_data.map(lambda lp: lp.label).zip(predictions)
            metrics = MulticlassMetrics(labels_and_predictions)
            # precision() with no label argument is plain multiclass accuracy
            accuracy = metrics.precision()
            print (impurity, depth, bins), accuracy
In [24]:
model = DecisionTree.trainClassifier(training_data.union(cv_data), 7, {}, 'entropy', 20, 300)
In [25]:
# measure final accuracy on the untouched test set, not on data the model has seen
predictions = model.predict(test_data.map(lambda x: x.features))
labels_and_predictions = test_data.map(lambda lp: lp.label).zip(predictions)
metrics = MulticlassMetrics(labels_and_predictions)
accuracy = metrics.precision()
print accuracy
In [26]:
from pyspark.mllib.tree import RandomForest
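The {10: 4, 11: 40} map passed to the forest below tells MLlib that feature 10 is a 4-valued categorical (wilderness area) and feature 11 a 40-valued categorical (soil type). That layout only exists after collapsing the dataset's 44 one-hot columns back into two categorical columns, a step the cells above never perform. A minimal sketch of that recoding (unencode_one_hot and the recoded_* names are ours, not from the original notebook):

def unencode_one_hot(line):
    values = [to_float(x) for x in line.split(',')]
    # columns 10-13 one-hot encode the wilderness area, 14-53 the soil type
    wilderness = float(values[10:14].index(1.0))
    soil = float(values[14:54].index(1.0))
    features = values[:10] + [wilderness, soil]
    return LabeledPoint(values[-1] - 1, Vectors.dense(features))

recoded_data = raw_data.map(unencode_one_hot)
recoded_train, recoded_cv, recoded_test = recoded_data.randomSplit([0.8, 0.1, 0.1])
recoded_train.cache()

The cells that follow are adjusted to train and evaluate on these recoded splits; without this step the 12-feature vector in In [29] could not be fed to the forest at all.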
In [27]:
# train on the recoded split so that features 10 and 11 really are categorical
forest = RandomForest.trainClassifier(recoded_train, 7, {10: 4, 11: 40}, 20, 'auto', 'entropy', 30, 300)
In [28]:
# use the forest (not the earlier single tree) and the recoded test split
predictions = forest.predict(recoded_test.map(lambda x: x.features))
labels_and_predictions = recoded_test.map(lambda lp: lp.label).zip(predictions)
metrics = MulticlassMetrics(labels_and_predictions)
accuracy = metrics.precision()
print accuracy
In [29]:
from pyspark.mllib.linalg import Vectors
# one example in the recoded 12-feature layout (10 numeric + wilderness + soil);
# renamed from `input`, which shadows the Python builtin
new_point = '2709,125,28,67,23,3224,253,207,61,6094,0,29'
vector = Vectors.dense([to_float(x) for x in new_point.split(',')])
In [30]:
result = forest.predict(vector)
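Class labels were shifted down by one during parsing, so add 1 to map the prediction back to the original cover-type codes 1-7; a quick printout (wording ours):

print 'predicted cover type:', int(result) + 1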