In [1]:
# Initializing Spark in Python
from pyspark import SparkContext
sc = SparkContext("local", "Working with key-value pairs")
In [11]:
lines = sc.parallelize(["hello world", "hi 123", "good morning"])
In [12]:
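# Create a pair RDD: the key is the first word of each line, the value is the whole line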
pairs = lines.map(lambda x: (x.split(" ")[0], x))
In [17]:
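# Keep only pairs whose value (the full line) is shorter than 20 characters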
result = pairs.filter(lambda keyValue: len(keyValue[1]) < 20)
result.first()
Out[17]:
In [20]:
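# Build a pair RDD directly from (key, value) tuples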
rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)])
In [22]:
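# reduceByKey merges the values of each key: (3, 4) and (3, 6) become (3, 10)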
result = rdd.reduceByKey(lambda x, y: x + y)
result.first()
Out[22]:
In [26]:
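# groupByKey collects all values for each key into an iterable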
result = rdd.groupByKey()
result.first()
Out[26]:
In [27]:
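# mapValues applies the function to values only; keys stay unchanged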
result = rdd.mapValues(lambda x: x + 1)
result.first()
Out[27]:
In [31]:
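# keys() returns an RDD containing only the keys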
result = rdd.keys()
result.first()
Out[31]:
In [32]:
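# values() returns an RDD containing only the values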
result = rdd.values()
result.first()
Out[32]:
In [33]:
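# sortByKey returns an RDD sorted by key (ascending by default)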
result = rdd.sortByKey()
result.first()
Out[33]:
In [34]:
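# A second pair RDD for the join-style operations below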
rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)])
other = sc.parallelize([(3, 9)])
In [37]:
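# subtractByKey removes pairs whose key is present in the other RDD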
result = rdd.subtractByKey(other)
result.first()
Out[37]:
In [38]:
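# join performs an inner join on keys; only keys present in both RDDs survive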
result = rdd.join(other)
result.first()
Out[38]:
In [39]:
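# rightOuterJoin keeps every key from the other RDD; missing values from rdd become None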
result = rdd.rightOuterJoin(other)
result.first()
Out[39]:
In [41]:
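# leftOuterJoin keeps every key from rdd; missing values from other become None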
result = rdd.leftOuterJoin(other)
result.first()
Out[41]:
In [42]:
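# cogroup groups the values from both RDDs by key: (key, (values from rdd, values from other))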
result = rdd.cogroup(other)
result.first()
Out[42]:
In [46]:
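# Per-key (sum, count): pair each value with a count of 1, then add sums and counts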
result = rdd.mapValues(lambda x: (x, 1)).reduceByKey(
lambda x, y: (x[0] + y[0], x[1] + y[1]))
In [53]:
# Counting words in a text file
text = sc.textFile("log.txt")
words = text.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
In [67]:
# Computing the average value for each key
sumCount = rdd.combineByKey(
    (lambda value: (value, 1)),                         # createCombiner: first value -> (sum, count)
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),  # mergeValue: fold another value into (sum, count)
    (lambda a, b: (a[0] + b[0], a[1] + b[1])))          # mergeCombiners: combine partial (sum, count) pairs
avgByKey = sumCount.map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
avgByKey.collectAsMap()
Out[67]:
In [69]:
data = [("a", 3), ("b", 4), ("a", 1)]
In [72]:
# reduceByKey with the default number of partitions
result = sc.parallelize(data).reduceByKey(lambda x, y: x + y)
result.first()
Out[72]:
In [74]:
# reduceByKey with a custom number of partitions (10)
result = sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10)
result.first()
Out[74]:
In [77]:
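# Number of partitions backing the RDD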
rdd.getNumPartitions()
Out[77]:
In [79]:
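# keyfunc transforms keys for comparison only: here keys are sorted by their string form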
result = rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x))
result.first()
Out[79]:
In [80]:
sc.stop()