MapReduce is a programming model invented by Google to perform distributed computation on huge amounts of data. It is heavily inspired by the map and reduce functions of functional programming.
The process is divided into two steps: a map phase, which transforms each input item independently, and a reduce phase, which aggregates the mapped values into the final result. As a running example we will double a list of numbers and sum the results; here is the plain sequential version:
In [1]:
numbers = range(1000)

def doubled_sum(values):
    total = 0
    for n in values:
        total += n*2
    return total

print doubled_sum(numbers)
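For reference, with numbers = range(1000) this prints 999000: the sum of 0..999 is 499500 and doubling each value doubles the total. The map-reduce versions below should produce the same result.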
As Python also supports the functional programming paradigm, it provides the functions required to implement the map-reduce pattern as builtins.
The foundation tools to implement map-reduce are the builtin map function, which applies a function to every item of an iterable, and the builtin reduce function, which folds an iterable into a single value by repeatedly applying a two-argument function.
Both phases can be called multiple times (the output of a reducer can become the input of another reducer and a mapper can call other mappers).
Many MapReduce implementations also have additional phases, like "combine" and "aggregate", which are executed after the mapper or the reducer to further clean up their output.
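As a minimal sketch of this chaining in plain Python (no framework involved), partial results computed over chunks of the data can themselves be reduced again into the final result:

In [ ]:
# Reduce each chunk to a partial sum, then reduce the partial sums
# again into the final total (a two-level reduction).
chunks = [range(0, 500), range(500, 1000)]
partial_totals = [sum(chunk) for chunk in chunks]  # first-level reduction, one per chunk
print sum(partial_totals)                          # second-level reduction over the partial results

Coming back to the builtins, here is the doubled-sum example rewritten with map and reduce: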
In [2]:
numbers = range(1000)

def mapper(value):
    return value*2

def reducer(*values):
    return sum(values)

#first_step = map(mapper, numbers)
#print first_step

result = reduce(reducer, map(mapper, numbers))
print result
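Note that reduce calls reducer with two arguments each time, the accumulated result and the next mapped value, so reducer(*values) always receives exactly two values: reducer(0, 2) returns 2, then reducer(2, 4) returns 6, then reducer(6, 6) returns 12, and so on until the whole sequence is folded into the final total of 999000, the same value computed by the sequential version.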
Of course the previous pure Python map-reduce implementation lacks the core feature of MapReduce: working in parallel.
It's easy to see that, as each mapper and reducer works only on a subset of the data (its own input), it can work independently from the state of the other mappers and reducers, so the computation can proceed in parallel.
It's really easy to simulate a parallel map-reduce in Python by using the multiprocessing module.
In [18]:
from itertools import islice
import multiprocessing

class ParallelMapReduce(object):
    def __init__(self, map_func, reduce_func, num_workers=None):
        self.num_workers = num_workers
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def partition(self, n, iterable):
        # Split the iterable into chunks of n items each.
        i = iter(iterable)
        piece = list(islice(i, n))
        while piece:
            yield piece
            piece = list(islice(i, n))

    def __call__(self, inputs):
        # Map phase: apply map_func to every input item in parallel.
        values = self.pool.map(self.map_func, inputs)
        print '>>> MAPPED VALUES (%s values): %s, ...' % (len(values), str(values[:10]))
        # Reduce phase: each worker reduces one partition of the mapped values.
        values = self.pool.map(self.reduce_func,
                               self.partition(len(values)//self.num_workers, values))
        print '>>> REDUCED VALUES', values
        # Final reduction merges the partial results into a single value.
        return self.reduce_func(values)
The previous map-reduce implementation takes a mapper and a reducer, distributes the work across num_workers processes, and then merges the partial results back into the final value.
In [19]:
numbers = range(1000)

def mapper(value):
    return value*2

def reducer(values):
    return sum(values)

mapreduce = ParallelMapReduce(mapper, reducer, 10)
print mapreduce(numbers)
Our parallel map-reduce works in parallel by using the multiple cores our computer provides, but it is unable to distribute the work over multiple computers. As MapReduce is actually meant to work on millions of items, it is vital to be able to distribute the work across multiple servers.
This is the reason why various MapReduce frameworks and toolkits were created; in the Python environment there are some widely used solutions:
Born at the Nokia Research centers, it is one of the solutions with the best trade-off between complexity and features. It relies on an Erlang core in charge of dispatching jobs to the various workers, and Python is considered the standard language used to implement the workers (although they can be written in any language that respects the Disco protocol).
It also provides its own Distributed File System (DDFS) and Distributed Database (DiscoDB) which can be used to store data shared by the various workers.
It's a pure Python solution aimed at being really quick to set up, but it doesn't provide any shared storage for data. It can be useful for really small problems where the cost of configuring a cluster would outweigh the benefits of the solution.
It's the de facto standard among MapReduce frameworks. Implemented in Java, it is the most complex solution to set up and maintain. Through the Hadoop Streaming feature it is possible to run mappers and reducers written in any programming language that can be called by a shell script; Hadoop Streaming is used by various Python libraries to run mappers and reducers implemented in Python.
Like Disco, it's a full-fledged solution that also provides a Distributed File System (HDFS) where data can be stored for processing.
EMR, Elastic MapReduce, is an Amazon-provided Hadoop cluster in the cloud.
As setting up a complex MapReduce toolkit like Hadoop is a long and complex procedure, it is common to rely on services that provide the computation cluster on demand. During the BigDive course we are going to test our solutions locally, but we will also see how to run them on EMR. You will notice that while EMR has a huge kickstart cost, it quickly becomes more convenient when the data grows.
It relies on the Elastic Compute Cloud (EC2) to power up a set of computational instances that become the nodes of the Hadoop cluster, and on the Simple Storage Service (S3) to provide a distributed file system from which each computation node can access the data.
The issue with using EMR is that you have a big kickstart cost: the EC2 instances have to be started and configured as Hadoop nodes, and the input data has to be uploaded to S3 before the job can run.
For this reason (the kickstart of an EMR cluster takes minutes) we will test most examples locally, using the multiprocessing module to emulate the Hadoop nodes. But we will see that our code is perfectly able to run on EMR without changes, so you can start using EMR as soon as your data gets big enough to justify the kickstart cost.
The feature that permits running map-reduce processes written in any language on Hadoop is called Hadoop Streaming. Hadoop Streaming is a tool that takes any runnable program as a parameter and runs it as a mapper or as a reducer.
This makes it possible to pass Python scripts as mappers and reducers, which will be started on the Hadoop cluster.
For input and output a line-based protocol (records separated by "\n") is used: each mapper and reducer reads its input from the standard input and writes its output to the standard output, one record per line, with the key and the value separated by a tab character.
So the typical I/O in Hadoop Streaming looks like:
key1\tvalue1\nkey2\tvalue2\n
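As a minimal sketch of what such scripts look like (the file names mapper.py and reducer.py are just illustrative, not part of Hadoop), a streaming mapper and reducer simply read lines from stdin and write tab-separated key/value lines to stdout:

#!/usr/bin/env python
# mapper.py (illustrative name): for each number read from stdin,
# emit a "doubled<TAB>value*2" line on stdout.
import sys

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    print 'doubled\t%s' % (int(line) * 2)

#!/usr/bin/env python
# reducer.py (illustrative name): sum the values received for each key.
# Hadoop Streaming sorts the mapper output by key before feeding the reducer,
# so all the lines sharing a key arrive one after the other.
import sys

current_key, total = None, 0
for line in sys.stdin:
    key, value = line.strip().split('\t', 1)
    if current_key is not None and key != current_key:
        print '%s\t%s' % (current_key, total)
        total = 0
    current_key = key
    total += int(value)

if current_key is not None:
    print '%s\t%s' % (current_key, total)

Locally you can simulate what Hadoop Streaming does with a shell pipeline such as cat numbers.txt | python mapper.py | sort | python reducer.py (assuming a numbers.txt file with one number per line), which is a convenient way to test the scripts before submitting them to a cluster.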