In [1]:
from collections import Counter, defaultdict
import re

""" Let's use map-reduce to count some words! """


def tokenize(message):
    """ Removes all punctuation and returns a simple set
    of all the words in the given string (pushed to lower case)
    """
    message = message.lower()
    all_words = re.findall("[a-z0-9]+", message)
    return set(all_words)


def wc_mapper(doc):
    """ for each word in the doc, emis (word, 1) """
    for word in tokenize(doc):
        yield (word, 1)


def wc_reducer(word, counts):
    """ sum the counts for a word """
    yield (word, sum(counts))


def word_count(docs):
    """ count the words in the input docs using map-reduce """
    # place to store grouped values
    collector = defaultdict(list)
    
    for doc in docs:
        for word, count in wc_mapper(doc):
            collector[word].append(count)
    
    return [output for word, counts in collector.items()
                   for output in wc_reducer(word, counts)]

In [2]:
docs = ['ENter the Collector', 'Enter the Dragon', 'Enter the Void']

word_count(docs)


Out[2]:
[('dragon', 1), ('enter', 3), ('void', 1), ('collector', 1), ('the', 3)]

In [3]:
""" Now let's try a slightly more general map-reduce example """

def map_reduce(inputs, mapper, reducer):
    """ runs the provided mapper and reducer on the inputs """
    collector = defaultdict(list)
    
    for inp in inputs:
        for key, value in mapper(inp):
            collector[key].append(value)
    
    return [output for key, values in collector.items()
            for output in reducer(key, values)]

In [4]:
map_reduce(docs, wc_mapper, wc_reducer)


Out[4]:
[('dragon', 1), ('enter', 3), ('void', 1), ('collector', 1), ('the', 3)]

In [9]:
""" Matrix multplication using map-reduce """

def matrix_multiply_mapper(m, element):
    """ m is the common dimensions (cols of A, rows of B)
        element is a tuple (matrix name, i, j, value)
    """
    name, i, j, value = element
    
    if name == "A":
        #A_ij is the jth element in the sum for each C_ik
        for k in range(m):
            yield((i, k), (j, value))
    else:
        for k in range(m):
            yield((k, j), (i, value))


def matrix_multiply_reducer(m, key, indexed_values):
    results_by_index = defaultdict(list)
    for index, value in indexed_values:
        results_by_index[index].append(value)
    
    sum_product = sum(results[0] * results[1]
                      for results in results_by_index.values()
                      if len(results) == 2)
    
    if sum_product != 0.0:
        yield (key, sum_product)

In [10]:
A = [[3,2,0],[0,0,0]]
B = [[4,-1,0],[10,0,0],[0,0,0]]

entries = [("A",0,0,3),("A",0,1,2),("B",0,0,4),("B",0,1,-1),("B",1,0,10)]

from functools import partial
mapper = partial(matrix_multiply_mapper, 3)
reducer = partial(matrix_multiply_reducer, 3)

map_reduce(entries, mapper, reducer)


Out[10]:
[((0, 1), -3), ((0, 0), 32)]

In [ ]: