Map Reduce Explained

MapReduce is a programming model invented by Google to perform distributed computation on huge amounts of data. It is heavily inspired by the map and reduce functions of functional programming.

The process is divided into two steps:

  • A mapping step: the input is divided into subparts until it reaches the smallest processable unit, which is processed independently and whose result is returned.
  • A reduction step: the values produced by the mapping step are passed to the reduction step, which joins them together into a single result.

Practical Example

Example Problem without MapReduce

Our example problem is to compute the sum of the doubles of various numbers.

So if we have 1, 2, 3 as the input, the result would be 2+4+6 -> 12


In [1]:
numbers = range(1000)

def doubled_sum(values):
    # double each value and accumulate the result in a single loop
    total = 0
    for n in values:
        total += n*2
    return total

print doubled_sum(numbers)


999000

Example Problem using MapReduce

As Python also supports the functional programming paradigm, it provides the functions required to implement the map-reduce model as built-ins.

The foundational tools for implementing map-reduce are:

  • a "mapper", which is in charge of mapping each input value to a corresponding output value
  • a "reducer", which is in charge of merging multiple mapper outputs into a single output.

Both phases can be applied multiple times: the output of a reducer can become the input of another reducer, and the output of a mapper can feed other mappers.

Many MapReduce implementations also have additional phases, such as "combination" and "aggregation", which are executed after the mapper or the reducer to further clean up their output.


In [2]:
numbers = range(1000)

def mapper(value):
    # maps a single input value to its double
    return value*2

def reducer(*values):
    # reduce() calls this with two arguments at a time:
    # the accumulated result and the next mapped value
    return sum(values)

# uncomment to inspect the intermediate mapped values
#first_step = map(mapper, numbers)
#print first_step

result = reduce(reducer, map(mapper, numbers))
print result


999000

The previous pure-Python map-reduce implementation of course lacks the core feature of MapReduce: working in parallel.

It's easy to see that, since each mapper and reducer works only on a subset of the data (its own input), it can work independently of the state of the other mappers and reducers, so the computation can proceed in parallel. For our example this is safe because summing is associative: partial results can be combined in any grouping and still give the same answer.
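
A quick, framework-free check of this claim, reusing the doubling example (the chunk sizes below are arbitrary): splitting the input into chunks, reducing each chunk independently, and then reducing the partial results gives exactly the same answer as a single global reduction. This local pre-reduction of each chunk is also essentially what the "combination" phase mentioned earlier does.

numbers = range(1000)

def mapper(value):
    return value * 2

def reducer(values):
    return sum(values)

single_pass = reducer(map(mapper, numbers))

# split the input into arbitrary chunks, reduce each chunk on its own,
# then reduce the partial results: the outcome matches the single-step version
for chunk_size in (10, 333):
    chunks = [numbers[i:i + chunk_size] for i in range(0, len(numbers), chunk_size)]
    partials = [reducer(map(mapper, chunk)) for chunk in chunks]
    print(reducer(partials) == single_pass)   # True for any chunking

A reduction that depended on seeing all the values at once, in a specific order, could not be split up this way.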

Parallel Map Reduce in Pure Python

It's really easy to simulate a parallel map-reduce in Python by using the multiprocessing module.


In [18]:
from itertools import islice
import multiprocessing


class ParallelMapReduce(object):
    def __init__(self, map_func, reduce_func, num_workers=None):
        # default to one worker per available core when num_workers is omitted
        self.num_workers = num_workers or multiprocessing.cpu_count()
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(self.num_workers)

    def partition(self, n, iterable):
        # split the iterable into consecutive chunks of (at most) n items
        i = iter(iterable)
        piece = list(islice(i, n))
        while piece:
            yield piece
            piece = list(islice(i, n))

    def __call__(self, inputs):
        # map phase: every input value is mapped in parallel by the pool
        values = self.pool.map(self.map_func, inputs)

        print '>>> MAPPED VALUES (%s values): %s, ...' % (len(values), str(values[:10]))

        # reduce phase: the mapped values are partitioned into one chunk per
        # worker and each chunk is reduced in parallel
        chunk_size = max(1, len(values)//self.num_workers)
        values = self.pool.map(self.reduce_func,
                               self.partition(chunk_size, values))
        print '>>> REDUCED VALUES', values

        # final reduction of the partial results into a single value
        return self.reduce_func(values)

The previous map-reduce implementation takes a mapper and a reducer and splits the work across num_workers processes: the inputs are mapped in parallel, the mapped values are partitioned into chunks that are reduced in parallel, and the partial results are reduced once more to obtain the final result.


In [19]:
numbers = range(1000)

def mapper(value):
    return value*2

def reducer(values):
    # unlike the two-argument reducer used with reduce(), this one receives
    # a whole chunk of values at once and collapses it into a single sum
    return sum(values)

mapreduce = ParallelMapReduce(mapper, reducer, 10)
print mapreduce(numbers)


>>> MAPPED VALUES (1000 values): [0, 2, 4, 6, 8, 10, 12, 14, 16, 18], ...
>>> REDUCED VALUES [9900, 29900, 49900, 69900, 89900, 109900, 129900, 149900, 169900, 189900]
999000

Distributed Map Reduce

Our parallel map-reduce runs in parallel by using the multiple cores our computer provides, but it is unable to distribute the work across multiple computers. As MapReduce is actually meant to work on millions of items, it is vital to be able to distribute the work across multiple servers.

This is the reason why various MapReduce frameworks and toolkits were created; in the Python environment there are some widely used solutions:

Disco

Born at the Nokia Research Center, it is one of the solutions with the best trade-off between complexity and features. It relies on an Erlang core in charge of dispatching jobs to the various workers, and Python is the standard language used to implement workers (although they can be written in any language that respects the Disco protocol).

It also provides its own Distributed File System (DDFS) and Distributed Database (DiscoDB), which can be used to store data shared by the various workers.
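
As a rough sketch of what a Disco job for our sum-of-doubles problem could look like (the shape is recalled from the Disco tutorial's word-count example, so the exact signatures should be double-checked against the Disco documentation; the DDFS tag name is purely illustrative):

from disco.core import Job, result_iterator

def fun_map(line, params):
    # each input line holds one number: emit its double under a single key
    yield 'sum', int(line) * 2

def fun_reduce(iter, params):
    # values arrive as (key, value) pairs; group them by key and sum them.
    # kvgroup is imported here so the import also happens on the worker nodes.
    from disco.util import kvgroup
    for key, values in kvgroup(sorted(iter)):
        yield key, sum(values)

if __name__ == '__main__':
    # 'tag://data:numbers' is a hypothetical DDFS tag holding the input lines
    job = Job().run(input=['tag://data:numbers'],
                    map=fun_map,
                    reduce=fun_reduce)
    for key, total in result_iterator(job.wait(show=True)):
        print('%s\t%s' % (key, total))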

OctoPy 

It's a pure-Python solution aimed at being really quick to set up, but it doesn't provide shared storage for data. It can be useful for really small problems, where the cost of configuring a cluster would outweigh the benefits of the solution.

Hadoop 

It's the de facto standard among MapReduce frameworks. Implemented in Java, it's the most complex solution to set up and maintain. Through the Hadoop Streaming feature it is possible to run mappers and reducers written in any programming language that can be called from a shell script; Hadoop Streaming is used by various Python libraries to run Python-implemented mappers and reducers.

Like Disco, it's a full-fledged solution that also provides a distributed file system, HDFS, where data can be stored for processing.

Amazon EMR - Elastic MapReduce

EMR, Elastic MapReduce, is an Amazon-provided Hadoop cluster in the cloud.

As setting up a MapReduce toolkit like Hadoop is a complex and long procedure, it is common to rely on services that provide the computation cluster on demand. During the BigDive course we are going to test our solutions locally, but we will also see how to run them on EMR. You will notice that while EMR has a huge kickstart cost, it quickly becomes more convenient as the data grows.

It relies on Elastic Compute Cloud (EC2) to power up a set of computational instances that become the nodes of the Hadoop cluster, and on Simple Storage Service (S3) to provide a distributed file system from which each computation node can access the data.

Kickstart Cost

The issue with using EMR is that you have a big kickstart cost caused by:

  • EC2 instances need to be created and fully booted before they can start processing data
  • Your data has to be uploaded to an S3 bucket so that the instances can read and process it (see the upload sketch after this list)
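
As an illustration of the second point, here is a minimal upload sketch, assuming the classic boto library (not mentioned in the text); the bucket and file names are purely illustrative:

from boto.s3.connection import S3Connection
from boto.s3.key import Key

conn = S3Connection()                                     # picks up AWS credentials from the environment
bucket = conn.create_bucket('bigdive-mapreduce-example')  # hypothetical bucket name
key = Key(bucket, 'input/numbers.txt')
key.set_contents_from_filename('numbers.txt')             # local file with one number per line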

For this reason, and because EMR's kickstart takes minutes, we will test most examples locally using the multiprocessing module to emulate Hadoop nodes. But we will see that our code is perfectly able to run on EMR without changes, so you can start using EMR as soon as your data gets big enough to justify the kickstart cost.

Hadoop Streaming

The feature that makes it possible to run map-reduce processes written in any language on Hadoop is called Hadoop Streaming. Hadoop Streaming is a tool that takes any runnable program as a parameter and runs it as a mapper or as a reducer.

This makes it possible to pass Python scripts as mappers and reducers, and they will be started on the Hadoop cluster.

For input and output, a line-based protocol (data separated by "\n") is used:

  • Each "line" of the input sent to the mapper has to be considered one value to map
  • Each "line" of the output from the mapper is condidered an emitted (key, value) pair where key and value are tab ("\t") separated

So the typical I/O in Hadoop Streaming looks like:

key1\tvalue1\nkey2\tvalue2\n
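
As a hedged sketch of what such scripts could look like for our sum-of-doubles problem (the file names mapper.py and reducer.py and the single 'sum' key are just illustrative choices): the mapper reads one number per input line and emits tab-separated key/value lines, and the reducer sums the values it receives on standard input.

# mapper.py -- sketch of a Hadoop Streaming mapper for the sum-of-doubles
# problem: one number per input line in, one "key\tvalue" line out.
# The constant key 'sum' simply routes every value to the same reducer.
import sys

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    print('%s\t%s' % ('sum', int(line) * 2))

# reducer.py -- sketch of the matching Hadoop Streaming reducer: it reads the
# tab-separated lines emitted by the mappers (grouped and sorted by key by
# Hadoop before they arrive here) and prints the final total.
import sys

total = 0
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    key, value = line.split('\t', 1)
    total += int(value)
print('%s\t%s' % ('sum', total))

The same pipeline can be simulated locally with a shell pipe such as cat numbers.txt | python mapper.py | sort | python reducer.py, where sort plays the role of Hadoop's shuffle-and-sort step; on a cluster the two scripts would be passed to the Hadoop Streaming jar through its -mapper and -reducer options.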

