Running Jobs on Hadoop

The aim of this exercise is to become familiar with Hadoop. We will show how to run Hadoop applications and then create our own Python-based streaming application for parsing log file data.

Configure Hadoop Environment and Test Applications

For the following exercises we will use two examples provided as part of the standard Hadoop distribution. We use Hortonworks HDP 2.3.2 deployed on Amazon Web Services (EC2). First, we need to set two variables pointing to the jar files that contain these applications.


In [37]:
HADOOP_EXAMPLES="/usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-mapreduce-examples.jar"
HADOOP_STREAMING="/usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-streaming.jar"
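
HADOOP_EXAMPLES and HADOOP_STREAMING are ordinary Python variables; IPython substitutes them into the ! shell commands below wherever they are referenced with a leading $. The paths are specific to HDP 2.3.2, so adjust them for your installation. An optional quick check that both jars exist:

In [ ]:
!ls -l $HADOOP_EXAMPLES $HADOOP_STREAMING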

We will use the standard Hadoop command-line utilities to query the status of the cluster (in particular HDFS and YARN) and to submit applications. Shell commands can be executed in IPython notebook cells by prefixing them with !. Use Shift-Enter or the play button in the toolbar to execute a cell.

1. Hadoop Services (HDFS and YARN)

The -report argument provides information about the file system: configured capacity, disk usage, number of data nodes, etc. Note that the hadoop dfsadmin form is deprecated in favor of hdfs dfsadmin (see the warning below), and that the full per-datanode report requires HDFS superuser privileges, which is why the output ends with an access-denied message.


In [32]:
!hadoop dfsadmin -report


DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Configured Capacity: 638810435584 (594.94 GB)
Present Capacity: 572727713792 (533.39 GB)
DFS Remaining: 504732938240 (470.07 GB)
DFS Used: 67994775552 (63.33 GB)
DFS Used%: 11.87%
Under replicated blocks: 4
Blocks with corrupt replicas: 0
Missing blocks: 0
Missing blocks (with replication factor 1): 0

-------------------------------------------------
report: Access denied for user radical. Superuser privilege is required

In [33]:
!yarn node -list -all


15/11/15 15:38:14 INFO impl.TimelineClientImpl: Timeline service address: http://ip-10-63-179-69.ec2.internal:8188/ws/v1/timeline/
15/11/15 15:38:14 INFO client.RMProxy: Connecting to ResourceManager at ip-10-63-179-69.ec2.internal/10.63.179.69:8050
Total Nodes:6
         Node-Id	     Node-State	Node-Http-Address	Number-of-Running-Containers
ip-10-63-179-69.ec2.internal:45454	        RUNNING	ip-10-63-179-69.ec2.internal:8042	                           0
ip-10-47-179-69.ec2.internal:45454	        RUNNING	ip-10-47-179-69.ec2.internal:8042	                           0
ip-10-145-0-4.ec2.internal:45454	        RUNNING	ip-10-145-0-4.ec2.internal:8042	                           0
ip-10-99-194-113.ec2.internal:45454	        RUNNING	ip-10-99-194-113.ec2.internal:8042	                           0
ip-10-218-164-206.ec2.internal:45454	        RUNNING	ip-10-218-164-206.ec2.internal:8042	                           0
ip-10-179-174-236.ec2.internal:45454	        RUNNING	ip-10-179-174-236.ec2.internal:8042	                           0

2. Terasort

The Terasort application consists of two parts: (i) data generation (teragen), and (ii) the sorting of the data (terasort).

Note that if you execute teragen multiple times, make sure to delete the target directory (teragen) and the terasort output directory (teraout) first; otherwise the job fails because the output directory already exists. The following command removes both directories (they are moved to the HDFS trash).


In [34]:
!hdfs dfs -rm -r teragen teraout


15/11/15 15:38:45 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 360 minutes, Emptier interval = 0 minutes.
Moved: 'hdfs://ip-10-63-179-69.ec2.internal:8020/user/radical/teragen' to trash at: hdfs://ip-10-63-179-69.ec2.internal:8020/user/radical/.Trash/Current
15/11/15 15:38:45 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 360 minutes, Emptier interval = 0 minutes.
Moved: 'hdfs://ip-10-63-179-69.ec2.internal:8020/user/radical/teraout' to trash at: hdfs://ip-10-63-179-69.ec2.internal:8020/user/radical/.Trash/Current

In the next command we create a dataset of 100,000 records of 100 bytes each, for a total of 10 MB.


In [35]:
!yarn jar $HADOOP_EXAMPLES teragen 100000 teragen


15/11/15 15:39:29 INFO impl.TimelineClientImpl: Timeline service address: http://ip-10-63-179-69.ec2.internal:8188/ws/v1/timeline/
15/11/15 15:39:29 INFO client.RMProxy: Connecting to ResourceManager at ip-10-63-179-69.ec2.internal/10.63.179.69:8050
15/11/15 15:39:30 INFO terasort.TeraSort: Generating 100000 using 2
15/11/15 15:39:30 INFO mapreduce.JobSubmitter: number of splits:2
15/11/15 15:39:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1447000128355_0778
15/11/15 15:39:31 INFO impl.YarnClientImpl: Submitted application application_1447000128355_0778
15/11/15 15:39:31 INFO mapreduce.Job: The url to track the job: http://ip-10-63-179-69.ec2.internal:8088/proxy/application_1447000128355_0778/
15/11/15 15:39:31 INFO mapreduce.Job: Running job: job_1447000128355_0778
15/11/15 15:39:37 INFO mapreduce.Job: Job job_1447000128355_0778 running in uber mode : false
15/11/15 15:39:37 INFO mapreduce.Job:  map 0% reduce 0%
15/11/15 15:39:43 INFO mapreduce.Job:  map 100% reduce 0%
15/11/15 15:39:43 INFO mapreduce.Job: Job job_1447000128355_0778 completed successfully
15/11/15 15:39:43 INFO mapreduce.Job: Counters: 31
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=252368
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=164
		HDFS: Number of bytes written=10000000
		HDFS: Number of read operations=8
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=4
	Job Counters 
		Launched map tasks=2
		Other local map tasks=2
		Total time spent by all maps in occupied slots (ms)=6485
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=6485
		Total vcore-seconds taken by all map tasks=6485
		Total megabyte-seconds taken by all map tasks=16601600
	Map-Reduce Framework
		Map input records=100000
		Map output records=100000
		Input split bytes=164
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=50
		CPU time spent (ms)=2540
		Physical memory (bytes) snapshot=374366208
		Virtual memory (bytes) snapshot=5856428032
		Total committed heap usage (bytes)=1516765184
	org.apache.hadoop.examples.terasort.TeraGen$Counters
		CHECKSUM=214574985129000
	File Input Format Counters 
		Bytes Read=0
	File Output Format Counters 
		Bytes Written=10000000

The following command uses yarn to run the terasort application, which sorts the data in teragen and writes the result to the teraout directory.


In [40]:
!yarn jar $HADOOP_EXAMPLES terasort teragen teraout




In [39]:
!hadoop fs -text teraout/part-r-00000 | head
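
As an optional check (not run here), the same examples jar also provides a teravalidate application, which verifies that the terasort output is globally sorted and writes a small report to a directory of your choice (teraval-report below is an arbitrary name):

In [ ]:
!yarn jar $HADOOP_EXAMPLES teravalidate teraout teraval-report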




3. Word Count

Here we will run the "hello world" of Big Data: word count! We will count the words contained in the log file located at /data/nasa/NASA_access_log_Jul95.


In [41]:
!hdfs dfs -rm -r wordcount-out


15/11/15 15:46:38 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 360 minutes, Emptier interval = 0 minutes.
Moved: 'hdfs://ip-10-63-179-69.ec2.internal:8020/user/radical/wordcount-out' to trash at: hdfs://ip-10-63-179-69.ec2.internal:8020/user/radical/.Trash/Current

In [42]:
!hdfs dfs -text /data/nasa/NASA_access_log_Jul95 | head


199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085
burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0
199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0
205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985
d104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] "GET / HTTP/1.0" 200 7074

In [43]:
!yarn jar $HADOOP_EXAMPLES wordcount /data/nasa/ wordcount-out/


15/11/15 15:47:07 INFO impl.TimelineClientImpl: Timeline service address: http://ip-10-63-179-69.ec2.internal:8188/ws/v1/timeline/
15/11/15 15:47:07 INFO client.RMProxy: Connecting to ResourceManager at ip-10-63-179-69.ec2.internal/10.63.179.69:8050
15/11/15 15:47:08 INFO input.FileInputFormat: Total input paths to process : 1
15/11/15 15:47:08 INFO mapreduce.JobSubmitter: number of splits:2
15/11/15 15:47:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1447000128355_0807
15/11/15 15:47:09 INFO impl.YarnClientImpl: Submitted application application_1447000128355_0807
15/11/15 15:47:09 INFO mapreduce.Job: The url to track the job: http://ip-10-63-179-69.ec2.internal:8088/proxy/application_1447000128355_0807/
15/11/15 15:47:09 INFO mapreduce.Job: Running job: job_1447000128355_0807
15/11/15 15:47:15 INFO mapreduce.Job: Job job_1447000128355_0807 running in uber mode : false
15/11/15 15:47:15 INFO mapreduce.Job:  map 0% reduce 0%
15/11/15 15:47:27 INFO mapreduce.Job:  map 37% reduce 0%
15/11/15 15:47:31 INFO mapreduce.Job:  map 51% reduce 0%
15/11/15 15:47:34 INFO mapreduce.Job:  map 56% reduce 0%
15/11/15 15:47:37 INFO mapreduce.Job:  map 62% reduce 0%
15/11/15 15:47:40 INFO mapreduce.Job:  map 67% reduce 0%
15/11/15 15:47:47 INFO mapreduce.Job:  map 83% reduce 0%
15/11/15 15:47:58 INFO mapreduce.Job:  map 83% reduce 17%
15/11/15 15:48:02 INFO mapreduce.Job:  map 100% reduce 17%
15/11/15 15:48:04 INFO mapreduce.Job:  map 100% reduce 73%
15/11/15 15:48:05 INFO mapreduce.Job:  map 100% reduce 100%
15/11/15 15:48:07 INFO mapreduce.Job: Job job_1447000128355_0807 completed successfully
15/11/15 15:48:07 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=34517909
		FILE: Number of bytes written=69415428
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=205373714
		HDFS: Number of bytes written=29131159
		HDFS: Number of read operations=9
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=2
		Launched reduce tasks=1
		Data-local map tasks=1
		Rack-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=74953
		Total time spent by all reduces in occupied slots (ms)=31964
		Total time spent by all map tasks (ms)=74953
		Total time spent by all reduce tasks (ms)=15982
		Total vcore-seconds taken by all map tasks=74953
		Total vcore-seconds taken by all reduce tasks=15982
		Total megabyte-seconds taken by all map tasks=191879680
		Total megabyte-seconds taken by all reduce tasks=81827840
	Map-Reduce Framework
		Map input records=1891715
		Map output records=18915314
		Map output bytes=280900888
		Map output materialized bytes=34517915
		Input split bytes=274
		Combine input records=18915314
		Combine output records=1237021
		Reduce input groups=1216956
		Reduce shuffle bytes=34517915
		Reduce input records=1237021
		Reduce output records=1216956
		Spilled Records=2474042
		Shuffled Maps =2
		Failed Shuffles=0
		Merged Map outputs=2
		GC time elapsed (ms)=2380
		CPU time spent (ms)=56880
		Physical memory (bytes) snapshot=4037234688
		Virtual memory (bytes) snapshot=11021074432
		Total committed heap usage (bytes)=4972347392
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=205373440
	File Output Format Counters 
		Bytes Written=29131159
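
One way to peek at the most frequent tokens is to sort the word counts numerically. Since the job ran with a single reduce task, all results end up in one output file, part-r-00000:

In [ ]:
!hdfs dfs -text wordcount-out/part-r-00000 | sort -k2,2 -rn | head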

4. Log Parsing

Use the shell commands head, cat, uniq, wc, sort, find, xargs, and awk to evaluate the NASA log file: Which page was called the most? What was the most frequent return code? How many errors occurred, and what is the percentage of errors?

Then implement a Python version of your shell pipeline, using the provided script as a template (a reference answer can be found in mapreduce_streaming.py). A possible shell sketch for the first two questions is shown below.
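
As a starting point (not a complete answer), a sketch along the following lines computes the most requested pages and the distribution of return codes directly from HDFS. The %%bash cell magic is used instead of ! so that IPython does not expand the braces and $ fields inside the awk programs; the field positions assume well-formed Common Log Format lines, so a handful of malformed records may be miscounted:

In [ ]:
%%bash
# Most requested pages (field 7 of the Common Log Format), top 5:
hdfs dfs -text /data/nasa/NASA_access_log_Jul95 | awk '{print $7}' | sort | uniq -c | sort -rn | head -5
# Distribution of HTTP return codes (second-to-last field, as in mapreduce_streaming.py):
hdfs dfs -text /data/nasa/NASA_access_log_Jul95 | awk '{print $(NF-1)}' | sort | uniq -c | sort -rn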

We will now run the Python script inside a Hadoop Streaming job.


In [28]:
!cat mapreduce_streaming.py


#!/usr/bin/python
#
# Licensed to Cloudera, Inc. under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  Cloudera, Inc. licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Template for python Hadoop streaming.  Fill in the map() and reduce()
# functions, which should call emit(), as appropriate.
#
# Test your script with
#  cat input | python mapreduce_streaming.py map | sort | python mapreduce_streaming.py reduce

import sys
import re
try:
    import simplejson as json
except ImportError:
    import json

import __builtin__

def map(line):
    try:
        words = line.split()
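        # In the Common Log Format the HTTP status code is the second-to-last
        # whitespace-separated field, e.g.: ... "GET /history/apollo/ HTTP/1.0" 200 6245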
        http_response_code = words[-2]
        emit(http_response_code, str(1))
    except:
        pass
    
def reduce(key, values):
    emit(key, str(sum(__builtin__.map(int,values))))

# Common library code follows:

def emit(key, value):
    """
    Emits a key->value pair.  Key and value should be strings.
    """
    try:
        print "\t".join( (key, value) )
    except:
        pass

def run_map():
    """Calls map() for each input value."""
    for line in sys.stdin:
        line = line.rstrip()
        map(line)

def run_reduce():
    """Gathers reduce() data in memory, and calls reduce()."""
    prev_key = None
    values = []
    for line in sys.stdin:
        line = line.rstrip()
        key, value = re.split("\t", line, 1)
        if prev_key == key:
            values.append(value)
        else:
            if prev_key is not None:
                reduce(prev_key, values)
            prev_key = key
            values = [ value ]

    if prev_key is not None:
        reduce(prev_key, values)

def main():
    """Runs map or reduce code, per arguments."""
    if len(sys.argv) != 2 or sys.argv[1] not in ("map", "reduce"):
        print "Usage: %s <map|reduce>" % sys.argv[0]
        sys.exit(1)
    if sys.argv[1] == "map":
        run_map()
    elif sys.argv[1] == "reduce":
        run_reduce()
    else:
        assert False

if __name__ == "__main__":
  main()
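
Before submitting the job to the cluster, the script can be smoke-tested locally on a small sample of the log (an optional check; note that the script is Python 2, as the print statements and the __builtin__ import indicate):

In [ ]:
!hdfs dfs -text /data/nasa/NASA_access_log_Jul95 | head -1000 | python mapreduce_streaming.py map | sort | python mapreduce_streaming.py reduce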

In the next example, we'll execute mapreduce_streaming.py on YARN as a Hadoop Streaming application.


In [29]:
!yarn jar $HADOOP_STREAMING -input /data/nasa -output logs-parsed \
                            -file mapreduce_streaming.py \
                            -mapper "python mapreduce_streaming.py map" \
                            -reducer "python mapreduce_streaming.py reduce"




In the next two commands, we'll list and inspect the output of the streaming job.


In [30]:
!hdfs dfs -ls logs-parsed


Found 2 items
-rw-r--r--   3 radical hdfs          0 2015-11-08 16:48 logs-parsed/_SUCCESS
-rw-r--r--   3 radical hdfs         70 2015-11-08 16:48 logs-parsed/part-00000

In [44]:
!hdfs dfs -text logs-parsed/*


200	1701534
302	46573
304	132627
400	5
403	54
404	10845
500	62
501	14
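
From these aggregated counts, the error percentage asked for in the exercise can be computed with a short awk program. Here status codes of 400 and above are treated as errors; the %%bash cell magic is used so that IPython leaves the awk braces and $ fields untouched:

In [ ]:
%%bash
hdfs dfs -text logs-parsed/* | awk '{ total += $2; if ($1 >= 400) errors += $2 }
    END { printf "errors: %d out of %d requests (%.2f%%)\n", errors, total, 100 * errors / total }'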

In [ ]: