In this Notebook, we will demonstrate how to run a Hadoop Streaming Map/Reduce job in a docker container. Our setup will be using a single Hadoop node, which will not be very fast, especially when compared to simply running the map/reduce Python code directly. However, the full Hadoop process will be demonstrated, including the use of the Hadoop file system (HDFS) and the Hadoop Streaming process model. Before proceeding with this Notebook, be sure to (at least start to) download the SequenceIQ Hadoop Docker container.
Typically, basic Hadoop is operated on a large cluster that runs both Hadoop and HDFS, although with the development of Yarn, more diverse workflows are now possible. In this Notebook, we only explore the basic components of Hadoop and HDFS, which work together to run code on the nodes that hold the relevant data in order to maximize throughput. Other resources exist to learn more about Yarn and other Hadoop workflows. The basic Hadoop task is a map/reduce process, where a map process analyzes data and creates a sequential list of key-value pairs (like a Python dictionary). The Hadoop process model sorts the output of the mappers before passing the results to a reduce process, which combines the key-value pairs to generate the final output. The prototype map/reduce example is the word-count problem, where a large corpus is analyzed to quantify how many times each word appears (one can quickly see how this model can be extended to analyze websites as opposed to texts).
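To make this model concrete before involving Hadoop itself, the following short pure-Python sketch (an illustration added here, not one of the Hadoop steps below; the sample lines are made up) mimics the three stages of the word-count example: a map step that emits (word, 1) pairs, a shuffle/sort step that groups identical keys, and a reduce step that accumulates the counts.

from itertools import groupby
from operator import itemgetter

# A tiny, in-memory illustration of the map/sort/reduce pattern.
lines = ["the quick brown fox", "the lazy dog", "the fox"]

# Map: emit a (word, 1) key-value pair for every word token.
mapped = [(word, 1) for line in lines for word in line.split()]

# Shuffle/sort: Hadoop sorts the mapper output by key before the reduce step.
mapped.sort(key=itemgetter(0))

# Reduce: accumulate the counts for each distinct key.
counts = {word: sum(count for _, count in pairs)
          for word, pairs in groupby(mapped, key=itemgetter(0))}

print(counts)  # {'brown': 1, 'dog': 1, 'fox': 2, 'lazy': 1, 'quick': 1, 'the': 3}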
Thus, to complete a map/reduce task in Hadoop, we need to complete the following tasks:

1. Write the map and reduce programs (in our case, two Python scripts).
2. Acquire the data to process and the hadoop-streaming jar file.
3. Start the Hadoop Docker container.
4. Move the input data into HDFS.
5. Run the map/reduce programs via Hadoop Streaming.
6. Examine the output in HDFS.
The rest of this Notebook will demonstrate how to perform each of these tasks. We will first create the two Python programs, download a sample text, and download the hadoop-streaming jar file into a shared local directory from within our course Docker container. Once these steps are complete, we will start our Hadoop Docker container to complete the rest of the process.
In the next code cell, we start the process by running a shell script that deletes the shared directory if it already exists and then creates it anew; this directory will hold the Python codes and data for our Map/Reduce Hadoop project.
In [1]:
%%bash
#!/usr/bin/env bash
# A Bash shell script to delete the Hadoop directory if it exists, after which
# we make a new Hadoop directory.

# Our directory name
DIR=/notebooks/i2ds/hadoop

# Delete it if it exists
if [ -d "$DIR" ]; then
    rm -rf "$DIR"
fi

# Now make the directory
mkdir "$DIR"
The first Python code we will write is the map Python program. This program simply reads data from STDIN, tokenizes each line into words, and outputs each word on a separate line along with a count of one. Thus our map program generates a list of word tokens as the keys, where the value is always one.
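The original mapper cell is not reproduced in this section, so the following is a minimal sketch consistent with the description above; the %%writefile path mirrors the reducer cell below, and the tab separator matches the format that reducer expects.

%%writefile /notebooks/i2ds/hadoop/mapper.py
#!/usr/bin/env python3

import sys

# We explicitly define the word/count separator token (a tab character),
# which the reducer will use to split each line.
sep = '\t'

# We read lines from STDIN and write key-value pairs to STDOUT
with sys.stdin as fin:
    with sys.stdout as fout:

        # For every line in STDIN
        for line in fin:
            # Strip leading and trailing whitespace, then tokenize on whitespace
            words = line.strip().split()

            # Output each word token with a count of one
            for word in words:
                fout.write("{0:s}{1:s}{2:d}\n".format(word, sep, 1))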
The second Python program we write is our reduce program. In this code, we read key-value pairs from STDIN and rely on the fact that Hadoop sorts all key-value pairs by key before sending the map output to the reduce process, which lets us accumulate a running count for each word. The following code could easily be made more sophisticated by using yield statements and iterators, but for clarity we use the simple approach of tracking when the current word becomes different from the previous word in order to output each key and its cumulative count.
In [3]:
%%writefile /notebooks/i2ds/hadoop/reducer.py
#!/usr/bin/env python3
import sys

# We explicitly define the word/count separator token.
sep = '\t'

# We open STDIN and STDOUT
with sys.stdin as fin:
    with sys.stdout as fout:

        # Keep track of the current word and count
        cword = None
        ccount = 0
        word = None

        # For every line in STDIN
        for line in fin:
            # Strip off leading and trailing whitespace
            # (note that, by construction, we should have no leading whitespace)
            line = line.strip()

            # We split the line into a word and count, based on the
            # predefined separator token.
            # Note we haven't dealt with punctuation.
            word, scount = line.split(sep, 1)

            # We will assume count is always an integer value
            count = int(scount)

            # word is either repeated or new
            if cword == word:
                ccount += count
            else:
                # We have to handle the first word explicitly
                if cword is not None:
                    fout.write("{0:s}{1:s}{2:d}\n".format(cword, sep, ccount))

                # New word, so reset variables
                cword = word
                ccount = count
        else:
            # Output the final word count
            if cword == word:
                fout.write("{0:s}{1:s}{2:d}\n".format(word, sep, ccount))
The Hadoop Docker container we will use in this course does not, by default, support Hadoop Streaming. In order to use Python codes with Hadoop, however, we must have Hadoop Streaming. Fortunately, there is a simple solution: we just grab the appropriate Hadoop Streaming jar file from the matching Hadoop release (our container runs Hadoop version 2.6.0). We do this below with wget, where we explicitly specify the appropriate directory path and output filename (we use the much shorter name hs.jar, which will be easier to enter at the command line).
In [4]:
# We need to explicitly grab hadoop streaming jar file.
!wget --output-document=/notebooks/i2ds/hadoop/hs.jar \
http://central.maven.org/maven2/org/apache/hadoop/hadoop-streaming/2.6.0/hadoop-streaming-2.6.0.jar
Our simple map/reduce programs require text data to operate. While there are a number of possible options, for this example we grab a free book from Project Gutenberg: the full text of the novel Ulysses, by James Joyce.
In [5]:
# Grab a book to process
!wget --output-document=/notebooks/i2ds/hadoop/book.txt \
http://www.gutenberg.org/cache/epub/4300/pg4300.txt
In [6]:
!chmod u+x /notebooks/i2ds/hadoop/mapper.py
In [7]:
!chmod u+x /notebooks/i2ds/hadoop/reducer.py
To test out the map Python code, we can run the mapper.py program and redirect STDIN to read the book text data. This is done in the following code cell, where we pipe the output into the Unix wc command to count the number of lines produced, which should be one line per word token found in the book text file. In the second code cell, we pipe the output of mapper.py into the Unix sort command, a step that Hadoop performs automatically. To see the result of this operation, we pipe the sorted result into the Unix uniq command to count duplicates, pipe this result into a new sort to order the output by the number of occurrences of each word, and finally display the last few lines with the Unix tail command to verify the program is operating correctly.
In [8]:
!/notebooks/i2ds/hadoop/mapper.py < book.txt | wc -l
In [9]:
!/notebooks/i2ds/hadoop/mapper.py < book.txt | sort -n -k 1 | \
uniq -c -d | sort -n -k 1 | tail -10
To test out the reduce Python code, we repeat the previous pipeline, but rather than piping the sorted mapper output into the Unix uniq command, we pipe it into the Python reducer.py code. This simulates the Hadoop model, where the map output is sorted by key before being passed into the reduce process. First, we simply count the number of lines produced by the reduce process, which indicates the number of unique word tokens in the book. Next, we sort the output by the number of times each word token appears and display the last few lines to compare with the previous results.
In [10]:
!/notebooks/i2ds/hadoop/mapper.py < book.txt | sort -n -k 1 | \
/notebooks/i2ds/hadoop/reducer.py | wc -l
In [11]:
!/notebooks/i2ds/hadoop/mapper.py < book.txt | sort -n -k 1 | \
/notebooks/i2ds/hadoop/reducer.py | sort -n -k 2 | tail -10
To use Hadoop in this course, we will need to start up our Hadoop Docker container. You should have already completed the tasks outlined in the Preface IPython Notebook regarding pulling and retagging the SequenceIQ Ubuntu Hadoop Docker container. Once those steps are completed, we can start a new Hadoop Docker container. To use the files we created or downloaded earlier in this Notebook, we need to share folders between the host OS and our Hadoop container, which we do with the -v flag. You will need to change the host path to match your personal laptop. We also run the /etc/bootstrap.sh script, which will properly initialize the Hadoop environment, and the -bash flag indicates that we wish to be given access to the Hadoop container via a Bash shell.
docker run -it -v /Users/rb/i2ds:/i2ds hadoop /etc/bootstrap.sh -bash
When this command is run, a series of messages is displayed indicating the status of the different server daemons starting up, including an SSH server, a namenode, a datanode, a secondary namenode, the yarn daemons, a resourcemanager, and a nodemanager, as shown in the following screenshot.
If these messages are all displayed and you are left with a prompt that resembles the string root@185b748bbfa7:/#, you will have successfully started the Hadoop Docker container. As a final verification, you can enter the following command:
echo $HADOOP_PREFIX
which should display /usr/local/hadoop.
At this point, we first need to change into the directory where we created our Python mapper and reducer programs, and where we downloaded the hadoop-streaming jar file and the sample book to analyze. In the Hadoop Docker container, enter cd i2ds/hadoop, which will change our current working directory to the appropriate location, as indicated by a change in the shell prompt to /i2ds/hadoop#.
Before proceeding, we should test our Python codes, but now within the Hadoop Docker container, which has a different Python environment than our class container. We can easily do this by modifying our earlier test to use the correct path inside the Hadoop Docker container:
/i2ds/hadoop/mapper.py < book.txt | sort -n -k 1 | \
/i2ds/hadoop/reducer.py | sort -n -k 2 | tail -10
Doing this, however, now produces a UnicodeDecodeError. The simplest solution is to explicitly state that the Python interpreter should use utf-8 for all IO operations, which we can do by setting the Python environment variable PYTHONIOENCODING to utf-8. We do this by entering the following command at the container prompt:
export PYTHONIOENCODING=utf-8
After setting this environment variable, the previous Unix command string will now produce the correct output.
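As an aside, an alternative approach (not what this Notebook does, just a defensive sketch) is to handle the encoding inside mapper.py and reducer.py themselves by wrapping the standard streams with an explicit UTF-8 codec, which removes the dependence on the environment variable:

import io
import sys

# Hypothetical alternative to setting PYTHONIOENCODING: wrap STDIN/STDOUT
# with explicit UTF-8 text wrappers at the top of mapper.py and reducer.py.
fin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8', errors='replace')
fout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')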
At this point, we need to move the data we wish to process into the Hadoop Distributed File System, or HDFS. HDFS is a file system that is designed to work effectively with the Hadoop environment. In a typical Hadoop cluster, files are broken up and distributed to different Hadoop nodes. The processing is moved to the data in this model, which can produce high throughput, especially for map/reduce programming tasks. However, this means you cannot simply move around the HDFS file system in the same manner as a traditional Unix file system, since the components of a particular file are not all co-located. Instead, we must use the HDFS file system interface, which is invoked by using $HADOOP_PREFIX/bin/hdfs. Running this command by itself in your Hadoop Docker container will list the available commands, as shown in the following screenshot.
The standard command we will use is dfs, which runs a file system command on the HDFS file system supported by Hadoop. The list of supported dfs commands is extensive and mirrors many of the traditional Unix file system commands. The full listing can be obtained by entering $HADOOP_PREFIX/bin/hdfs dfs at the prompt in our Hadoop Docker container. Some of the more useful commands for this class include:
cat: copies the source path to STDOUT.

count -h: counts the number of directories, files, and bytes under the specified path. With the -h flag, the output is displayed in a human-readable format.

expunge: empties the trash. By default, files and directories are not removed from HDFS with the rm command; they are simply moved to the trash. This can be useful when HDFS reports a "Name node is in safe mode" message.

ls: lists the contents of the indicated directory in HDFS.

mkdir -p: creates a new directory in HDFS at the specified location. With the -p flag, any parent directory specified in the full path will also be created as necessary.

put: copies the indicated file(s) from the local host file system into the specified path in HDFS.

rm -f -r: deletes the indicated file or directory. With the -r and -f flags, the command will not display any message and will delete any files or directories under the indicated directory. The -skipTrash flag should be used to delete the indicated resource immediately.

tail: displays the last kilobyte of the indicated file to STDOUT.
At this point, we first need to create a directory to hold the input and output of our Hadoop task. We will create a new directory called wc with a subdirectory called in to hold the input data for our Hadoop task. Second, we will need to copy the book text file into this new HDFS directory. This means we will need to run the following two commands at the prompt in our Hadoop Docker container:
$HADOOP_PREFIX/bin/hdfs dfs -mkdir -p wc/in
$HADOOP_PREFIX/bin/hdfs dfs -put book.txt wc/in/book.txt
The following screenshot displays the result of running these two
commands, as well as the dfs -ls
command to display the contents of
our new HDFS directory, and the dfs -count
command to show the size of
the directory contents.
We are now ready to actually run our Python codes via Hadoop Streaming. The main command to perform this task is $HADOOP_PREFIX/bin/hadoop jar hs.jar, where hs.jar is the hadoop-streaming jar file we downloaded earlier in this Notebook. Running this command by itself will display a usage message that is not extremely useful; supplying the -help flag will provide a more useful summary. For our map/reduce Python example to run successfully, we will need to specify six flags, listed below:
-files: a comma separated list of files to be copied to the Hadoop cluster.
-input: the HDFS input file(s) to be used for the map task.
-output: the HDFS output directory, used for the reduce task.
-mapper: the command to run for the map task.
-reducer: the command to run for the reduce task.
-cmdenv: set environment variables for a Hadoop streaming task.

Given our previous setup, we will run the full command as follows:
$HADOOP_PREFIX/bin/hadoop jar hs.jar -files mapper.py,reducer.py -input wc/in \
-output wc/out -mapper mapper.py -reducer reducer.py -cmdenv PYTHONIOENCODING=utf-8
When this command is run, a series of messages will be displayed to the screen (STDOUT) showing the progress of our Hadoop Streaming task. At the end of this stream of informational messages will be a statement indicating the location of the output directory.
In order to view the results of our Hadoop Streaming task, we must use HDFS DFS commands to examine the directory and files generated by our Python Map/Reduce programs. The following list of DFS commands might prove useful to view the results of this map/reduce job.
$HADOOP_PREFIX/bin/hdfs dfs -ls wc
$HADOOP_PREFIX/bin/hdfs dfs -ls wc/out
$HADOOP_PREFIX/bin/hdfs dfs -count -h wc/out/part-00000
$HADOOP_PREFIX/bin/hdfs dfs -tail wc/out/part-00000
To compare the output of this map/reduce Hadoop Streaming task to our previous output, we can use the command $HADOOP_PREFIX/bin/hdfs dfs -cat wc/out/part-00000 | sort -n -k 2 | tail -10, which should be executed at a Hadoop Docker container shell prompt. The following listing shows the successful output of this command, following a successful map/reduce processing task.
/i2ds/hadoop# $HADOOP_PREFIX/bin/hdfs dfs -cat wc/out/part-00000 | \
sort -n -k 2 | tail -10
with 2391
I 2432
he 2712
his 3035
in 4606
to 4787
a 5842
and 6542
of 8127
the 13600
Following the successful run of our map/reduce Python programs, we have created a new directory, wc/out, which contains two files. If we wish to rerun this Hadoop Streaming map/reduce task, we must either specify a different output directory, or else clean up the results of the previous run. To remove the output directory, we can simply use the dfs -rm -r -f -skipTrash wc/out command, which will immediately delete the wc/out directory. The successful completion of this command is indicated by Hadoop, and this can also be verified by listing the contents of the wc directory, as shown in the following screenshot.