Setup Eclipse for Hadoop MapReduce Development

Objective: Create a Java project for a Hadoop MapReduce application using Maven.

Download the Eclipse IDE for Java Developers package appropriate for your OS (Linux, Windows, Mac) and CPU architecture (64-bit):

http://www.eclipse.org/downloads/packages/eclipse-ide-java-developers/mars2

Create a new Maven project and modify pom.xml with the following details.

Add a Maven repository. Below is an example for Cloudera:

<repositories>
<repository>
<id>cloudera-releases</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

Choose the repository that matches the Hadoop distribution you work with.

Find the Hadoop version installed on the cluster.

$ hadoop version
Hadoop 2.6.0-cdh5.8.0
Subversion http://github.com/cloudera/hadoop -r 57e7b8556919574d517e874abfb7ebe31a366c2b
Compiled by jenkins on 2016-06-16T19:38Z
Compiled with protoc 2.5.0
From source with checksum 9e99ecd28376acfd5f78c325dd939fed
This command was run using /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.8.0.jar

In pom.xml, add a dependency for the Hadoop client library. Match the version of the client library to the Hadoop platform that you want to deploy against.

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0-cdh5.8.0</version>
</dependency>
</dependencies> 
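The value for the <version> element is the token that follows "Hadoop" on the first line of the `hadoop version` output. As a minimal sketch of that lookup, assuming the output format shown above, the hypothetical helper below (HadoopVersionParser is not part of Hadoop) extracts the string:

```java
// Hypothetical helper, not part of Hadoop: pulls the version token out of
// the first line of `hadoop version` output.
class HadoopVersionParser {

    // Returns the text after "Hadoop " on the first line, or null if the
    // first line does not have that shape.
    static String parseVersion(String commandOutput) {
        String firstLine = commandOutput.split("\\R", 2)[0].trim();
        if (!firstLine.startsWith("Hadoop ")) {
            return null;
        }
        return firstLine.substring("Hadoop ".length()).trim();
    }

    public static void main(String[] args) {
        String sample = "Hadoop 2.6.0-cdh5.8.0\n"
                + "Compiled with protoc 2.5.0\n";
        // The printed value is what goes into <version> for hadoop-client.
        System.out.println(parseVersion(sample)); // 2.6.0-cdh5.8.0
    }
}
```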

Create a resource folder under the project root; you can name it "resources".

Mark the resources folder as a source folder:

> Right-click the resources folder

> Click Build Path

> Click Use as Source Folder (once added, this option is replaced by Remove from Build Path)

Add a new file inside it named log4j.properties. This file controls the logging output of a job run. Note that the file name must be exactly as shown; it is case sensitive.

file: <project root>/resources/log4j.properties

log4j.rootLogger=INFO,console,file
log4j.appender.console=org.apache.log4j.ConsoleAppender

log4j.appender.console.Threshold=WARN

log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=hadoop.log
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.logger.org.apache.hadoop.util.Shell=INFO,console
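In log4j 1.x, the rootLogger value is a level followed by a comma-separated list of appender names. The sketch below is only an illustration using java.util.Properties from the JDK (Log4jConfigPeek is a hypothetical helper, not part of log4j); it reads two of the lines above and prints how they break down:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Illustrative helper (hypothetical, not part of log4j).
class Log4jConfigPeek {

    // Parses properties text held in memory.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // The first comma-separated field of log4j.rootLogger is the level.
    static String rootLevel(Properties props) {
        return props.getProperty("log4j.rootLogger", "").split(",")[0].trim();
    }

    // The remaining fields are appender names.
    static List<String> rootAppenders(Properties props) {
        String[] parts = props.getProperty("log4j.rootLogger", "").split(",");
        List<String> appenders = new ArrayList<>();
        for (int i = 1; i < parts.length; i++) {
            String name = parts[i].trim();
            if (!name.isEmpty()) {
                appenders.add(name);
            }
        }
        return appenders;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "log4j.rootLogger=INFO,console,file\n"
              + "log4j.appender.console.Threshold=WARN\n");
        System.out.println("root level: " + rootLevel(props));
        System.out.println("appenders: " + rootAppenders(props));
        System.out.println("console threshold: "
                + props.getProperty("log4j.appender.console.Threshold"));
    }
}
```

With the configuration above, INFO messages reach the file appender (hadoop.log), while the console appender only shows WARN and above because of its Threshold setting.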

Now create a new Java class called WordCount inside the source folder.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordCount extends Configured implements Tool {
 public static void main(String args[]) throws Exception {
  int res = ToolRunner.run(new WordCount(), args);
  System.exit(res);
 }
 public int run(String[] args) throws Exception {
  if (args.length != 2) {
   System.err.println("Usage: WordCount <input path> <output path>");
   return 1; // non-zero exit code signals incorrect usage
  }
  Path inputPath = new Path(args[0]);
  Path outputPath = new Path(args[1]);
  Configuration conf = getConf();
  FileSystem hdfs = FileSystem.get(conf);
  // delete existing directory
  if (hdfs.exists(outputPath)) {
   hdfs.delete(outputPath, true);
  }
  Job job = Job.getInstance(conf, "word count");
  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);
  job.setJobName("WordCount");
  job.setJarByClass(WordCount.class);
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setMapperClass(Map.class);
  job.setCombinerClass(Reduce.class);
  job.setReducerClass(Reduce.class);
  return job.waitForCompletion(true) ? 0 : 1;
 }
 public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();
  @Override
  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
   String line = value.toString().toLowerCase();
   // Split on runs of non-word characters
   String[] tokens = line.split("\\W+");
   for (String token : tokens) {
    // split() yields an empty first token when the line starts with a non-word character
    if (token.isEmpty()) {
     continue;
    }
    word.set(token);
    context.write(word, one);
   }
  }
 }
 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable value: values) {
    sum += value.get();
   }
   context.write(key, new IntWritable(sum));
  }
 }
}
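The tokenization used by the mapper (lowercase the line, then split on "\\W+") can be tried locally without a cluster. The sketch below is a plain-Java approximation of the map and reduce steps, not the Hadoop job itself; LocalWordCount is a hypothetical class name. It skips empty tokens, because String.split returns an empty first element when a line begins with a non-word character.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java approximation of the WordCount logic (hypothetical helper,
// runs without Hadoop on the classpath).
class LocalWordCount {

    // Lowercases each line, splits on runs of non-word characters,
    // and sums one count per token.
    static Map<String, Integer> count(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted for readable output
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.isEmpty()) {
                    continue; // leading empty token, e.g. for a line starting with a quote
                }
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                count(Arrays.asList("Hello, world!", "\"Hello\" again"));
        System.out.println(counts); // {again=1, hello=2, world=1}
    }
}
```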

Build the project: right-click the project > Run As > Maven build... and enter "package" as the goal. Maven writes the jar to the project's target folder and shows its path near the end of the console output. Copy the jar to a Hadoop edge node, or to whichever machine you want to test on, and run it against Hadoop with the following command.

$ hadoop jar <.jar file> WordCount <input HDFS path> <output HDFS path>

Advanced Configuration for Job Run

Specify the number of reducers

$ hadoop jar <.jar file> WordCount -Dmapreduce.job.reduces=2 <input path> <output path>

Verify the number of output files created by the job.
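Each reducer writes one part file, so two reducers should produce two of them. As a rough guide to what to look for in the output directory, the sketch below builds the expected names, assuming the part-r-NNNNN naming used by FileOutputFormat in the org.apache.hadoop.mapreduce API (PartFileNames is a hypothetical helper, not a Hadoop class):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper (hypothetical, not a Hadoop class).
class PartFileNames {

    // Builds the reducer output file names expected for the given reducer count.
    static List<String> expected(int numReducers) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            names.add(String.format("part-r-%05d", i));
        }
        return names;
    }

    public static void main(String[] args) {
        // With -Dmapreduce.job.reduces=2:
        System.out.println(expected(2)); // [part-r-00000, part-r-00001]
    }
}
```

The output directory typically also contains a _SUCCESS marker file, which is not reducer output.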

Using Partitioner

Hadoop MapReduce uses a hash partitioner by default to determine which reducer processes each key from the mapper output. To observe this behaviour, look at the output part files of a MapReduce job run with 2 reducers. Both part files will probably contain words whose first letters span a-z: the words within each part file are sorted, but the combined output of the part files (using the getmerge command) is not. You can override this behaviour with a custom partitioner, which lets you A. control how the keys are distributed and B. achieve a global sort order.

public static class customPartitioner extends Partitioner<Text, IntWritable> {
 @Override
 public int getPartition(Text key, IntWritable value, int numReduceTasks) {
  int partition = 0;
  int firstChar = -1;
  if (key.toString().length() > 0) {
   firstChar = (int) key.toString().charAt(0);
  }
  // Use partition 1 only when a second reducer exists; 110 is the ASCII value of 'n'
  if (numReduceTasks > 1 && firstChar >= 110) {
   partition = 1;
  }
  return partition;
 }
}

Add the following to the job configuration:

job.setPartitionerClass(customPartitioner.class);

And add the following import statement to WordCount.java

import org.apache.hadoop.mapreduce.Partitioner;
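To compare the two strategies without running a job, here is a plain-Java sketch. It makes two assumptions: String.hashCode() stands in for Text.hashCode(), so the hash partition numbers will not match a real cluster, and the first-letter rule only returns partition 1 when more than one reducer is configured. PartitionDemo is a hypothetical class name.

```java
// Plain-Java comparison of hash partitioning and first-letter partitioning
// (hypothetical helper, runs without Hadoop on the classpath).
class PartitionDemo {

    // Same formula as Hadoop's default HashPartitioner, with String.hashCode()
    // standing in for Text.hashCode() (an assumption of this sketch).
    static int hashPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Keys starting with 'n' or a later character go to partition 1,
    // everything else (including the empty key) goes to partition 0.
    static int firstLetterPartition(String key, int numReduceTasks) {
        if (numReduceTasks > 1 && !key.isEmpty() && key.charAt(0) >= 'n') {
            return 1;
        }
        return 0;
    }

    public static void main(String[] args) {
        String[] words = {"apple", "mango", "night", "zebra"};
        for (String word : words) {
            System.out.println(word
                    + " hash=" + hashPartition(word, 2)
                    + " firstLetter=" + firstLetterPartition(word, 2));
        }
    }
}
```

With the first-letter rule, every key in partition 0 sorts before every key in partition 1, which is why concatenating the part files in order gives a globally sorted result.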

Run the WordCount example to produce compressed output.

$ hadoop jar <.jar file> WordCount \
"-Dmapreduce.output.fileoutputformat.compress=true" \
"-Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec" \
<input path> <output path>

To see the effect, compare the job counter "HDFS: Number of bytes written" with and without compression. This counter indicates the size in bytes of the output of the MR job.
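To get a feel for why the byte count drops, the sketch below gzips some word-count-like text in memory and prints both sizes. It uses the JDK's java.util.zip.GZIPOutputStream as a stand-in for Hadoop's GzipCodec, and the sample data is made up for illustration, so the numbers only hint at the ratio a real job would see. GzipSizeDemo is a hypothetical class name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// In-memory size comparison (hypothetical helper, no Hadoop required).
class GzipSizeDemo {

    // Returns the number of bytes the input occupies after gzip compression.
    static int gzipSize(byte[] raw) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            gzip.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed here, so the gzip trailer has been written.
        return buffer.size();
    }

    // Generates tab-separated "word<TAB>count" lines, similar in shape to
    // WordCount output (made-up data).
    static byte[] sampleOutput(int lines) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < lines; i++) {
            sb.append("word").append(i % 50).append('\t').append(i).append('\n');
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = sampleOutput(1000);
        System.out.println("raw bytes:  " + raw.length);
        System.out.println("gzip bytes: " + gzipSize(raw));
    }
}
```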

Compress intermediate output of the map function

$ hadoop jar <.jar file> WordCount \
"-Dmapreduce.output.fileoutputformat.compress=true" \
"-Dmapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec" \
"-Dmapreduce.map.output.compress=true" \
"-Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec" \
<input path> <output path>

To see the impact of compressing the intermediate map output, compare the counter "Map output materialized bytes" with and without compression.

Alternative to Maven: Add External Jars to the Project

Add ALL jars from the following locations to the project as external jars:

  • /usr/lib/hadoop
  • /usr/lib/hadoop/lib
  • /usr/lib/hadoop-yarn/
  • /usr/lib/hadoop-mapreduce