In [1]:
from collections import Counter, defaultdict
import re
""" Let's use map-reduce to count some words! """
def tokenize(message):
    """ Split a string into its distinct lower-case words.

    The text is lower-cased first; every maximal run of the characters
    a-z / 0-9 then counts as one word, so punctuation and whitespace only
    act as separators. The result is a set: repeated words collapse into
    one entry and the original word order is not kept.
    """
    lowered = message.lower()
    return {match.group(0) for match in re.finditer("[a-z0-9]+", lowered)}
def wc_mapper(doc):
    """ for each word occurrence in the doc, emit (word, 1)

    doc is a string. Words are found the same way tokenize() finds them
    (lower-cased runs of a-z / 0-9), but every occurrence is emitted, in
    reading order, so a word that appears three times in one document
    contributes three (word, 1) pairs.
    """
    # Deliberately not iterating over tokenize(doc): tokenize() returns a set,
    # which collapses repeated words and would cap each word's contribution
    # at 1 per document, turning the word count into a document count.
    for word in re.findall("[a-z0-9]+", doc.lower()):
        yield (word, 1)
def wc_reducer(word, counts):
    """ Collapse all the counts gathered for one word into a single pair.

    Generator that yields exactly one (word, total) tuple, where total is
    the sum of the values in counts.
    """
    total = sum(counts)
    yield word, total
def word_count(docs):
    """ Word counts for a collection of documents, computed map-reduce style.

    Map step: run wc_mapper over every doc and group the emitted counts by
    word. Reduce step: hand each word and its grouped counts to wc_reducer.
    Returns a list holding everything the reducer yielded.
    """
    # word -> every count the mapper emitted for it
    grouped = defaultdict(list)
    for document in docs:
        for token, emitted in wc_mapper(document):
            grouped[token].append(emitted)

    results = []
    for token, emitted_counts in grouped.items():
        results.extend(wc_reducer(token, emitted_counts))
    return results
In [2]:
# Three tiny sample documents; the mixed-case 'ENter' is lower-cased by the
# tokenizer before counting.
docs = ['ENter the Collector', 'Enter the Dragon', 'Enter the Void']
# Last expression of the cell, so the list of (word, count) pairs is displayed.
word_count(docs)
Out[2]:
In [3]:
""" Now let's try a slightly more general map-reduce example """
def map_reduce(inputs, mapper, reducer):
    """ Generic map-reduce driver.

    mapper(inp) must yield (key, value) pairs for a single input;
    reducer(key, values) must yield zero or more outputs for one key and
    the list of all values emitted under it. Returns a list with every
    output the reducer produced.
    """
    # key -> all values emitted for that key across every input
    grouped = defaultdict(list)
    for item in inputs:
        for k, v in mapper(item):
            grouped[k].append(v)

    outputs = []
    for k, vs in grouped.items():
        outputs.extend(reducer(k, vs))
    return outputs
In [4]:
# Same word count as word_count(docs), this time through the generic driver.
map_reduce(docs, wc_mapper, wc_reducer)
Out[4]:
In [9]:
""" Matrix multiplication using map-reduce """
def matrix_multiply_mapper(m, element, num_rows_a=None, num_cols_b=None):
    """ Map one sparse matrix entry to the cells of C = A * B it feeds.

    m is the common dimension (cols of A, rows of B); it is also used as
        the size of any dimension that is not given explicitly
    element is a tuple (matrix name, i, j, value); the name "A" marks the
        left matrix, any other name is treated as the right matrix B
    num_rows_a is the number of rows of A (and of C), defaults to m
    num_cols_b is the number of columns of B (and of C), defaults to m

    Yields ((row of C, col of C), (index along the common dimension, value)).

    Passing only m is correct as long as m is at least as large as both the
    row count of A and the column count of B. For other shapes pass
    num_rows_a / num_cols_b, otherwise some cells of C never receive their
    terms.
    """
    name, i, j, value = element
    if name == "A":
        # A_ij is the jth term in the sum for each C_ik, and k ranges over
        # the columns of B.
        fan_out = m if num_cols_b is None else num_cols_b
        for k in range(fan_out):
            yield ((i, k), (j, value))
    else:
        # B_ij is the ith term in the sum for each C_kj, and k ranges over
        # the rows of A.
        fan_out = m if num_rows_a is None else num_rows_a
        for k in range(fan_out):
            yield ((k, j), (i, value))
def matrix_multiply_reducer(m, key, indexed_values):
    """ Combine the terms collected for one output cell.

    m is accepted but not used by this function
    key identifies the output cell and is passed through unchanged
    indexed_values is an iterable of (index, value) pairs

    Values are grouped by index. Every index that collected exactly two
    values contributes their product; indexes with any other number of
    values are ignored. Yields (key, sum of products) only when that sum
    is nonzero, so zero cells produce no output.
    """
    factors_by_index = defaultdict(list)
    for idx, val in indexed_values:
        factors_by_index[idx].append(val)

    total = 0
    for factors in factors_by_index.values():
        if len(factors) == 2:
            total = total + factors[0] * factors[1]

    if total != 0.0:
        yield key, total
In [10]:
# Dense forms of the two operands (A is 2x3, B is 3x3). They are shown for
# reference only; nothing else in this cell reads them.
A = [[3,2,0],[0,0,0]]
B = [[4,-1,0],[10,0,0],[0,0,0]]
# Sparse form actually fed to map_reduce: one (matrix name, i, j, value)
# tuple per nonzero element of A and B.
entries = [("A",0,0,3),("A",0,1,2),("B",0,0,4),("B",0,1,-1),("B",1,0,10)]
from functools import partial
# map_reduce calls mapper(inp) and reducer(key, values), so the leading m
# argument (3 here) is bound up front with partial.
mapper = partial(matrix_multiply_mapper, 3)
reducer = partial(matrix_multiply_reducer, 3)
# Last expression of the cell: the list of ((i, j), value) pairs is displayed.
map_reduce(entries, mapper, reducer)
Out[10]:
In [ ]: