In [1]:
# Load Pride and Prejudice (Project Gutenberg #1342) as an RDD of text lines
# and keep it in memory — RDD.cache() returns the RDD itself, so it chains.
corpus = sc.textFile("../data/pg1342.txt").cache()
# Peek at five lines drawn with replacement to sanity-check the load.
corpus.takeSample(withReplacement=True, num=5)
Out[1]:
In [2]:
# Split each line on single spaces; consecutive spaces yield empty strings,
# so drop those before counting.
word_split = corpus.flatMap(lambda line: line.split(" ")).filter(lambda token: token != "")
# Spot-check five tokens.
word_split.takeSample(withReplacement=True, num=5)
Out[2]:
In [3]:
# Classic word-count map step: normalize case and pair each token with a 1.
mapped = word_split.map(lambda token: (token.lower(), 1))
# Spot-check five (word, 1) pairs.
mapped.takeSample(withReplacement=True, num=5)
Out[3]:
In [4]:
# Reduce step: sum the per-occurrence 1s to get a count per distinct word.
reduced = mapped.reduceByKey(lambda left, right: left + right)
# Spot-check five (word, count) pairs.
reduced.takeSample(withReplacement=True, num=5)
Out[4]:
In [5]:
# Flip (word, count) -> (count, word) so sortByKey orders by frequency,
# most frequent first.
swapped = reduced.map(lambda pair: (pair[1], pair[0])).sortByKey(ascending=False)
# The ten highest counts with their words.
swapped.take(10)
Out[5]:
In [6]:
# Flip back to (word, count) while preserving the descending-frequency order.
unswapped = swapped.map(lambda pair: (pair[1], pair[0]))
# Top ten most frequent words.
unswapped.take(10)
Out[6]:
In [8]:
# Concise implementation of the whole pipeline above.
#
# Two fixes versus the first draft of this cell:
# - words are lower-cased before counting, matching the step-by-step
#   version (otherwise "The" and "the" were counted as separate keys);
# - sortBy with a key function replaces the swap/sortByKey/swap dance,
#   yielding (word, count) pairs directly, sorted by count descending.
from operator import add

words = sc.textFile("../data/pg1342.txt").flatMap(lambda line: line.split(" "))
words.cache()  # reused implicitly by the chained actions below
counts = (
    words.filter(lambda token: len(token) > 0)
    .map(lambda token: (token.lower(), 1))
    .reduceByKey(add)
    .sortBy(lambda pair: pair[1], ascending=False)  # sort by count, descending
    .collect()
)
In [15]:
# Top ten most frequent (word, count) pairs, highest count first.
counts[:10]
Out[15]:
In [ ]: