MrJob is a MapReduce framework which is able to run mappers and reducers on your local machine, on a Hadoop cluster, or on Amazon Elastic MapReduce (EMR).
It's a powerful abstraction over the underlying cluster: it takes care of saving the data on the file system used by the cluster and of starting the processing nodes. Your Python code won't change whether you run it locally, on Hadoop or on EMR.
MrJob can be installed in your Python environment using:
$ pip install mrjob
As the Hadoop Streaming I/O protocol is line based, you usually need to take care of properly splitting your data into lines and of escaping any newline or tab character inside the data.
This is something MrJob does for you through the use of Protocols: whenever you declare a MrJob process you can tell it to encode/decode data using the RawValueProtocol, which uses the standard Hadoop plain-text format, or the JSONProtocol, which encodes the values as JSON so that you can represent something more complex than plain text.
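For example, a job can pick its protocols through class attributes. Here is a minimal sketch (the class name MRLineLength is just illustrative), using the INPUT_PROTOCOL, INTERNAL_PROTOCOL and OUTPUT_PROTOCOL attributes that MrJob reads to choose protocols:

from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol, JSONProtocol

class MRLineLength(MRJob):
    # Read each input line as a raw string (MrJob's default for input)
    INPUT_PROTOCOL = RawValueProtocol
    # Encode keys and values as JSON between steps and in the output
    INTERNAL_PROTOCOL = JSONProtocol
    OUTPUT_PROTOCOL = JSONProtocol

    def mapper(self, _, line):
        yield 'characters', len(line)

    def reducer(self, key, lengths):
        yield key, sum(lengths)

if __name__ == '__main__':
    MRLineLength.run()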
MrJob provides more steps than the classical Map/Reduce ones.
The most common additional step is the combiner, which runs after the mapper and before the reducer. It can combine multiple values emitted by the same mapper into a single output. It is usually similar to a reducer, but it runs only on the output of a single mapper instead of receiving data from every mapper.
The full list of available steps is: mapper, combiner and reducer, plus their setup/teardown variants mapper_init, mapper_final, combiner_init, combiner_final, reducer_init and reducer_final.
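As a sketch of how a combiner fits in (the class name MRWordFreqWithCombiner is just illustrative), the classic word count can pre-sum the counts on each mapper before they reach the reducers:

from mrjob.job import MRJob

class MRWordFreqWithCombiner(MRJob):
    def mapper(self, _, line):
        for word in line.split():
            yield word.lower(), 1

    def combiner(self, word, counts):
        # Runs on the output of a single mapper: the partial sums shrink
        # the amount of data shuffled to the reducers
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordFreqWithCombiner.run()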
MrJob processes are created by declaring a class that inherits from mrjob.job.MRJob and declares methods named after the MapReduce steps. Each method emits key, value pairs through the use of the yield expression.
This way each MapReduce step doesn't have to accumulate the values it has to emit: it is a generator that sends each value to the next step as soon as it is emitted and then goes on emitting new ones.
Due to the way Hadoop Streaming works, each MrJob process must be a standalone Python module which can be started from the command line. This is achieved in Python using the
if __name__ == '__main__':
    do_something()
idiom, which makes the module executable (__name__ equals '__main__' only when the .py file is started from the command line). Whenever the script is started as a standalone executable, and not imported by another Python program, do_something() will run.
For MrJob to work, instead of do_something() we will have MrJobProcessName.run(), which actually starts the MrJob process.
The reason for this is that your .py file has to be copied to the Hadoop cluster to be executed, and there it must be possible to run it as a standalone program started by Hadoop Streaming.
In [ ]:
from mrjob.job import MRJob

class MRWordFreqCount(MRJob):
    def mapper(self, _, line):
        for word in line.split():
            yield word.lower(), 1

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == '__main__':
    try:
        MRWordFreqCount.run()
    except TypeError:
        print 'MrJob cannot work inside iPython Notebook as it is not saved as a standalone .py file'
Supposing you saved the previous script as wordcount.py, you can start it using:
$ python 00_wordcount.py lorem.txt
Where lorem.txt is the input data (in this case plain text):
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Quisque molestie lacus a iaculis tempus. Nam lorem nulla, viverra non pulvinar ut, fermentum et tortor. Cras vitae libero sed purus venenatis posuere. Proin commodo risus augue, vitae suscipit lectus accumsan sit amet. Praesent eu erat sem. Pellentesque interdum porta libero, et ultrices nunc eleifend sit amet. In in mauris nec elit ullamcorper ultrices at ac ante. Suspendisse potenti. Aenean eu nisl in ante adipiscing imperdiet. Ut pulvinar lectus quis feugiat adipiscing.
Nunc vulputate mauris congue diam ultrices aliquet. Nulla pharetra laoreet est quis vestibulum. Quisque feugiat pharetra sagittis. Phasellus nulla massa, sodales a suscipit blandit, facilisis eu augue. Cras mi massa, ullamcorper nec tristique at, convallis quis eros. Mauris non fermentum lacus, vitae tristique tellus. In volutpat metus augue, nec laoreet ante hendrerit vitae. Vivamus id lacus nec orci tristique vulputate.
When you run the MrJob process you will get something like:
creating tmp directory /var/folders/js/ykgc_8hj10n1fmh3pzdkw2w40000gn/T/wordcount.amol.20140611.163251.274075
writing to /var/folders/js/ykgc_8hj10n1fmh3pzdkw2w40000gn/T/wordcount.amol.20140611.163251.274075/step-0-mapper_part-00000
Counters from step 1:
(no counters found)
writing to /var/folders/js/ykgc_8hj10n1fmh3pzdkw2w40000gn/T/wordcount.amol.20140611.163251.274075/step-0-mapper-sorted
> sort /var/folders/js/ykgc_8hj10n1fmh3pzdkw2w40000gn/T/wordcount.amol.20140611.163251.274075/step-0-mapper_part-00000
writing to /var/folders/js/ykgc_8hj10n1fmh3pzdkw2w40000gn/T/wordcount.amol.20140611.163251.274075/step-0-reducer_part-00000
Counters from step 1:
(no counters found)
Moving /var/folders/js/ykgc_8hj10n1fmh3pzdkw2w40000gn/T/wordcount.amol.20140611.163251.274075/step-0-reducer_part-00000 -> /var/folders/js/ykgc_8hj10n1fmh3pzdkw2w40000gn/T/wordcount.amol.20140611.163251.274075/output/part-00000
And the final output will be:
Streaming final output from /var/folders/js/ykgc_8hj10n1fmh3pzdkw2w40000gn/T/wordcount.amol.20140611.163251.274075/output
"a" 2
"ac" 1
"accumsan" 1
"adipiscing" 3
"aenean" 1
"aliquet." 1
Note that MrJob takes care of copying the inputs and outputs of the various steps to temporary directories (in this case on my own computer, as I'm not running it on a Hadoop cluster), and then returns the output.
Note also that the output is encoded according to the Hadoop Streaming protocol, with the word and the count separated by a tab, as those are the key and value emitted by our reducer.
To automatically save your result to a file you can simply redirect the output with bash:
$ python 00_wordcount.py lorem.txt > result
So far we have counted words, which is the same example commonly used to explain MapReduce. What if we want to do something else, where the data we emit doesn't come from the input itself?
MapReduce doesn't make any assumptions about the data you emit: neither the key nor the value has to be correlated to the input in any way, and each mapper can emit any number of key, value pairs.
So if we want to get some statistics on the text, such as the number of words, characters and phrases, we can easily achieve it:
In [ ]:
from mrjob.job import MRJob

class MRTextInfo(MRJob):
    def mapper(self, _, line):
        for phrase in line.split('.'):
            yield 'phrases', 1
            for word in phrase.split():
                yield 'words', 1
                yield 'characters', len(word)

    def reducer(self, key, counts):
        yield key, sum(counts)

if __name__ == '__main__':
    try:
        MRTextInfo.run()
    except TypeError:
        print 'MrJob cannot work inside iPython Notebook as it is not saved as a standalone .py file'
The final output will be something like:
Creating temp directory /var/folders/_x/g5brlyv963vclshf_kffdm440000gn/T/01_text_info.alexcomu.20160610.150420.364592
Running step 1 of 1...
Streaming final output from /var/folders/_x/g5brlyv963vclshf_kffdm440000gn/T/01_text_info.alexcomu.20160610.150420.364592/output...
"characters" 1258
"phrases" 21
"words" 144
There are cases where it is convenient to run multiple MapReduce steps on a single input to produce the expected output.
If you want multiple steps instead of the plain mapper and reducer, you can specify them in MrJob by overriding the steps() method, which MrJob calls to know the actual MapReduce steps to run in place of the standard ones.
We are going to create a multistep job that finds the most frequent word in the text and returns only that one. The first pass through MapReduce will be our previous word frequency counter; then we need to filter the most frequent word out of all the counted ones.
In [ ]:
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRMostFreqWord(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_wordcount,
                   reducer=self.reducer_wordcount),
            MRStep(mapper=self.mapper_freq,
                   reducer=self.reducer_freq),
            MRStep(mapper=self.mapper_most,
                   reducer=self.reducer_most)
        ]

    def mapper_wordcount(self, _, line):
        for word in line.split():
            if len(word) > 2:
                yield word.lower(), 1  # return only words with at least 3 letters

    def reducer_wordcount(self, word, counts):
        yield word, sum(counts)  # Sum occurrences of each word to get frequency

    def mapper_freq(self, word, total):
        if total > 1:  # Only get words that appear more than once
            yield total, word  # Group them by frequency

    def reducer_freq(self, total, words):
        yield total, words.next()  # .next() gets the first element, so we emit only one word for each frequency

    def mapper_most(self, freq, word):
        yield 'most_used', [freq, word]  # Group all the words together in a list of (frequency, word) tuples

    def reducer_most(self, _, freqs):
        yield 'most_used', max(freqs)  # Get only the most frequent word

if __name__ == '__main__':
    try:
        MRMostFreqWord.run()
    except TypeError:
        print 'MrJob cannot work inside iPython Notebook as it is not saved as a standalone .py file'
The output of our script when launched against the lorem.txt file will be:
"most_used" [4, "nec"]
Which is actually a conjunction, so our rule of filtering out words shorter than 3 characters didn't remove all the conjunctions. We can run the script again with a better filter (using, for example, regular expressions):
In [ ]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

## MultiStep Example

WORD_REGEXP = re.compile(r"[\w']+")

class MRMostFreqWord(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_wordcount,
                   reducer=self.reducer_wordcount),
            MRStep(mapper=self.mapper_freq,
                   reducer=self.reducer_freq),
            MRStep(mapper=self.mapper_most,
                   reducer=self.reducer_most)
        ]

    def mapper_wordcount(self, _, line):
        words = WORD_REGEXP.findall(line)
        for w in words:
            if len(w) > 3:
                yield w.lower(), 1  # keep only words longer than 3 characters

    def reducer_wordcount(self, word, counts):
        yield word, sum(counts)  # Sum occurrences of each word to get frequency

    def mapper_freq(self, word, total):
        if total > 1:  # Only get words that appear more than once
            yield total, word  # Group them by frequency

    def reducer_freq(self, total, words):
        yield total, words.next()  # .next() gets the first element, so we emit only one word for each frequency

    def mapper_most(self, freq, word):
        yield 'most_used', [freq, word]  # Group all the words together in a list of (frequency, word) tuples

    def reducer_most(self, _, freqs):
        yield 'most_used', max(freqs)  # Get only the most frequent word

if __name__ == '__main__':
    try:
        MRMostFreqWord.run()
    except TypeError:
        print 'MrJob cannot work inside iPython Notebook as it is not saved as a standalone .py file'
In this case we will have a better result:
"most_used" [4, "vitae"]
From the lorem.txt file, calculate how many words start with each letter of the alphabet.
From the lorem.txt file, calculate how many words start with each letter of the alphabet, and print out the max and the min.
Write a MapReduce job that reports the most frequent word grouped by word length.
E.g.:
5 chars -> hello -> 8 occurrences
3 chars -> cat -> 4 occurrences
2 chars -> at -> 7 occurrences
While starting jobs from the command line is a good way to test them, it is often the case that you have to visualize the resulting data. So we need to be able to start the MapReduce job from our own software, get the output, and send it to the HTML layer for visualization.
This can be achieved using MrJob runners, which permit starting the execution programmatically and reading the output. Keep in mind that, as the MrJob process must live in a separate .py file, the runner must be kept separate from it. So the runner will live in our application, while the actual MrJob class will be in a separate module that can be sent to Hadoop for execution.
Our runner for the WordFreqCount job will look like this (check the folder 06_runner):
In [ ]:
from wordcount import MRWordFreqCount

mr_job = MRWordFreqCount()
mr_job.stdin = open('lorem.txt').readlines()

with mr_job.make_runner() as runner:
    runner.run()
    for line in runner.stream_output():
        key, value = mr_job.parse_output_line(line)
        print 'Word:', key, 'Count:', value
The stdin attribute of the job is the actual input it will receive.
As Hadoop inputs are line separated, we need to pass a list of strings, one for each line. Since our data is already text on multiple lines, we can just read the lines of the text file.
The runner then provides the output back through the stream_output method, which is a generator that yields one Hadoop Streaming output line at a time. Each line has to be parsed according to the communication protocol, so we call parse_output_line to get back the emitted key and value.
At that point we have the emitted values and we can use them as needed; in this case we just print them. Note that, differently from when you run MrJob from the command line, there is no output apart from our own prints: if you start the runner without printing anything, it will appear to do nothing.
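For instance, here is a minimal sketch (the helper name run_wordcount is hypothetical) that, instead of printing, collects the results into a dict ready to be serialized for the HTML layer:

from wordcount import MRWordFreqCount

def run_wordcount(path):
    # Feed the file to the job and collect the MapReduce output into
    # a plain dict, ready to be serialized (e.g. as JSON) for the UI
    job = MRWordFreqCount()
    job.stdin = open(path).readlines()
    results = {}
    with job.make_runner() as runner:
        runner.run()
        for line in runner.stream_output():
            key, value = job.parse_output_line(line)
            results[key] = value
    return results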
I already downloaded the dataset inside the folder /PATH/TO/MRJOB/ROOT/examples/_dataset. Unzip the folder and let's start!
We have (from the u.info file):
943 users
1682 films
100000 ratings
We'll use the file u.data, which contains the following fields (split by TAB):
user id | film id | rating | timestamp
299 144 4 877881320
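As a starting point, a minimal sketch (the class name MRRatingsPerFilm is hypothetical) that parses u.data and counts how many ratings each film received could look like:

from mrjob.job import MRJob

class MRRatingsPerFilm(MRJob):
    def mapper(self, _, line):
        # u.data fields are tab separated: user id, film id, rating, timestamp
        user_id, film_id, rating, timestamp = line.split('\t')
        yield film_id, 1

    def reducer(self, film_id, counts):
        yield film_id, sum(counts)

if __name__ == '__main__':
    MRRatingsPerFilm.run()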
In [ ]: