In [1]:
from pyspark import SparkConf, SparkContext
import re

In [2]:
# Spark driver context for this notebook: local single-threaded master.
# NOTE(review): only one SparkContext may exist per JVM — re-running this
# cell without sc.stop() first will raise; restart the kernel instead.
sc = SparkContext(conf=SparkConf().setAppName("MyApp").setMaster("local"))

# Target word: for every occurrence, emit the "<magic_word>_<next word>" bigram.
magic_word = "narodnaya"

def parse_article(line):
    """Extract "magic_word_<next word>" bigrams from one article record.

    Parameters
    ----------
    line : str or bytes
        A tab-separated record "<article_id>\\t<article text>".
        ``bytes`` input is decoded as UTF-8.

    Returns
    -------
    list of str
        One lowercased "narodnaya_<word>" string per occurrence of the
        magic word that is followed by another word; ``[]`` for records
        without a tab separator.
    """
    try:
        # Decode explicitly as UTF-8. The original implicit ASCII decode
        # (unicode(line)) raised UnicodeDecodeError on any non-ASCII
        # article; since UnicodeDecodeError subclasses ValueError, those
        # articles were silently dropped by the except clause below.
        if isinstance(line, bytes):
            line = line.decode("utf-8")
        article_id, text = line.rstrip().split("\t", 1)
        # Strip leading/trailing punctuation, then split on runs of
        # whitespace together with any punctuation hugging the words.
        text = re.sub(r"^\W+|\W+$", "", text, flags=re.UNICODE)
        words = [w.lower() for w in re.split(r"\W*\s+\W*", text, flags=re.UNICODE)]
        result = []
        # Stop at len-1: the magic word needs a following word to pair with.
        for i in range(len(words) - 1):
            if words[i] == magic_word:
                result.append(words[i] + "_" + words[i + 1])
        return result
    except ValueError:
        # Malformed record (no tab separator) — contribute nothing.
        return []

def parser_wc(line):
    """Mapper: turn one raw article line into (bigram, 1) count pairs."""
    return [(word, 1) for word in parse_article(line)]

def reducer_wc(a, b):
    """Reducer: combine two partial counts for the same key."""
    return a + b

# Count every "narodnaya_<word>" bigram across the article dump.
# Uses the named mapper/reducer defined earlier instead of re-spelling
# them as inline lambdas (they were otherwise dead code).
result = (
    sc.textFile("/data/wiki/en_articles_part/articles-part")
        .flatMap(parser_wc)        # -> (bigram, 1) pairs
        .reduceByKey(reducer_wc)   # sum counts per bigram
        .collect()
)
for pair, count in result:
    # Single-argument print renders identically under Python 2 and 3.
    print("%s %d" % (pair, count))


narodnaya_gazeta 1
narodnaya_volya 9

In [ ]: