Introduction to Hadoop

Professor Robert J. Brunner

</DIV>


Introduction

In this Notebook, we will demonstrate how to run a Hadoop Streaming Map/Reduce job in a docker container. Our setup will be using a single Hadoop node, which will not be very fast, especially when compared to simply running the map/reduce Python code directly. However, the full Hadoop process will be demonstrated, including the use of the Hadoop file system (HDFS) and the Hadoop Streaming process model. Before proceeding with this Notebook, be sure to (at least start to) download the SequenceIQ Hadoop Docker container.

Typically, basic Hadoop is operated on a large cluster that runs both Hadoop and HDFS, although with the development of Yarn, more diverse workflows are now possible. In this Notebook, we only explore the basic Hadoop components of Hadoop and HDFS, which work together to run code on the nodes that hold the relevant data in order to maximize throughput. Other resources exist to learn more about Yarn and other Hadoop workflows. The basic Hadoop task is a map/reduce process, where a map process analyzes data and creates a sequential list of key-value pairs (like a Python dictionary). The Hadoop process model sorts the output of the mappers before passing the results to a reduce process. The reduce process combines the key-value pairs to generate final output. The prototype map/reduce example is the word-count problem, where a large corpus is analyzed to quantify how many times each word appears (one can quickly see how this model can be extended to analyze website as opposed to texts).

Thus to complete a map/reduce task in Hadoop we need to complete the following tasks:

  1. create a Map program
  2. create a Reduce program
  3. obtain a data set to analyze
  4. load our data into HDFS
  5. execute our map/reduce program by using Hadoop

The rest of this Notebook will demonstrate how to perform each of these tasks. We first will create the two Python programs, download a sample text, and also download the hadoop-streaming jar file into a shared local directory from within our course Docker container. Once these steps are complete, we will start our Hadoop Docker container to complete the rest of the process.

In the next code cell, we start the process by running a shell script that creates (and deletes first if it exists) the shared directory that will hold the Python codes and data for our Map/Reduce Hadoop project.



In [1]:
%%bash
#!/usr/bin/env bash
# A Bash Shell Script to delete the Hadoop diorectory if it exists, afterwhich
# make a new Hadoop directory

# Our directory name
DIR=/notebooks/i2ds/hadoop

# Delete if exists
if [ -d "$DIR" ]; then
    rm -rf "$DIR"
fi

# Now make the directory
mkdir "$DIR"

Mapper: Word Count

The first Python code we will write is the map Python program. This program simply reads data from STDIN, tokenizes each line into words and outputs each word on a separate line along with a count of one. Thus our map program generates a list of word tokens as the keys and the value is always one.



In [ ]:
%%writefile /notebooks/i2ds/hadoop/mapper.py
#!/usr/bin/env python3

import sys

# We explicitly define the word/count separator token.
sep = '\t'

# We open STDIN and STDOUT
with sys.stdin as fin:
    with sys.stdout as fout:
    
        # For every line in STDIN
        for line in fin:
        
            # Strip off leading and trailing whitespace
            line = line.strip()
            
            # We split the line into word tokens. Use whitespace to split.
            # Note we don't deal with punctuation.
            
            words = line.split()
            
            # Now loop through all words in the line and output

            for word in words:
                fout.write("{0}{1}1\n".format(word, sep))

Reducer: Word Count

The second Python program we write is our reduce program. In this code, we read key-value pairs from STDIN and use the fact that the Hadoop process first sorts all key-value pairs before sending the map output to the reduce process to accumulate the cumulative count of each word. The following code could easily be made more sophisticated by using yield statements and iterators, but for clarity we use the simple approach of tracking when the current word becomes different than the previous word to output the key-cumulative count pairs.



In [3]:
%%writefile /notebooks/i2ds/hadoop/reducer.py
#!/usr/bin/env python3

import sys

# We explicitly define the word/count separator token.
sep = '\t'

# We open STDIN and STDOUT
with sys.stdin as fin:
    with sys.stdout as fout:
    
        # Keep track of current word and count
        cword = None
        ccount = 0
        word = None
   
        # For every line in STDIN
        for line in fin:
        
            # Strip off leading and trailing whitespace
            # Note by construction, we should have no leading white space
            line = line.strip()
            
            # We split the line into a word and count, based on predefined
            # separator token.
            # Note we haven't dealt with punctuation.
            
            word, scount = line.split('\t', 1)
            
            # We wil assume count is always an integer value
            
            count = int(scount)
            
            # word is either repeated or new
            
            if cword == word:
                ccount += count
            else:
                # We have to handle first word explicitly
                if cword != None:
                    fout.write("{0:s}{1:s}{2:d}\n".format(cword, sep, ccount))
                
                # New word, so reset variables
                cword = word
                ccount = count
        else:
            # Output final word count
            if cword == word:
                fout.write("{0:s}{1:s}{2:d}\n".format(word, sep, ccount))


Writing /notebooks/rppds/hadoop/reducer.py

Hadoop Streaming

The Hadoop Docker container we will use in this course by default does not support Hadoop streaming. In order to use Python codes with Hadoop, however, we must have Hadoop Streaming. Fortunately, there is a simple solution, we just have to grab the appropriate Hadoop streaming jar file from the appropriate Hadoop release (our container is Hadoop version 2.6.0). We do this below with wget, where we now explicitly specify the appropriate directory path and output filename (we use the much shorter hs.jar which will be asier to enter at the command line).



In [4]:
# We need to explicitly grab hadoop streaming jar file.
!wget --output-document=/notebooks/i2ds/hadoop/hs.jar \
http://central.maven.org/maven2/org/apache/hadoop/hadoop-streaming/2.6.0/hadoop-streaming-2.6.0.jar


--2015-03-16 18:17:08--  http://central.maven.org/maven2/org/apache/hadoop/hadoop-streaming/2.6.0/hadoop-streaming-2.6.0.jar
Resolving central.maven.org (central.maven.org)... 23.235.44.209
Connecting to central.maven.org (central.maven.org)|23.235.44.209|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 104979 (103K) [application/java-archive]
Saving to: ‘/notebooks/rppds/hadoop/hs.jar’

100%[======================================>] 104,979      425KB/s   in 0.2s   

2015-03-16 18:17:09 (425 KB/s) - ‘/notebooks/rppds/hadoop/hs.jar’ saved [104979/104979]


Word Count

Our simple map/reduce programs require text data to operate. While there are a number of possible options, for this example we can grab a free book from Project Gutenberg:

wget --directory-prefix=/notebooks/i2ds/hadoop/ --output-document=book.txt \
    http://www.gutenberg.org/cache/epub/4300/pg4300.txt`

In this case, we have grabbed the full text of the novel Ulysses, by James Joyce.



In [5]:
# Grab a book to process
!wget --output-document=/notebooks/i2ds/hadoop/book.txt \
http://www.gutenberg.org/cache/epub/4300/pg4300.txt


--2015-03-16 18:17:10--  http://www.gutenberg.org/cache/epub/4300/pg4300.txt
Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47
Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1573151 (1.5M) [text/plain]
Saving to: ‘/notebooks/rppds/hadoop/book.txt’

100%[======================================>] 1,573,151   1.24MB/s   in 1.2s   

2015-03-16 18:17:12 (1.24 MB/s) - ‘/notebooks/rppds/hadoop/book.txt’ saved [1573151/1573151]


Testing Python Map-Reduce

Before we begin using Hadoop, we should first test our Python codes out to ensure they work as expected. First, we should change the permissions of the two programs to be executable, which we can do with the Unix chmod command.



In [6]:
!chmod u+x /notebooks/i2ds/hadoop/mapper.py

In [7]:
!chmod u+x /notebooks/i2ds/hadoop/reducer.py

Testing Mapper.py

To test out the map Python code, we can run the Python mapper.py code and specify that the code should redirect STDIN to read the book text data. This is done in the following code cell, we pipe the output into the Unix head command in order to restrict the output, which would be one line per word found in the book text file. In the second code cell, we next pipe the output of mapper.py into the Unix sort command, which is done automatically by Hadoop. To see the result of this operation, we next pipe the result into the Unix uniq command to count duplicates, pipe this result into a new sort routine to sort the output by the number of occurrences of a word, and finally display the last few lines with the Unix tail command to verify the program is operating correctly.



In [8]:
!/notebooks/i2ds/hadoop/mapper.py <  book.txt | wc -l


113156

In [9]:
!/notebooks/i2ds/hadoop/mapper.py <  book.txt | sort -n -k 1 | \
 uniq -c -d | sort -n -k 1 | tail -10


    799 which	1
   1031 The	1
   1106 was	1
   1151 by	1
   1848 a	1
   2151 to	1
   2689 in	1
   3035 and	1
   7289 of	1
  10312 the	1

Testing Reducer.py

To test out the reduce Python code, we run the previous code cell, but rather than piping the result into the Unix tail command, we pipe the result of the sort command into the Python reducer.py code. This simulates the Hadoop model, where the map output is key sorted before being passed into the reduce process. First, we will simply count the number of lines displayed by the reduce process, which will indicate the number of unique word tokens in the book. Next, we will sort the output by the number of times each word token appears and display the last few lines to compare with the previous results.



In [10]:
!/notebooks/i2ds/hadoop/mapper.py <  book.txt | sort -n -k 1 | \
 /notebooks/i2ds/hadoop/reducer.py | wc -l


21296

In [11]:
!/notebooks/i2ds/hadoop/mapper.py <  book.txt | sort -n -k 1 | \
 /notebooks/i2ds/hadoop/reducer.py | sort -n -k 2 | tail -10


which	799
The	1031
was	1106
by	1151
a	1848
to	2151
in	2689
and	3035
of	7289
the	10312

Running Hadoop

To use Hadoop in this course, we will need to start up our Hadoop Docker container. You should have already completed the tasks outlined in the Preface IPython Notebook regarding pulling and retagging the SequenceIQ Ubuntu Hadoop Docker container.Once those steps are completed, we can start a new Hadoop Docker container by running the container. To use the files we created or downloaded earlier in this notebook, we need to share folders between the host OS and our Hadoop container, which we do with the -v flag. You will need to change the host path to match your personal laptop. We aso run the /etc/bootstrap.sh script, which will properly initialize the Hadoop environment, and the -bash flag indicates that we wish to be given access to the Hadoop container via a Bash shell.

docker run -it -v /Users/rb/i2ds:/i2ds hadoop /etc/bootstrap.sh -bash

When this command is run, a series of messages are displayed that indicate the status of different server daemons starting up, including an SSH server, a namenode, a datanode, secondary nameodes, yarn daemons, a resourcemanager, and a nodemanager, as shown in the following screenshot.

If these message are all displayed and you are left with a prompt that resembles the string root@185b748bbfa7:/# you will have successfully started the Hadoop Docker container. As a final verification, you can enter the following command:

echo $HADOOP_PREFIX

which should display /usr/local/hadoop.


Python Map/Reduce

At this point, we first need to change into the directory where we created our Python mapper and reducer programs, and where we downloaded the hadoop-streaming jar file and the sample book to analyze. In the Hadoop Docker container, enter cd i2ds/hadoop, which will change our current working directory to the appropriate location, which is indicated by a change in the shell prompt to /i2ds/hadoop#.

Before proceeding, we should test our Python codes, but now within the Hadoop Docker container, which will have a different python environment than our class container. We can easily do this by modifying our earlier test to now use the correct path in the Hadoop Docker container:

/i2ds/hadoop/mapper.py <  book.txt | sort -n -k 1 |  \
    /i2ds/hadoop/reducer.py | sort -n -k 2 | tail -10

Doing this, however, now gives an UnicodeDecodeError. The simplest solution is to explicitly state that the Python interpreter should use utf-8 for all IO operations, which we can do by setting the Python environment variable PYTHONIOENCODING to utf-8. We do this by entering the following command at the container prompt:

export PYTHONIOENCODING=utf-8

After setting this environment variable, the previous Unix command string will now produce the correct output.

HDFS

At this point, we need to move our data to process into the Hadoop Distributed File system, or HDFS. HDFS is a a file system that is designed to work effectively with the Hadoop environment. In a typical Hadoop cluster, files would be broken up and distributed to different Hadoop nodes. The processing is moved to the data in this model, which can produce high throughput, especially for map/reduce programming tasks. However, this means you can not simply move around the HDFS file system in the same manner as a traditional Unix file system, since the components of a particular file are not all col-located. Instead, we must use the HDFS file system interface, which is invoked by using $HADOOP_PREFIX/bin/hdfs. Running this command by itself in your Hadoop Docker container will list the available commands, as shown in the following screenshot.

