In [3]:
# Initialize Spark in Python
from pyspark import SparkContext
sc = SparkContext("local", "Work with RDD")  # master URL, application name
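
An equivalent initialization goes through SparkConf, which is useful when more settings are needed. A minimal sketch of the alternative (not meant to run alongside the context above, since only one SparkContext may be active):

In [ ]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("Work with RDD")
sc = SparkContext(conf=conf)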

Load data


In [25]:
# Create an RDD from an in-memory collection [option 1]
lines = sc.parallelize(["pandas", "i like pandas"])

In [5]:
# Create an RDD from an external text file [option 2]
logFile = "/usr/apache/spark-2.0.2-bin-hadoop2.7/README.md"
lines = sc.textFile(logFile)

Operations with RDDs

Transformations

Return a new RDD (e.g. filter, map). Transformations are lazy: Spark only records the lineage, and nothing is computed until an action runs (see the sketch below).

Actions

Return data to the driver program (e.g. count, first)
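
A minimal sketch of this laziness, reusing the README path from above:

In [ ]:
# Transformations only build a lineage graph; no file is read yet
lines = sc.textFile("/usr/apache/spark-2.0.2-bin-hadoop2.7/README.md")
sparkLines = lines.filter(lambda x: "Spark" in x)  # still nothing computed
# The action finally triggers reading and filtering
print sparkLines.count()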

Transformation example


In [6]:
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "Error" in x)

In [7]:
warningsRDD = inputRDD.filter(lambda x: "Warning" in x)

In [8]:
# union
badLinesRDD = errorsRDD.union(warningsRDD)
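
Since both filters scan the same input, an equivalent single-pass version is possible. A minimal sketch (assuming no line contains both markers; union() would count such a line twice):

In [ ]:
# One filter pass instead of two filters plus a union
badLinesRDD = inputRDD.filter(lambda x: "Error" in x or "Warning" in x)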

Action example


In [11]:
# Count the error and warning messages
print "Input had " + str(badLinesRDD.count()) + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
    print line


Input had 16 concerning lines
Here are 10 examples:
[12.08.16:12.46] Error: add bonuss
[12.08.16:12.56] Error: add bonuss
[12.08.16:12.56] Error: add bonuss
[12.08.16:12.57] Error: add bonuss
[12.08.09:12.56] Warning: bad request 'add bonuss'
[12.08.15:12.56] Warning: bad request 'add bonuss'
[12.08.15:12.56] Warning: bad request 'add bonuss'
[12.08.16:12.56] Warning: bad request 'add bonuss'
[12.08.16:12.56] Warning: bad request 'add bonuss'
[12.08.16:12.56] Warning: bad request 'add bonuss'

Element-wise transformations


In [14]:
# Compute the square of each number
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print "%i" % (num)


1
4
9
16

In [18]:
# Splitting strings into words using flatMap()
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()


Out[18]:
'hello'
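
For contrast, map() with the same split keeps one list per line instead of flattening. A minimal sketch:

In [ ]:
# map() yields exactly one output element per input element
nested = lines.map(lambda line: line.split(" "))
nested.first()  # => ['hello', 'world'], whereas words.first() is 'hello'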

Operations with sets


In [19]:
rdd1 = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
rdd2 = sc.parallelize(["coffee", "monkey", "kitty"])

In [28]:
test = rdd1.distinct()
test.count() # => ["coffee", "panda", "monkey", "tea"]


Out[28]:
4

In [29]:
test = rdd1.union(rdd2)
test.count() # => ["coffee", "coffee", "panda", "monkey", "tea", "coffee", "monkey", "kitty"]


Out[29]:
8

In [30]:
test = rdd1.intersection(rdd2)
test.count() # => ["coffee", "monkey"]


Out[30]:
2

In [31]:
test = rdd1.subtract(rdd2)
test.count() # => ["panda", "tea"]


Out[31]:
2

In [32]:
test = rdd1.cartesian(rdd2)
test.count()  # 5 * 3 = 15 pairs; cartesian() is expensive on large RDDs


Out[32]:
15

Actions


In [58]:
nums = sc.parallelize([1, 2, 3, 3])

In [59]:
# reduce() needs a commutative and associative function;
# 'total' avoids shadowing the built-in sum()
total = nums.reduce(lambda x, y: x + y)
total


Out[59]:
9

In [60]:
# Compute the average: build a (sum, count) pair, then divide
sumCount = nums.aggregate((0, 0),
                          (lambda acc, value: (acc[0] + value, acc[1] + 1)),            # fold a value into an accumulator
                          (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))  # merge two accumulators
sumCount[0] / float(sumCount[1])


Out[60]:
2.25
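
The same average is available directly from the RDD API:

In [ ]:
# Equivalent one-liner
nums.mean()  # => 2.25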

More examples:


In [63]:
nums.collect()


Out[63]:
[1, 2, 3, 3]

In [65]:
nums.count()


Out[65]:
4

In [66]:
nums.countByValue()


Out[66]:
defaultdict(int, {1: 1, 2: 1, 3: 2})

In [67]:
nums.take(2)


Out[67]:
[1, 2]

In [68]:
nums.top(2)


Out[68]:
[3, 3]

In [72]:
nums.takeOrdered(2)


Out[72]:
[1, 2]

In [77]:
nums.takeSample(True, 1)  # takeSample(withReplacement, num, [seed])


Out[77]:
[3]

In [79]:
nums.reduce(lambda x, y: x + y)


Out[79]:
9

In [88]:
nums.fold(0, lambda x, y: x + y)  # like reduce(), but with a zero value


Out[88]:
9

In [109]:
nums.aggregate((0, 0),
               (lambda acc, value: (acc[0] + value, acc[1] + 1)),            # => (sum, count)
               (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))


Out[109]:
(9, 4)

In [116]:
nums.foreach(lambda x: x * 2)  # runs on the executors for side effects; the result is discarded
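
Since foreach() returns nothing to the driver, a visible side effect needs a shared variable. A minimal sketch with an accumulator:

In [ ]:
# Accumulators are write-only on the executors; the driver reads the total
acc = sc.accumulator(0)
nums.foreach(lambda x: acc.add(x))
print acc.value  # => 9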

Persistence (caching)


In [121]:
save = nums.persist()  # mark the RDD for caching (default level: MEMORY_ONLY)

In [122]:
save = save.unpersist()  # drop the RDD from the cache
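
persist() also accepts an explicit storage level. A minimal sketch:

In [ ]:
from pyspark import StorageLevel

# Keep the data in memory, spilling to disk if it does not fit
nums.persist(StorageLevel.MEMORY_AND_DISK)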

In [123]:
sc.stop()  # shut down the SparkContext