This is the simplest example I can think of that cannot easily use the combiner optimization within MapReduce. If you come up with a simpler example, find mistakes/typos, or have questions, email wolf@cs.cmu.edu.
Let's assume we have counts of requests for a website and we want to average them.
In [1]:
dataset = [
    ('g.co', 2),
    ('g.co', 3),
    ('g.co', 4)
]
First off, averaging is simple (even in your head):
In [2]:
(2 + 3 + 4) / 3
Out[2]:
3.0
The problem with averaging these using combiners is that division is neither associative nor commutative: an average of partial averages is not, in general, the overall average. The final result would depend on how the records happen to be partitioned across workers, even though it should be deterministic!
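As a quick sanity check (not part of the original walkthrough), here is a minimal demonstration of both properties:

# Division is not commutative: swapping the operands changes the result.
assert (2 / 3) != (3 / 2)
# Division is not associative: regrouping the operands changes the result.
assert ((8 / 4) / 2) != (8 / (4 / 2))  # 1.0 vs 4.0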
Imagine that the first two records are located on one worker, and the last record is located on another worker.
Averaging using a combiner on the first worker would yield:
In [3]:
(2 + 3) / 2
Out[3]:
2.5
And, averaging using a combiner on the second worker would yield:
In [4]:
(4) / 1
Out[4]:
4.0
Then these two results from the combiners would be sent via RPC to the reducer worker.
The result of averaging at the reducer would be:
In [5]:
(2.5 + 4.0) / 2
Out[5]:
3.25
Notice that 3.25 is not the true average of 3.0, so naive combining gives the wrong answer. The way to fix this is not to compute partial averages at the map workers, but to have each combiner keep a partial sum and a count of the records it processed. MapReduce lets users specify a custom combine function that is different from the reduce function.
Let's take a look at an implementation that would work on this dataset. First, the map, which in this case is really just the identity function (practical MR implementations also let you skip the map phase if you want):
In [6]:
def map(key, value):
    return (key, value)  # pretend return is actually `EmitIntermediate`
Now let's take a look at combine:
In [7]:
def combine(key, valueIterator):
    n = 0
    total = 0
    for _, v in valueIterator:  # each element is a (key, value) pair; ignore the key
        n += 1
        total += v
    return (key, (n, total))  # see, no division!
combine simply keeps a count of the records it has seen, computes a partial sum over them, and returns this partial result.
reduce looks a bit more complicated, but it is actually very simple: it just sums all the partial counts and totals, then divides once at the end:
In [8]:
def reduce(key, valueIterator):
    n = 0
    total = 0
    for _, partialResult in valueIterator:  # each element is a (key, (count, sum)) pair from a combiner
        partialCount, partialTotal = partialResult
        n += partialCount
        total += partialTotal
    return (key, total / n)  # pretend that return is actually `Emit`
In [9]:
mappedDataset = [map(k,v) for k,v in dataset]
print(mappedDataset)
As expected, each entry passes through unchanged as a (key, value) pair, since map here is basically the identity function.
Next we run combine on each worker (the first worker has the first two records, the second worker has the last one):
In [10]:
combinedWorker1 = combine('g.co', [mappedDataset[0], mappedDataset[1]])
combinedWorker2 = combine('g.co', [mappedDataset[2]])
print(combinedWorker1)
print(combinedWorker2)
So two partial results, representing 3 records in total, move on to the reducer, and the reduce function produces:
In [11]:
finalResult = reduce('g.co', [combinedWorker1, combinedWorker2])
print(finalResult)
assert finalResult[1] == 3.0
And that is the correct answer!
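To convince ourselves that the (count, sum) approach does not depend on how records are split across workers, here is a quick check (not in the original walkthrough, but it only uses the functions defined above) with a different, hypothetical partitioning:

# Alternative split: worker 1 gets only the first record, worker 2 gets the last two.
altWorker1 = combine('g.co', [mappedDataset[0]])
altWorker2 = combine('g.co', [mappedDataset[1], mappedDataset[2]])
assert reduce('g.co', [altWorker1, altWorker2]) == finalResult  # same answer, different partitioning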
Hopefully you now understand why it is important to verify whether a reduce function can be used directly as the combine function before enabling the combiner optimization.
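For contrast, here is a sketch (the name sum_reduce is mine, not from the original) of a reduce function that can safely double as its own combiner, because addition is associative and commutative, so the grouping of records across workers does not matter:

def sum_reduce(key, valueIterator):
    # Sum the values of all (key, value) pairs for this key.
    return (key, sum(v for _, v in valueIterator))

# Reusing the reduce function as the combiner on each worker, then reducing
# the partial sums, gives the same result as reducing everything at once.
partial1 = sum_reduce('g.co', [mappedDataset[0], mappedDataset[1]])
partial2 = sum_reduce('g.co', [mappedDataset[2]])
assert sum_reduce('g.co', [partial1, partial2]) == sum_reduce('g.co', mappedDataset)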
The averaging example also demonstrates that there is a tension between performance optimizations and algorithm correctness: to benefit from a useful distributed-systems optimization, you may have to rethink the algorithm.