Create some RDDs

From Python list


In [ ]:
data = [0,1,2,3,4,5,6,7,8,9]
data

In [ ]:
myRDD = sc.parallelize(data)
myRDD

In [ ]:
myRDD.take(1)

In [ ]:
myRDD.collect()
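
Two other basic actions, as a quick aside: count() returns the number of elements in the RDD and first() returns just the first one.

In [ ]:
# count() and first() are actions, like take() and collect()
myRDD.count(), myRDD.first()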

From text file


In [ ]:
fooRDD = sc.textFile('foo.txt')
fooRDD

In [ ]:
fooRDD.collect()

In [ ]:
type(fooRDD)

In [ ]:
type(fooRDD.collect())

Lambda Functions


In [ ]:
myRDD.collect()

In [ ]:
def addStuff(x):
    return x + 200

addStuff(1)

In [ ]:
funcRDD = myRDD.map(addStuff)
funcRDD.collect()

In [ ]:
lambdaRDD = myRDD.map(lambda x: x + 200)
lambdaRDD.collect()

Tuples and Lists


In [ ]:
tupleRDD = sc.parallelize(
    [(1,2), 
     (1,5), 
     (2,6), (5,7), (6,1), (5,10)]
)

In [ ]:
tupleRDD

In [ ]:
tupleRDD.take(5)

In [ ]:
# Adding an integer to a whole tuple raises a TypeError, so the collect() below fails
newTupleRDD = tupleRDD.map(lambda x: x + 200)
newTupleRDD

In [ ]:
newTupleRDD.collect()

In [ ]:
# Add 200 to the second element of each pair instead
newTupleRDD = tupleRDD.map(lambda xy: (xy[0], xy[1] + 200))
newTupleRDD

In [ ]:
newTupleRDD.collect()

In [ ]:
myRDD = sc.parallelize([1,2,3,4,5])

myTupleRDD = myRDD.map(lambda x: (x, x+200))
myTupleRDD.collect()

In [ ]:
flatMapRDD = myRDD.flatMap(lambda x: (x, x+200))
flatMapRDD.collect()
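
The difference is easiest to see side by side: map produces one output element per input (here, a 2-tuple), while flatMap unrolls each of those outputs into separate elements. A quick check, reusing the two RDDs built above:

In [ ]:
# map kept one tuple per input: [(1, 201), (2, 202), ...]
# flatMap flattened each pair into separate elements: [1, 201, 2, 202, ...]
myTupleRDD.take(3), flatMapRDD.take(6)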

In [ ]:
listRDD = sc.parallelize([
        [1,2,3,4,5],
        [6,7,8,9,10],
        [10,11,12,13,14,15]
    ])
listRDD.collect()

In [ ]:
listRDD.map(lambda x: x + [5]).collect()

In [ ]:
listRDD.flatMap(lambda x: x + [5]).collect()

Key/Value Tuples


In [ ]:
tupleRDD = sc.parallelize([(1,2), (1,5), (2,6), (5,7), (6,1), (5,10)])
tupleRDD.collect()

In [ ]:
tupleRDD.sortByKey().collect()

In [ ]:
tupleRDD.reduceByKey(lambda x,y: x + y).collect()

In [ ]:
from operator import add
tupleRDD.reduceByKey(add).collect()

In [ ]:
(tupleRDD
 .map(lambda xy: (xy[0] * 3, xy[1]))
 .reduceByKey(add).collect())
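
For contrast, a small sketch not in the original cells: groupByKey gathers all the values for each key without combining them, and mapValues(list) just makes the grouped values printable.

In [ ]:
# groupByKey collects every value per key; reduceByKey combines them as it goes
tupleRDD.groupByKey().mapValues(list).collect()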

Wordcount


In [ ]:
data = sc.textFile('wordcount.txt')
data.take(5)

In [ ]:
mappedData = data.map(lambda line: line.split(' '))
mappedData.take(3)

In [ ]:
flatMappedData = data.flatMap(lambda line: line.split(' '))
flatMappedData.take(10)

In [ ]:
words = flatMappedData.map(lambda word: (word, 1))
words.take(10)

In [ ]:
wordcounts = words.reduceByKey(lambda x,y: x + y)
wordcounts.take(10)

In [ ]:
wordcounts.takeOrdered(10, lambda wf: wf[0])

In [ ]:
wordcounts.takeOrdered(10, lambda wf: -1 * wf[1])

In [ ]:
counts = (data.flatMap(lambda x: x.split(' '))
          .map(lambda x: (x, 1))
          .reduceByKey(lambda x,y: x + y)
          .sortBy(lambda wf: -1 * wf[1]))
counts.collect()
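
If wordcount.txt happens to contain blank lines, split(' ') leaves empty strings among the words. A minimal variation that filters them out (illustrative variable name, same file assumed):

In [ ]:
counts_nonempty = (data.flatMap(lambda x: x.split(' '))
                   .filter(lambda w: w != '')
                   .map(lambda x: (x, 1))
                   .reduceByKey(lambda x, y: x + y)
                   .sortBy(lambda wf: -1 * wf[1]))
counts_nonempty.take(10)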

Broadcast variables


In [ ]:
bigVar = sc.parallelize(range(1000000)).zipWithIndex().collectAsMap()
myRDD = sc.parallelize(range(10000))

In [ ]:
bigVar

In [ ]:
myRDD.take(5)

Without broadcast


In [ ]:
%%time
def myfunc(x, var):
    if x in var:
        return 1
    else:
        return 0

# bigVar is captured in the task closure, so it gets serialized and shipped to the executors
myRDD.map(lambda x: myfunc(x, bigVar)).sum()

With broadcast


In [ ]:
%%time
def myfunc_broadcast(x):
    # Read the big lookup table from the broadcast variable on the executor
    var = bigVarBroadcast.value
    if x in var:
        return 1
    else:
        return 0

bigVarBroadcast = sc.broadcast(bigVar)

myRDD.map(lambda x: myfunc_broadcast(x)).sum()
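
Once the comparison is done, the cached copies of the broadcast value can be released from the executors; a small housekeeping step, not part of the original timing:

In [ ]:
# Remove the broadcast value from the executors
bigVarBroadcast.unpersist()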

Vectors


In [ ]:
import sys
dense = [0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0]
sys.getsizeof(dense)

In [ ]:
# A hand-rolled sparse representation: (size, non-zero indices, non-zero values)
sparse = (100,[5,16],[1,1])
sys.getsizeof(sparse)

In [ ]:
from pyspark.mllib.linalg import SparseVector

sv = SparseVector(100,[1,5,7,12],[1.0,1.0,1.0,1.0])
sv

In [ ]:
sv.toArray()
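
A quick look at what the SparseVector actually stores: only the length, the non-zero indices, and their values.

In [ ]:
# Only the size, the non-zero positions, and their values are kept
sv.size, sv.indices, sv.values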

LabeledPoint


In [ ]:
from pyspark.mllib.regression import LabeledPoint

lp = LabeledPoint(1.0, [3,4,5,6,7])
lp

In [ ]:
lp.features

In [ ]:
lp.label

RDD of LabeledPoints with SparseVector for features


In [ ]:
labeledRDD = sc.parallelize([
        LabeledPoint(1.0, SparseVector(1000,[1,5,7,12],[1.0,1.0,1.0,1.0])),
        LabeledPoint(1.0, SparseVector(1000,[12,15,27,42],[1.0,1.0,1.0,1.0])),
        LabeledPoint(0.0, SparseVector(1000,[5,15,17,102],[1.0,1.0,1.0,1.0])),
        LabeledPoint(1.0, SparseVector(1000,[100,500,700],[1.0,1.0,1.0])),
        LabeledPoint(0.0, SparseVector(1000,[1],[1.0])),
])
labeledRDD.take(5)

Building Models


In [ ]:
from pyspark.mllib.classification import LogisticRegressionWithSGD

model = LogisticRegressionWithSGD.train(labeledRDD)
type(model)

In [ ]:
?model.predict

In [ ]:
model.predict(SparseVector(1000,[1,500,702],[1.0,1.0,1.0]))
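
predict also accepts an RDD of feature vectors and returns an RDD of predictions; a quick sketch reusing the training features from above:

In [ ]:
# Predict over an RDD of feature vectors instead of a single vector
model.predict(labeledRDD.map(lambda lp: lp.features)).collect()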

Evaluating Models


In [ ]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

scoresAndLabels = sc.parallelize([
        (0.2, 1.0),
        (0.3, 0.0),
        (0.4, 0.0),
        (0.5, 0.0),
        (0.7, 0.0),
        (0.8, 1.0),
        (0.8, 0.0),
        (0.9, 1.0)
    ])
metrics = BinaryClassificationMetrics(scoresAndLabels)
metrics.areaUnderROC
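
The same metrics object also exposes the area under the precision-recall curve:

In [ ]:
metrics.areaUnderPR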

In [ ]:
from pyspark.mllib.evaluation import MulticlassMetrics

predictionsAndLabels = sc.parallelize([
        (0.0, 1.0),
        (0.0, 0.0),
        (0.0, 0.0),
        (0.0, 0.0),
        (0.0, 0.0),
        (1.0, 1.0),
        (1.0, 0.0),
        (1.0, 1.0)
    ])
metrics = MulticlassMetrics(predictionsAndLabels)
metrics.truePositiveRate(1.0)

In [ ]:
?MulticlassMetrics.falsePositiveRate
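
For a fuller picture than a single rate, the confusion matrix over the same predictions (converted to a NumPy array for display):

In [ ]:
metrics.confusionMatrix().toArray()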

Evaluate predicted instances


In [ ]:
trainRDD = sc.parallelize([
        LabeledPoint(1.0, SparseVector(1000,[1,5,7,12],[1.0,1.0,1.0,1.0])),
        LabeledPoint(1.0, SparseVector(1000,[12,15,27,42],[1.0,1.0,1.0,1.0])),
        LabeledPoint(0.0, SparseVector(1000,[5,15,17,102],[1.0,1.0,1.0,1.0])),
        LabeledPoint(1.0, SparseVector(1000,[100,500,700],[1.0,1.0,1.0])),
        LabeledPoint(0.0, SparseVector(1000,[1],[1.0])),
])
testRDD = sc.parallelize([
        LabeledPoint(1.0, SparseVector(1000,[10,15,17,120],[1.0,1.0,1.0,1.0])),
        LabeledPoint(1.0, SparseVector(1000,[12,27,42],[1.0,1.0,1.0])),
        LabeledPoint(0.0, SparseVector(1000,[15,17,102],[1.0,1.0,1.0])),
        LabeledPoint(1.0, SparseVector(1000,[10,150,700],[1.0,1.0,1.0])),
        LabeledPoint(0.0, SparseVector(1000,[1,2],[1.0,1.0])),
])

In [ ]:
selectedModel = LogisticRegressionWithSGD.train(trainRDD)
predictionAndLabel = testRDD.map(lambda p: (float(selectedModel.predict(p.features)), p.label))
auc = BinaryClassificationMetrics(predictionAndLabel).areaUnderROC
print('AUC:', auc)
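
Beyond AUC, a simple accuracy check over the same prediction/label pairs (illustrative variable names, reusing the RDDs above):

In [ ]:
correct = predictionAndLabel.filter(lambda pl: pl[0] == pl[1]).count()
accuracy = correct / float(testRDD.count())
print('Accuracy:', accuracy)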