In [ ]:
data = [0,1,2,3,4,5,6,7,8,9]
data
In [ ]:
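# parallelize() distributes a local collection into an RDD; evaluation is lazy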
myRDD = sc.parallelize(data)
myRDD
In [ ]:
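# take(n) is an action: it returns the first n elements to the driver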
myRDD.take(1)
In [ ]:
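# collect() pulls the entire RDD back to the driver; only safe for small data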
myRDD.collect()
In [ ]:
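# textFile() reads a file into an RDD of lines (assumes foo.txt exists on the default path)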
fooRDD = sc.textFile('foo.txt')
fooRDD
In [ ]:
fooRDD.collect()
In [ ]:
type(fooRDD)
In [ ]:
type(fooRDD.collect())
In [ ]:
myRDD.collect()
In [ ]:
# A regular named function works anywhere a lambda does
def addStuff(x):
    return x + 200

addStuff(1)
In [ ]:
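# map() applies the function to every element, producing a new RDD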
funcRDD = myRDD.map(addStuff)
funcRDD.collect()
In [ ]:
lambdaRDD = myRDD.map(lambda x: x + 200)
lambdaRDD.collect()
In [ ]:
tupleRDD = sc.parallelize(
    [(1,2),
     (1,5),
     (2,6), (5,7), (6,1), (5,10)]
)
In [ ]:
tupleRDD
In [ ]:
tupleRDD.take(5)
In [ ]:
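# Each element here is a (key, value) tuple, and tuple + int is a TypeError;
# because RDDs are lazy, the failure only surfaces when collect() runs below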
newTupleRDD = tupleRDD.map(lambda x: x + 200)
newTupleRDD
In [ ]:
newTupleRDD.collect()
In [ ]:
# Python 3 removed tuple-unpacking lambdas, so index into the pair instead
newTupleRDD = tupleRDD.map(lambda kv: (kv[0], kv[1] + 200))
newTupleRDD
In [ ]:
newTupleRDD.collect()
In [ ]:
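# map can also emit tuples: one (x, x+200) pair per input element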
myRDD = sc.parallelize([1,2,3,4,5])
myTupleRDD = myRDD.map(lambda x: (x, x+200))
myTupleRDD.collect()
In [ ]:
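# flatMap flattens the returned sequence, so each input yields two output elements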
flatMapRDD = myRDD.flatMap(lambda x: (x, x+200))
flatMapRDD.collect()
In [ ]:
listRDD = sc.parallelize([
    [1,2,3,4,5],
    [6,7,8,9,10],
    [10,11,12,13,14,15]
])
listRDD.collect()
In [ ]:
listRDD.map(lambda x: x + [5]).collect()
In [ ]:
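# The same function under flatMap concatenates the per-element lists into one flat RDD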
listRDD.flatMap(lambda x: x + [5]).collect()
In [ ]:
tupleRDD = sc.parallelize([(1,2), (1,5), (2,6), (5,7), (6,1), (5,10)])
tupleRDD.collect()
In [ ]:
tupleRDD.sortByKey().collect()
In [ ]:
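# reduceByKey merges all values sharing a key with the given function (a per-key sum here)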
tupleRDD.reduceByKey(lambda x,y: x + y).collect()
In [ ]:
from operator import add
tupleRDD.reduceByKey(add).collect()
In [ ]:
(tupleRDD
 .map(lambda kv: (kv[0] * 3, kv[1]))
 .reduceByKey(add).collect())
In [ ]:
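# Classic word count: split lines into words, pair each with 1, sum by key
# (assumes wordcount.txt exists on the default path)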
data = sc.textFile('wordcount.txt')
data.take(5)
In [ ]:
mappedData = data.map(lambda line: line.split(' '))
mappedData.take(3)
In [ ]:
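# flatMap yields one word per element instead of one list of words per line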
flatMappedData = data.flatMap(lambda line: line.split(' '))
flatMappedData.take(10)
In [ ]:
words = flatMappedData.map(lambda word: (word, 1))
words.take(10)
In [ ]:
wordcounts = words.reduceByKey(lambda x,y: x + y)
wordcounts.take(10)
In [ ]:
wordcounts.takeOrdered(10, lambda wc: wc[0])  # 10 pairs, ordered alphabetically by word
In [ ]:
wordcounts.takeOrdered(10, lambda wc: -wc[1])  # 10 pairs, ordered by descending count
In [ ]:
counts = (data.flatMap(lambda x: x.split(' '))
          .map(lambda x: (x, 1))
          .reduceByKey(lambda x, y: x + y)
          .sortBy(lambda wc: -wc[1]))
counts.collect()
In [ ]:
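# A large driver-side dict that every task will need to look up against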
bigVar = sc.parallelize(range(1000000)).zipWithIndex().collectAsMap()
myRDD = sc.parallelize(range(10000))
In [ ]:
bigVar
In [ ]:
myRDD.take(5)
In [ ]:
%%time
# bigVar is captured in the closure, so it is serialized and shipped with every task
def myfunc(x, var):
    if x in var:
        return 1
    else:
        return 0

myRDD.map(lambda x: myfunc(x, bigVar)).sum()
In [ ]:
%%time
# sc.broadcast() ships bigVar to each executor once; tasks read it via .value
bigVarBroadcast = sc.broadcast(bigVar)

def myfunc_broadcast(x):
    var = bigVarBroadcast.value
    if x in var:
        return 1
    else:
        return 0

myRDD.map(lambda x: myfunc_broadcast(x)).sum()
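In [ ]:
# Optional cleanup via the standard Broadcast API: unpersist() releases the
# cached copies of the broadcast value on the executors
bigVarBroadcast.unpersist()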
In [ ]:
import sys
dense = [0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0]
sys.getsizeof(dense)
In [ ]:
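# A sparse encoding keeps only (size, nonzero indices, nonzero values).
# Caveat: sys.getsizeof is shallow (it ignores the nested lists), so this is a rough comparison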
sparse = (100,[5,16],[1,1])
sys.getsizeof(sparse)
In [ ]:
from pyspark.mllib.linalg import SparseVector
sv = SparseVector(100,[1,5,7,12],[1.0,1.0,1.0,1.0])
sv
In [ ]:
sv.toArray()
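In [ ]:
# For reference, the standard SparseVector attributes: declared size, nonzero indices, and values
sv.size, sv.indices, sv.values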
In [ ]:
# LabeledPoint lives in pyspark.mllib.regression (not pyspark.mllib.feature)
from pyspark.mllib.regression import LabeledPoint
lp = LabeledPoint(1.0, [3,4,5,6,7])
lp
In [ ]:
lp.features
In [ ]:
lp.label
In [ ]:
labeledRDD = sc.parallelize([
    LabeledPoint(1.0, SparseVector(1000,[1,5,7,12],[1.0,1.0,1.0,1.0])),
    LabeledPoint(1.0, SparseVector(1000,[12,15,27,42],[1.0,1.0,1.0,1.0])),
    LabeledPoint(0.0, SparseVector(1000,[5,15,17,102],[1.0,1.0,1.0,1.0])),
    LabeledPoint(1.0, SparseVector(1000,[100,500,700],[1.0,1.0,1.0])),
    LabeledPoint(0.0, SparseVector(1000,[1],[1.0])),
])
labeledRDD.take(5)
In [ ]:
from pyspark.mllib.classification import LogisticRegressionWithSGD
# train() is a classmethod, so call it on the class directly
model = LogisticRegressionWithSGD.train(labeledRDD)
type(model)
In [ ]:
?model.predict
In [ ]:
model.predict(SparseVector(1000,[1,500,702],[1.0,1.0,1.0]))
In [ ]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
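# BinaryClassificationMetrics expects (score, label) pairs; areaUnderROC summarizes
# how well the scores rank positives above negatives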
scoresAndLabels = sc.parallelize([
    (0.2, 1.0),
    (0.3, 0.0),
    (0.4, 0.0),
    (0.5, 0.0),
    (0.7, 0.0),
    (0.8, 1.0),
    (0.8, 0.0),
    (0.9, 1.0)
])
metrics = BinaryClassificationMetrics(scoresAndLabels)
metrics.areaUnderROC
In [ ]:
from pyspark.mllib.evaluation import MulticlassMetrics
predictionsAndLabels = sc.parallelize([
    (0.0, 1.0),
    (0.0, 0.0),
    (0.0, 0.0),
    (0.0, 0.0),
    (0.0, 0.0),
    (1.0, 1.0),
    (1.0, 0.0),
    (1.0, 1.0)
])
metrics = MulticlassMetrics(predictionsAndLabels)
metrics.truePositiveRate(1.0)
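In [ ]:
# A quick sanity check via the standard MulticlassMetrics API: the full confusion matrix
metrics.confusionMatrix().toArray()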
In [ ]:
?MulticlassMetrics.falsePositiveRate
In [ ]:
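# Held-out evaluation: fit on trainRDD, then score the separate testRDD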
trainRDD = sc.parallelize([
    LabeledPoint(1.0, SparseVector(1000,[1,5,7,12],[1.0,1.0,1.0,1.0])),
    LabeledPoint(1.0, SparseVector(1000,[12,15,27,42],[1.0,1.0,1.0,1.0])),
    LabeledPoint(0.0, SparseVector(1000,[5,15,17,102],[1.0,1.0,1.0,1.0])),
    LabeledPoint(1.0, SparseVector(1000,[100,500,700],[1.0,1.0,1.0])),
    LabeledPoint(0.0, SparseVector(1000,[1],[1.0])),
])
testRDD = sc.parallelize([
    LabeledPoint(1.0, SparseVector(1000,[10,15,17,120],[1.0,1.0,1.0,1.0])),
    LabeledPoint(1.0, SparseVector(1000,[12,27,42],[1.0,1.0,1.0])),
    LabeledPoint(0.0, SparseVector(1000,[15,17,102],[1.0,1.0,1.0])),
    LabeledPoint(1.0, SparseVector(1000,[10,150,700],[1.0,1.0,1.0])),
    LabeledPoint(0.0, SparseVector(1000,[1,2],[1.0,1.0])),
])
In [ ]:
selectedModel = LogisticRegressionWithSGD.train(trainRDD)
# predict() returns hard 0/1 labels here, so this AUC is computed from
# thresholded predictions rather than raw scores
predictionAndLabel = testRDD.map(lambda p: (float(selectedModel.predict(p.features)), p.label))
auc = BinaryClassificationMetrics(predictionAndLabel).areaUnderROC
print('AUC:', auc)