The standard command we will use is dfs which runs a filesystem command on the HDFS file system that is supported by Hadoop. The list of supported dfs commands is extensive, and mirrors many of the traditional Unix file systems commands. The full listing can be obtained by entering $HADOOP_PREFIX/bin/hdfs dfs at the prompt in our Hadoop Docker container. Some of the more useful commands for this class include:

  • cat: copies the source path to STDOUT.

  • count -h: counts the number of directories, files and byts under the path specified. With the -h flag, the output is displayed in a human-readable format.

  • expunge: empties the trash. By default, files and directories are not removed from HDFS with the rm command, they are simply moved to the trash. This can be useful when HDFS supplies a Name node is in safe mode. message.

  • ls: lists the contents of the indicated directory in HDFS.

  • -mkdir -p: creates a new directory in HDFS at the specified location. With the -p flag any parent directory specified in the full path will also be created as necessary.

  • put: copies indicated file(s) from local host file system into the specified path in HDFS.

  • rm -f -r: delete the indicated file or directory. With the -r -f flags, the command will not display any message and any will delete any files or directories under the indicated directory. The -skipTrash flag should be used to delete the indicated resource immediately.

  • tail: display the last kilobyte of the indicated file to STDOUT.


At this point, we first need to create an directory to hold the input and output of our Hadoop task. We will create a new directory called wc with a subdirectory called in to hold the input data for our Hadoop task. Second, we will need to copy the book text file into this new HDFS directory. This means we will need to run the following two commands at the prompt in our Hadoop Docker container:

  1. $HADOOP_PREFIX/bin/hdfs dfs -mkdir -p wc/in
  2. $HADOOP_PREFIX/bin/hdfs dfs -put book.txt wc/in/book.txt

The following screenshot displays the result of running these two commands, as well as the dfs -ls command to display the contents of our new HDFS directory, and the dfs -count command to show the size of the directory contents.


Python Hadoop Streaming

We are now ready tio actually run our Python codes via Hadoop Streaming. The main command to perform this task is $HADOOP_PREFIX/bin/hadoop jar hs.jar, where hs.jar is the hadoop-streaming jar file we downloaded earlier in this Notebook. Running this command will display a usage message that is not extremely useful, supplying the -help flag will provide more a more useful summary. For our map/reduce Python example to run successfully, we will need to specify six flags:

  1. -files: a comma separated list of files to be copied to the Hadoop cluster.
  2. -input: the HDFS input file(s) to be used for the map task.
  3. -output: the HDFS output directory, used for the reduce task.
  4. -mapper: the command to run for the map task.
  5. -reducer: the command to run for the reduce task.
  6. -cmdenv: set environment variables for a Hadoop streaming task.

Given our previous setup, we will run the full command as follows:

$HADOOP_PREFIX/bin/hadoop jar hs.jar -files mapper.py,reducer.py -input wc/in \
    -output wc/out -mapper mapper.py -reducer reducer.py -cmdenv PYTHONIOENCODING=utf-8

When this command is run, a series of messages will be displayed to the screen (STDOUT) showing the progress of our Hadoop Streaming task. At the end of the stream of information messages will be a statement indicating the location of the output directory as shown below:

In order to view the results of our Hadoop Streaming task, we must use HDFS DFS commands to examine the directory and files generated by our Python Map/Reduce programs. The following list of DFS commands might prove useful to view the results of this map/reduce job.

$HADOOP_PREFIX/bin/hdfs dfs -ls wc

$HADOOP_PREFIX/bin/hdfs dfs -ls wc/out

$HADOOP_PREFIX/bin/hdfs dfs -count -h wc/out/part-00000

$HADOOP_PREFIX/bin/hdfs dfs -tail wc/out/part-00000

To compare this map/reduce Hadoop Streaming task output to our previous output, we can use the $HADOOP_PREFIX/bin/hdfs dfs -cat wc/out/part-00000 | sort -n -k 2 | tail -10, which should be executed at a Hadoop Docker container shell prompt. This code listing provides the succesful output of this command, following a succesful map/reduce processing task.

/i2ds/hadoop# $HADOOP_PREFIX/bin/hdfs dfs -cat wc/out/part-00000 | \
    sort -n -k 2 | tail -10

with    2391
I   2432
he  2712
his 3035
in  4606
to  4787
a   5842
and 6542
of  8127
the 13600

Hadoop Cleanup

Following the succesful run of our map/reduce Python programs, we have created a new directory wc/out, which contains two files. If we wish to rerun this Hadoop Streaming map/reduce task, we must either specify a different output directory, or else we must clean up the results of the previous run. To remove the output directory, we can simply use the DFS -rm -r -f -skipTrash wc/out command, which will immediately delete the wc/out directory. The successful completion of this command is indicated by Hadoop, and this can also be verified by listing the contents of the wc directory as shown in the following screenshot.


Additional References

  1. Hadoop Documentation