In [1]:
from pyspark import SparkConf, SparkContext
import re
In [2]:
sc = SparkContext(conf=SparkConf().setAppName("MyApp").setMaster("local"))

magic_word = "narodnaya"

def parse_article(line):
    """Return every bigram "narodnaya_<next word>" found in one article line."""
    try:
        # Each input line is expected to be "<article_id>\t<article text>".
        article_id, text = line.rstrip().split('\t', 1)
        # Strip non-word characters from the ends, then split on whitespace,
        # absorbing any punctuation that touches it.
        text = re.sub(r"^\W+|\W+$", "", text, flags=re.UNICODE)
        words = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)
        words = [s.lower() for s in words]
        result = []
        for i in range(len(words) - 1):
            if words[i] == magic_word:
                result.append(words[i] + "_" + words[i + 1])
        return result
    except ValueError:
        # Lines without a tab separator produce no bigrams.
        return []

# Alternative map/reduce helpers; the pipeline below inlines the same logic.
parser_wc = lambda line: [(word, 1) for word in parse_article(line)]
reducer_wc = lambda a, b: a + b
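
# A quick local sanity check on a made-up line (an assumption about the
# "<id>\t<text>" input format, not a line from the actual dataset):
assert parse_article("12\tNarodnaya Volya was a narodnaya movement") == \
    ["narodnaya_volya", "narodnaya_movement"]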
result = (
    sc.textFile("/data/wiki/en_articles_part/articles-part")
    .flatMap(parse_article)            # emit "narodnaya_<word>" bigrams
    .map(lambda x: (x, 1))             # pair each bigram with a count of 1
    .reduceByKey(lambda a, b: a + b)   # sum the counts per bigram
    .collect()
)
for word, count in result:
    print(word, count)
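To eyeball the most frequent bigrams, a small follow-up cell can sort the collected pairs locally (a sketch; the top-10 cutoff is an arbitrary choice, and sc.stop() simply releases the local context):
In [3]:
# Show the ten most frequent bigrams; a local sort is fine because
# collect() already brought the (small) result set to the driver.
for word, count in sorted(result, key=lambda x: -x[1])[:10]:
    print(word, count)

sc.stop()  # shut down the local SparkContext when done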
In [ ]: