Performance Enhancement of MR Jobs

  • Using custom combiner
    • A combiner reduces the amount of data shuffled across the network
    • The combiner runs on the same node as the mapper
    • The combiner's input and output key/value types must both match the mapper output types, which are also the reducer input types
    • You can write a custom combiner class and register it in the driver class with the setCombinerClass method
    • If the reduction function is associative and commutative, you should be able to use the reducer class as the combiner
    • A combiner does not always improve performance, for example when the computation is CPU bound rather than IO bound
    • As a best practice, use a combiner when the number of records is significantly higher than the number of distinct keys
    • Combiner functions are invoked after mapping, partitioning and sorting are complete
  • Compressing mapper output
    • Compressing the mapper output saves disk IO and network traffic at the expense of the CPU cycles needed for compression and decompression
    • It is generally worthwhile to use compression when working with non-binary data
  • Modifying the number of reducers
    • Specify the number of reducers with the mapreduce.job.reduces property. The default value is 1 if it is not specified at the cluster level.
    • Increasing the number of reducers increases the parallelism of the reduce phase
    • If the number of reduce tasks exceeds the number of available containers, the reduce phase runs in multiple waves
  • Using speculative execution
    • With speculative execution enabled, the framework launches a duplicate attempt of a slow-running task in another container, uses the result of whichever attempt finishes first, and kills the other attempt.
    • The map and reduce tasks must be idempotent (meaning free from side effects). For example, if a mapper or reducer writes data to an external database, a duplicate attempt may repeat that write.
    • Speculative execution can run the same task twice. The performance gain of the job from speculation should outweigh the additional resource utilization.
    • You can turn speculative execution on or off with the properties mapreduce.map.speculative and mapreduce.reduce.speculative
  • Reusing JVM
    • By default, each map or reduce task launches a separate JVM for execution. Each JVM launch adds about 4-8 seconds for startup and teardown. This overhead can be significant for short-duration tasks.
    • JVM reuse is recommended for a large number of short-duration tasks. For long-running tasks, JVM reuse adds no significant benefit; moreover, long-running tasks may suffer from heap fragmentation that hurts performance.
    • This applies to MR1 only. In MR2, each task launches its own JVM.
  • Configuring Sort properties
    • Mapper output is stored in a circular memory buffer in the Java heap (sized by mapreduce.task.io.sort.mb, 100 MB by default)
    • Once the threshold set by mapreduce.map.sort.spill.percent is reached, the buffered data is spilled to disk
    • Spills can occur more than once. After multiple spills, the spill files are merged into a single file at the end of the map task
    • To detect whether extra spilling is occurring, compare the spilled records counter with the map output records counter
  • Configuring Java properties
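    • When your tasks fail with out-of-memory errors, consider increasing the heap size allocated to the tasks.
    • You can control the heap size with the mapred.child.java.opts property (in MR2, mapreduce.map.java.opts and mapreduce.reduce.java.opts set it separately for map and reduce tasks)
    • mapreduce.task.io.sort.mb must be smaller than the task JVM heap size, and the JVM heap size must in turn be smaller than the container size set by mapreduce.map.memory.mb or mapreduce.reduce.memory.mb.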
  • Improve Code
    • Identify poorly performing parts of the MapReduce code and refactor them.
  • Rethink Algorithms
    • A better algorithm can pay surprising dividends
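
The combiner bullets at the top of this list can be illustrated without a cluster. The following is a minimal local simulation in plain Java, not Hadoop API code: the class CombinerSketch and its methods are hypothetical names, and the two input strings stand in for two input splits. It counts how many records would be shuffled with and without a per-mapper combiner for a word count, and checks that the final result is unchanged because summation is associative and commutative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical local simulation only: no Hadoop classes are used. */
final class CombinerSketch {

    /** Map phase: emit one (word, 1) record per token of a single input split. */
    static List<Map.Entry<String, Integer>> map(String split) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : split.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(Map.entry(word, 1));
            }
        }
        return out;
    }

    /** Combine phase: sum the values per key over one mapper's own output. */
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, Integer> combined = new TreeMap<>();
        for (Map.Entry<String, Integer> record : mapOutput) {
            combined.merge(record.getKey(), record.getValue(), Integer::sum);
        }
        return combined;
    }

    /** Number of records that would cross the network to the reducers. */
    static int shuffledRecords(List<String> splits, boolean useCombiner) {
        int total = 0;
        for (String split : splits) {
            List<Map.Entry<String, Integer>> mapOutput = map(split);
            total += useCombiner ? combine(mapOutput).size() : mapOutput.size();
        }
        return total;
    }

    /** Reduce phase: the same summation, fed either raw records or partial sums. */
    static Map<String, Integer> reduce(List<String> splits, boolean useCombiner) {
        Map<String, Integer> result = new TreeMap<>();
        for (String split : splits) {
            List<Map.Entry<String, Integer>> mapOutput = map(split);
            if (useCombiner) {
                combine(mapOutput).forEach((k, v) -> result.merge(k, v, Integer::sum));
            } else {
                for (Map.Entry<String, Integer> record : mapOutput) {
                    result.merge(record.getKey(), record.getValue(), Integer::sum);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two toy "input splits"; illustrative data, not a real workload.
        List<String> splits = List.of("a b a a b", "b b c a");
        System.out.println("shuffled without combiner: " + shuffledRecords(splits, false));
        System.out.println("shuffled with combiner:    " + shuffledRecords(splits, true));
        System.out.println("same final counts:         "
                + reduce(splits, false).equals(reduce(splits, true)));
    }
}
```

On this toy input the simulated shuffle shrinks from 9 records to 5 while the final counts stay the same. The saving grows with the ratio of records to distinct keys, which is the reason for the best-practice bullet above.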

Zen

  • You cannot tune what you do not understand
  • You cannot improve what you cannot measure
    • Use Ganglia, Cacti, Cloudera Manager to gather metrics
    • 4 key metrics: utilization of CPU, Memory, Disk and Network

Approach Toward Fine Tuning

  • Run Job > Identify Bottlenecks > Address Bottlenecks

Identify Bottlenecks

  • Graphs
  • Job Counters
  • Job Logs
  • Profile Results
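
Two of the checks mentioned in the tuning notes, the spill ratio and the number of reduce waves, can be computed from job counter values. The sketch below is plain Java arithmetic under stated assumptions: the class JobCounterChecks and its method names are hypothetical, the counter values are typed in by hand rather than fetched from a cluster, and the ratio threshold of 1.0 is a common rule of thumb rather than a fixed rule.

```java
/** Hypothetical helper: plain arithmetic on counter values copied from a job report. */
final class JobCounterChecks {

    /** Ratio of spilled records to map output records. */
    static double spillRatio(long spilledRecords, long mapOutputRecords) {
        if (mapOutputRecords <= 0) {
            throw new IllegalArgumentException("mapOutputRecords must be positive");
        }
        return (double) spilledRecords / mapOutputRecords;
    }

    /** Assumed rule of thumb: a ratio above 1.0 suggests records hit the disk more than once. */
    static boolean likelyMultipleSpills(long spilledRecords, long mapOutputRecords) {
        return spillRatio(spilledRecords, mapOutputRecords) > 1.0;
    }

    /** Number of waves needed to run all reduce tasks, rounding up. */
    static int reduceWaves(int reduceTasks, int availableContainers) {
        if (reduceTasks < 0 || availableContainers <= 0) {
            throw new IllegalArgumentException("need reduceTasks >= 0 and availableContainers > 0");
        }
        return (reduceTasks + availableContainers - 1) / availableContainers;
    }

    public static void main(String[] args) {
        // Illustrative numbers only, not measurements from a real job.
        System.out.println("spill ratio: " + spillRatio(300_000L, 100_000L));
        System.out.println("multiple spills likely: " + likelyMultipleSpills(300_000L, 100_000L));
        System.out.println("reduce waves: " + reduceWaves(10, 4));
    }
}
```

Treat the ratio as a hint rather than a verdict: a combiner can lower the spilled record count, so a job may spill more than once and still show a ratio near 1.0.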