Performance Enhancement of MR Jobs
Using custom combiner
A combiner reduces the amount of data shuffled across the network
Combiners run on the same node as the mapper
Output of the combiner must match the reducer input
You can write a custom combiner class and specify it in the driver class using the setCombinerClass method
If the reduction function is associative and commutative, you should be able to use the reducer class as the combiner
A combiner does not always improve performance, for example when the computation is CPU bound rather than IO bound
As a best practice, use a combiner when the number of records is significantly higher than the number of keys
Combiner functions are invoked after the map, partition, and sort phases are complete
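The associativity/commutativity point above can be illustrated without Hadoop at all. A minimal pure-Java sketch (class and variable names are illustrative, not Hadoop APIs) showing that pre-aggregating word counts on each "mapper node" before merging yields the same result as one global reduction, which is why a sum-style reducer can double as a combiner:

```java
import java.util.*;

public class CombinerSketch {
    // A sum-style reduction: merge (word, 1) pairs into counts.
    static Map<String, Integer> countWords(List<String> words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String w : words) counts.merge(w, 1, Integer::sum);
        return counts;
    }

    public static void main(String[] args) {
        // Two "mapper nodes", each holding its own slice of the input.
        List<String> node1 = List.of("a", "b", "a");
        List<String> node2 = List.of("b", "b", "c");

        // Combine locally on each node first (fewer records cross the network)...
        Map<String, Integer> local1 = countWords(node1);
        Map<String, Integer> local2 = countWords(node2);

        // ...then merge the partial counts at the reducer.
        Map<String, Integer> combined = new HashMap<>(local1);
        local2.forEach((k, v) -> combined.merge(k, v, Integer::sum));

        // Integer addition is associative and commutative, so the result
        // matches reducing all records in one place.
        List<String> all = new ArrayList<>(node1);
        all.addAll(node2);
        System.out.println(combined.equals(countWords(all))); // prints true
    }
}
```

The same equality would not hold for a non-associative reduction (for example, an average of values), which is why such reducers cannot be reused as combiners directly.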
Compressing mapper output
Compressing the output of the mapper saves disk IO and network traffic at the expense of the CPU cycles required for compression and decompression
It is generally recommended to use compression when working with non-binary data
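Map output compression can be enabled per job or cluster-wide. A configuration sketch (the Snappy codec here is one common choice, not a requirement):

```xml
<!-- Enable compression of intermediate map output (mapred-site.xml or per-job). -->
<property>
  <name>mapreduce.map.output.compress</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.map.output.compress.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
```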
Modifying the number of reducers
Specify the number of reducers in the mapreduce.job.reduces property. The default value is 1 if not specified at the cluster level.
Increasing the number of reducers increases the level of parallelism of the reduce tasks
If the number of reduce tasks required exceeds the number of available containers, the reduce operation happens in multiple waves
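The reducer count can be set in configuration (or equivalently via Job.setNumReduceTasks in the driver). A sketch, with an illustrative value that should be sized to cluster capacity and data volume:

```xml
<property>
  <name>mapreduce.job.reduces</name>
  <value>8</value> <!-- illustrative; default is 1 -->
</property>
```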
Using speculative execution
With speculative execution enabled, YARN detects slow-running tasks, starts a duplicate of each on a second container, and picks the result from whichever copy finishes first.
The map and reduce tasks must be idempotent (that is, free from side effects). For example, if a mapper or reducer writes data to an external database, it has a side effect.
Speculative execution runs the affected tasks twice. The performance gain of a job due to speculative execution should outweigh the additional resource utilization.
You can turn on speculative execution by using the properties mapreduce.map.speculative and mapreduce.reduce.speculative
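Both properties are booleans and can be set independently for the map and reduce phases:

```xml
<property>
  <name>mapreduce.map.speculative</name>
  <value>true</value>
</property>
<property>
  <name>mapreduce.reduce.speculative</name>
  <value>true</value>
</property>
```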
Reusing JVM
By default, each map or reduce task launches a separate JVM for execution. Each JVM launch adds about 4-8 seconds of startup and teardown time. This overhead can be significant for short-duration tasks.
JVM reuse is recommended for jobs with a large number of short-duration tasks. For long-running tasks, reusing the JVM does not add significant benefit; moreover, long-running tasks may suffer heap fragmentation that hurts performance.
This is applicable only in MR1. In MR2, each task launches its own JVM.
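In MR1 this is controlled by the mapred.job.reuse.jvm.num.tasks property; the value below is illustrative:

```xml
<!-- MR1 only: reuse one JVM for up to 20 tasks of a job (-1 = no limit). -->
<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <value>20</value>
</property>
```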
Configuring Sort properties
Mapper output is stored in a circular memory buffer in the Java heap (controlled by mapreduce.task.io.sort.mb, 100 MB by default). Once a threshold (controlled by mapreduce.map.sort.spill.percent) is reached, the data is spilled to disk. Spills can occur more than once; when multiple spills occur, a final merge of the spill files into a single file takes place. Examine the ratio of the spilled records counter to the map output records counter to detect whether excessive spilling is occurring.
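A configuration sketch for the two sort properties above; the buffer size is illustrative (a larger buffer means fewer spills, at the cost of heap), and 0.80 is the usual default spill threshold:

```xml
<property>
  <name>mapreduce.task.io.sort.mb</name>
  <value>256</value> <!-- illustrative; default is 100 MB -->
</property>
<property>
  <name>mapreduce.map.sort.spill.percent</name>
  <value>0.80</value> <!-- fraction of the buffer at which spilling begins -->
</property>
```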
Configuring Java properties
When your tasks fail due to out of memory exceptions, consider increasing heap size allocated for the tasks.
You can control the heap size using the mapred.child.java.opts property
mapreduce.task.io.sort.mb must be less than the JVM heap size allocated for the task, and the heap size must in turn fit within the container memory set by mapreduce.map.memory.mb and mapreduce.reduce.memory.mb.
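A sketch of setting the task heap; the 1024 MB value is illustrative and must fit inside the container memory allocation:

```xml
<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx1024m</value> <!-- illustrative heap size for child task JVMs -->
</property>
```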
Improve Code
Identify poorly performing parts of the MapReduce code and refactor them.
Rethink Algorithms
A better algorithm can pay surprising dividends
Zen
You cannot tune what you do not understand
You cannot improve what you cannot measure
Use Ganglia, Cacti, Cloudera Manager to gather metrics
4 key vital metrics: utilization of CPU, memory, disk, and network
Approach Toward Fine Tuning
Run Job > Identify Bottlenecks > Address Bottlenecks
Identify Bottlenecks
Graphs
Job Counters
Job Logs
Profile Results