In [3]:
# Initialize Spark in Python: create a local SparkContext
# with the application name "Work with RDD".
from pyspark import SparkContext
sc = SparkContext("local", "Work with RDD")
In [25]:
# Create an RDD [type 1]: distribute an in-memory Python list
# with parallelize() (suitable for small test data only).
lines = sc.parallelize(["pandas", "i like pandas"])
In [5]:
# Create an RDD [type 2]: load an external text file; each line of
# the file becomes one element of the RDD.
# NOTE(review): hardcoded absolute path — breaks on any other machine;
# consider making it configurable.
logFile = "/usr/apache/spark-2.0.2-bin-hadoop2.7/README.md"
lines = sc.textFile(logFile)
In [6]:
# Load a log file and keep only the lines containing "Error".
# filter() is a lazy transformation: nothing is computed until an
# action (count, take, ...) is invoked.
inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "Error" in x)
In [7]:
warningsRDD = inputRDD.filter(lambda x: "Warning" in x)
In [8]:
# union(): combine both filtered RDDs into one; duplicates are kept.
badLinesRDD = errorsRDD.union(warningsRDD)
In [11]:
# Count the error/warning lines and show up to 10 examples.
# Fixed: Python 2-only `print` statements replaced with print() calls
# (valid in both Python 2 and 3), and the loop-body indentation that
# was lost in the export is restored.
print("Input had " + str(badLinesRDD.count()) + " concerning lines")
print("Here are 10 examples:")
for line in badLinesRDD.take(10):
    print(line)
In [14]:
# Calculate the squares of the numbers 1..4.
# Fixed: Python 2-only `print` statement replaced with a print() call
# (valid in both Python 2 and 3), and the loop-body indentation that
# was lost in the export is restored.
nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()  # collect() pulls all results to the driver
for num in squared:
    print("%i" % num)
In [18]:
# Split strings into words using flatMap(): each input line maps to
# zero or more output elements, which are flattened into a single RDD.
lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()  # => "hello"
Out[18]:
In [19]:
# Two small RDDs used to demonstrate the pseudo-set operations below
# (note rdd1 contains "coffee" twice).
rdd1 = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
rdd2 = sc.parallelize(["coffee", "monkey", "kitty"])
In [28]:
# distinct(): remove duplicate elements (requires a shuffle).
test = rdd1.distinct()
test.count() # => ["coffee", "panda", "monkey", "tea"]
Out[28]:
In [29]:
# union(): concatenation of the two RDDs; duplicates are NOT removed.
test = rdd1.union(rdd2)
test.count() # => ["coffee", "coffee", "panda", "monkey", "tea", "coffee", "monkey", "kitty"]
Out[29]:
In [30]:
# intersection(): elements present in both RDDs; duplicates are removed.
test = rdd1.intersection(rdd2)
test.count() # => ["coffee", "monkey"]
Out[30]:
In [31]:
# subtract(): elements of rdd1 that do not appear in rdd2.
test = rdd1.subtract(rdd2)
test.count() # => ["panda", "tea"]
Out[31]:
In [32]:
# cartesian(): all (a, b) pairs; the result size is
# len(rdd1) * len(rdd2) = 5 * 3 = 15 (expensive for large RDDs).
test = rdd1.cartesian(rdd2)
test.count()
Out[32]:
In [58]:
nums = sc.parallelize([1, 2, 3, 3])
In [59]:
# reduce(): pairwise combine the elements with an associative
# function => 1 + 2 + 3 + 3 = 9.
# Fixed: renamed the variable `sum` -> `total` so it no longer
# shadows the Python builtin sum().
total = nums.reduce(lambda x, y: x + y)
total
Out[59]:
In [60]:
# aggregate(): compute (sum, count) in one pass, then the mean.
# The first lambda (seqOp) folds a value into the per-partition
# accumulator; the second (combOp) merges accumulators across partitions.
sumCount = nums.aggregate((0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
sumCount[0] / float(sumCount[1])  # mean => 9 / 4.0 = 2.25
Out[60]:
In [63]:
# collect(): return ALL elements to the driver — small RDDs only.
nums.collect()
Out[63]:
In [65]:
# count(): number of elements in the RDD.
nums.count()
Out[65]:
In [66]:
# countByValue(): mapping of element -> number of occurrences.
nums.countByValue()
Out[66]:
In [67]:
# take(n): the first n elements.
nums.take(2)
Out[67]:
In [68]:
# top(n): the n largest elements, in descending order.
nums.top(2)
Out[68]:
In [72]:
# takeOrdered(n): the n smallest elements, in ascending order.
nums.takeOrdered(2)
Out[72]:
In [77]:
# takeSample(withReplacement, num): a random sample of 1 element.
# NOTE(review): the first argument is documented as a boolean; 1 is
# truthy, so this samples WITH replacement — `False` was likely intended.
nums.takeSample(1, 1)
Out[77]:
In [79]:
# reduce(): pairwise combine => 1 + 2 + 3 + 3 = 9.
nums.reduce(lambda x, y: x + y)
Out[79]:
In [88]:
# fold(): like reduce(), but a zero value is supplied and applied
# once per partition.
nums.fold(0, lambda x, y: x + y)
Out[88]:
In [109]:
# aggregate(): build a (sum, count) accumulator in one pass => (9, 4).
nums.aggregate((0, 0),
(lambda x, y: (x[0] + y, x[1] + 1)),
(lambda x, y: (x[0] + y[0], x[1] + y[1])))
Out[109]:
In [116]:
# foreach(): runs the function on the executors for its side effects;
# the lambda's result is discarded here, so this produces no visible
# effect on the driver.
nums.foreach(lambda x: x * 2)
In [121]:
# persist(): mark the RDD to be cached (at the default storage level)
# when it is first computed; returns the RDD itself.
save = nums.persist()
In [122]:
# unpersist(): release the cached blocks; also returns the RDD.
save = save.unpersist()
In [123]:
# Shut down the SparkContext and release its resources.
sc.stop()