Hadoop Framework for Data Processing

Outline

  • Data processing in Hadoop
    • Mapper, reducer, and partitioners
  • Hadoop I/O
    • InputFormat and OutputFormat
    • Reading sequence files
    • Compressing the output of mappers and reducers
  • Chaining multiple map and reduce tasks

Hadoop Processing

Hadoop splits the input data among the mappers. Each mapper generates (key, value) pairs. This intermediate output is partitioned, one partition per reducer, and the partitions are written to the local disk of the node running the mapper, not to HDFS.

Partitioner

When there are multiple reducers, the output of the mappers needs to be partitioned. The default partitioner creates partitions by hashing the mappers' output keys:

job.setPartitionerClass(HashPartitioner.class);

public class HashPartitioner<K,V> extends Partitioner<K,V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the result is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
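
The role of the mask in getPartition() can be checked without a cluster. The following is a minimal plain-Java sketch, not Hadoop code: the class HashPartitionSketch and both method names are made up for illustration, and only the arithmetic is taken from the partitioner above. It shows that equal keys always land in the same partition, and that dropping the mask can yield a negative (invalid) partition index.

```java
/** Plain-Java sketch of hash partitioning; no Hadoop classes are used. */
final class HashPartitionSketch {

    /** Same arithmetic as the default partitioner: clear the sign bit, then take the remainder. */
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    /** Naive variant without the mask (hypothetical, for comparison); may return a negative index. */
    static int naivePartitionFor(Object key, int numReduceTasks) {
        return key.hashCode() % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        for (String word : new String[] {"hadoop", "mapper", "reducer", "hadoop"}) {
            System.out.println(word + " -> partition " + partitionFor(word, reducers));
        }
        // The hash code of the Integer -7 is -7 itself, so the naive version goes negative.
        System.out.println("naive:  " + naivePartitionFor(-7, reducers));
        System.out.println("masked: " + partitionFor(-7, reducers));
    }
}
```

Because the partition depends only on the key's hash code, all values of one key reach the same reducer, which is what the reduce step relies on.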

Example: WordCount.java

A Hadoop Java program is typically written as a single main class that contains a mapper class, a reducer class, and a main function, as follows:

public class WordCount {
    // Nested classes must be static so that Hadoop can instantiate them by reflection
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {...}

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {...}

    public static void main(String[] args) throws Exception {
       ...
       ...
    }
}
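
The bodies of the mapper and reducer are elided above. As a rough illustration of the data flow only, here is a plain-Java sketch that runs entirely in memory. It assumes the conventional word count behaviour (the mapper emits (word, 1) for each whitespace-separated token, the reducer sums the values of each key); the class WordCountSketch and its methods are hypothetical and do not use the Hadoop API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

/** In-memory sketch of the map, group, and reduce steps of a word count. */
final class WordCountSketch {

    /** Map step: emit (word, 1) for each whitespace-separated token of one line. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens()) {
            out.add(new AbstractMap.SimpleImmutableEntry<>(tokens.nextToken(), 1));
        }
        return out;
    }

    /** Reduce step: sum all counts that were emitted for one key. */
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    /** Driver: map every line, group the values by key, then reduce each group. */
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map(line)) {
                grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("big data big", "data tools")));
    }
}
```

In a real job the grouping step is done by the framework between the map and reduce phases; only the two functions are written by the programmer.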

InputFormat (only for mappers)

InputFormat is an abstract class that specifies how records are defined in the input data. The default is TextInputFormat, in which each record is a line: the key is the byte offset of the line within the file, and the value is the content of the line.

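public abstract class InputFormat<K, V> {
    // Logically split the job's input; each InputSplit is assigned to one mapper
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    // Create the reader that turns one split into (key, value) records
    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
                                                          TaskAttemptContext context)
            throws IOException, InterruptedException;
}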
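
InputFormat               Description
TextInputFormat           Each line is a record; key: LongWritable (byte offset), value: Text (line content)
KeyValueTextInputFormat   Each line is a record, split into key and value at the first separator (tab by default); key: Text, value: Text
SequenceFileInputFormat   Reads Hadoop sequence files (binary key/value pairs); key and value types are those stored in the file
NLineInputFormat          Like TextInputFormat, but each split (and hence each mapper) receives a fixed number of lines; key: LongWritable, value: Text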
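
To make the record definition of TextInputFormat concrete, the following plain-Java sketch computes the (byte offset, line) pairs for a small text. It is an illustration only: the class TextRecordSketch is hypothetical, it assumes UTF-8 text with "\n" line endings, and it ignores split boundaries, which the real record reader has to handle.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of how line records are keyed by byte offset; not the Hadoop implementation. */
final class TextRecordSketch {

    /** Split the text on '\n' and key each line by the byte offset at which it starts. */
    static Map<Long, String> records(String text) {
        Map<Long, String> out = new LinkedHashMap<>();
        long offset = 0;
        for (String line : text.split("\n")) {
            out.put(offset, line);
            // Advance by the line's UTF-8 byte length plus one byte for the newline.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints one "offset<TAB>line" pair per record: offsets 0, 12, and 21.
        records("hello world\nbig data\nhadoop\n")
            .forEach((offset, line) -> System.out.println(offset + "\t" + line));
    }
}
```

This is why the mapper in the word count example can declare its input key as Object and ignore it: the offset is rarely useful to the map function itself.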

Specifying InputFormat:

public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    ...
    job.setInputFormatClass(TextInputFormat.class);
}

OutputFormat (only for reducers)

OutputFormat is an abstract class that determines the format of the output from the reducers. To set it, call setOutputFormatClass() on the job. The possible options include:

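OutputFormat                     Description
TextOutputFormat<K, V>           Writes each record as a line of text: key, tab, value (the default)
SequenceFileOutputFormat<K, V>   Writes binary sequence files, convenient as input to a later MapReduce job
NullOutputFormat<K, V>           Discards all output

Example:
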
public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    ...
    job.setOutputFormatClass(TextOutputFormat.class);
}

Compressing the outputs (for mappers and reducers)

If the input data is compressed, Hadoop handles it automatically: it picks the codec from the file extension, and no changes to the program are needed. The same Java program can be run on either compressed or uncompressed input:

# Running with uncompressed input file
hadoop jar wordcount.jar WordCount /user/hduser/wordcount/bigdata.txt /user/hduser/wordcount/output

# Running with compressed input file
hadoop jar wordcount.jar WordCount /user/hduser/wordcount/bigdata.txt.gz /user/hduser/wordcount/output
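
The extension-based detection can be pictured as a simple suffix lookup. The sketch below is a simplified stand-in written in plain Java, not Hadoop's own lookup code: the class CodecLookupSketch and its method are hypothetical, and the table only lists the default extensions of four codecs bundled with Hadoop.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Sketch of choosing a codec from a file name suffix; a stand-in, not Hadoop code. */
final class CodecLookupSketch {

    // Default file extension of each codec, mapped to the codec's class name.
    private static final Map<String, String> CODEC_BY_SUFFIX = new LinkedHashMap<>();
    static {
        CODEC_BY_SUFFIX.put(".deflate", "org.apache.hadoop.io.compress.DefaultCodec");
        CODEC_BY_SUFFIX.put(".gz", "org.apache.hadoop.io.compress.GzipCodec");
        CODEC_BY_SUFFIX.put(".bz2", "org.apache.hadoop.io.compress.BZip2Codec");
        CODEC_BY_SUFFIX.put(".snappy", "org.apache.hadoop.io.compress.SnappyCodec");
    }

    /** Return the codec class name for the path, or empty if the file is treated as uncompressed. */
    static Optional<String> codecFor(String path) {
        for (Map.Entry<String, String> e : CODEC_BY_SUFFIX.entrySet()) {
            if (path.endsWith(e.getKey())) {
                return Optional.of(e.getValue());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(codecFor("/user/hduser/wordcount/bigdata.txt.gz"));
        System.out.println(codecFor("/user/hduser/wordcount/bigdata.txt"));
    }
}
```

A consequence of this scheme is that a compressed file with a missing or wrong extension is read as plain text, so the extension has to match the actual format.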

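However, sometimes it is desirable to compress the output of the mappers or the reducers as well, and this has to be requested explicitly in the job configuration.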

Various compression formats are available, such as gzip (*.gz) and bzip2 (*.bz2). Each format is provided by a class that implements the CompressionCodec interface, and Hadoop uses that codec to compress the output. Some codecs are listed below.

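Format    Splittable?   Hadoop CompressionCodec
DEFLATE   No            org.apache.hadoop.io.compress.DefaultCodec
gzip      No            org.apache.hadoop.io.compress.GzipCodec
bzip2     Yes           org.apache.hadoop.io.compress.BZip2Codec
LZO       No            com.hadoop.compression.lzo.LzopCodec
Snappy    No            org.apache.hadoop.io.compress.SnappyCodec

Note: the LZO codec is not bundled with current Hadoop releases; it comes from the separate hadoop-lzo library. LZO files become splittable only after they have been indexed in a preprocessing step.

Example: reducer output compression
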
public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();
    ...
    // Compress the final (reducer) output with bzip2
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
}

This produces an output file named part-r-00000.bz2.

Mapper output compression

public static void main(String[] args) throws Exception {
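    Configuration conf = new Configuration();
    // Hadoop 2+ property names; older releases used
    // mapred.compress.map.output and mapred.map.output.compression.codec
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
    ...
    // Pass conf to the job, otherwise the settings above are ignored
    Job job = Job.getInstance(conf);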
    ...
}

Chaining

Sometimes it is necessary to chain multiple mappers and reducers to solve a complex problem. Chaining calls mappers and reducers sequentially, so that the output of each stage is the input to the next. Within a single job this is done with ChainMapper and ChainReducer, which allow one or more mappers before the reducer and zero or more mappers after it.

Example: ChainMapper and ChainReducer (old org.apache.hadoop.mapred API)

Configuration conf = getConf();
JobConf job = new JobConf(conf);

job.setJobName("A-chain-job");
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);

FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);

// Mapper A (preprocessing)
JobConf mapAconf = new JobConf(false);
ChainMapper.addMapper(job, MapA.class, 
                        LongWritable.class, Text.class, 
                        Text.class, Text.class,
                        true, mapAconf);

// Mapper B (preprocessing)
JobConf mapBconf = new JobConf(false);
ChainMapper.addMapper(job, MapB.class, 
                      Text.class, Text.class,
                      LongWritable.class, Text.class, 
                      true, mapBconf);

// Reducer
JobConf reduceConf = new JobConf(false);
// The single reducer of the chain is set, not added
ChainReducer.setReducer(job, myReducer.class, 
                        LongWritable.class, Text.class,
                        Text.class, Text.class,
                        true, reduceConf);

// Mapper C (post-processing)
JobConf mapCconf = new JobConf(false);
ChainReducer.addMapper(job, MapC.class,
                       Text.class, Text.class,
                       LongWritable.class, Text.class,
                       true, mapCconf);

// Mapper D (post-processing)
JobConf mapDconf = new JobConf(false);
ChainReducer.addMapper(job, MapD.class,
                       LongWritable.class, Text.class,
                       LongWritable.class, Text.class,
                       true, mapDconf);
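
The order of the stages in such a chain (mappers, then one reducer, then more mappers) can be illustrated in memory. The following plain-Java sketch is only an analogy and does not use the Hadoop API: the class ChainSketch and the three stage functions are hypothetical, chosen to mimic two pre-processing mappers, a counting reducer, and one post-processing mapper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** In-memory analogy of a mapper chain, a reducer, and a post-processing mapper. */
final class ChainSketch {

    // Hypothetical pre-processing mappers, applied to every record before the reduce step.
    static final Function<String, String> MAP_A = s -> s.toLowerCase(Locale.ROOT);
    static final Function<String, String> MAP_B = s -> s.replaceAll("[^a-z]", "");

    // Hypothetical post-processing mapper, applied to every record the reducer emits.
    static final Function<Map.Entry<String, Integer>, String> MAP_C =
        e -> e.getKey() + ":" + e.getValue();

    static List<String> run(List<String> records) {
        // Mapper chain: the output of MAP_A is passed directly to MAP_B.
        Function<String, String> preprocess = MAP_A.andThen(MAP_B);

        // Reduce step: group identical keys and count their occurrences.
        Map<String, Integer> counts = new TreeMap<>();
        for (String record : records) {
            counts.merge(preprocess.apply(record), 1, Integer::sum);
        }

        // Post-processing mapper: transform each reducer output record.
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            out.add(MAP_C.apply(e));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("Data,", "data", "Hadoop!")));
    }
}
```

As in the Hadoop example, each stage's output type must match the next stage's input type; here the compiler checks this through the Function type parameters, whereas in the chain above it is expressed by the key and value classes passed to addMapper.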