Menu

MapReduce Configure

module.exports = (service) ->
  options = service.options

Identities

  options.hadoop_group = merge {}, service.deps.hadoop_core.options.hadoop_group, options.hadoop_group
  options.group = merge {}, service.deps.hadoop_core.options.mapred.group, options.group
  options.user = merge {}, service.deps.hadoop_core.options.mapred.user, options.user

Kerberos

  # Kerberos HDFS Admin
  options.hdfs_krb5_user = service.deps.hadoop_core.options.hdfs.krb5_user
  # Kerberos Test Principal
  options.test_krb5_user ?= service.deps.test_user.options.krb5.user

Environment

  # Layout
  options.log_dir ?= '/var/log/hadoop-mapreduce' # Default to "/var/log/hadoop-mapreduce/$USER"
  options.pid_dir ?= '/var/run/hadoop-mapreduce'  # /etc/hadoop/conf/hadoop-env.sh#94
  options.conf_dir ?= service.deps.hadoop_core.options.conf_dir
  # Misc
  options.force_check ?= false
  options.hostname ?= service.node.hostname
  options.iptables ?= service.deps.iptables and service.deps.iptables.options.action is 'start'
  options.nn_url = service.deps.hdfs_client[0].options.nn_url

Configuration

  options.mapred_site ?= {}
  options.mapred_site['mapreduce.job.counters.max'] ?= 120
  options.mapred_site['mapreduce.reduce.shuffle.parallelcopies'] ?= '50' #  Higher number of parallel copies run by reduces to fetch outputs from very large number of maps.
  # http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.0.6.0/bk_installing_manually_book/content/rpm_chap3.html
  # Optional: Configure MapReduce to use Snappy Compression
  # Complement core-site.xml configuration
  # options.mapred_site['mapreduce.admin.map.child.java.opts'] ?= "-server -XX:NewRatio=8 -Djava.library.path=/usr/lib/hadoop/lib/native/ -Djava.net.preferIPv4Stack=true"
  options.mapred_site['mapreduce.admin.map.child.java.opts'] ?= "-server -Djava.net.preferIPv4Stack=true -Dhdp.version=${hdp.version}"
  options.mapred_site['mapreduce.admin.reduce.child.java.opts'] ?= null
  options.mapred_site['mapreduce.task.io.sort.factor'] ?= 100 # Default to "TODO..." inside HPD and 100 inside ambari and 10 inside mapred-default.xml
  # options.mapred_site['mapreduce.admin.reduce.child.java.opts'] ?= "-server -XX:NewRatio=8 -Djava.library.path=/usr/lib/hadoop/lib/native/ -Djava.net.preferIPv4Stack=true"
  options.mapred_site['mapreduce.admin.user.env'] ?= "LD_LIBRARY_PATH=/usr/hdp/${hdp.version}/hadoop/lib/native:/usr/hdp/${hdp.version}/hadoop/lib/native/Linux-amd64-64"
  # [Configurations for MapReduce JobHistory Server](http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/ClusterSetup.html#Configuring_the_Hadoop_Daemons_in_Non-Secure_Mode)
  options.mapred_site['mapreduce.application.framework.path'] ?= "/hdp/apps/${hdp.version}/mapreduce/mapreduce.tar.gz#mr-framework"
  options.mapred_site['mapreduce.application.classpath'] ?= "$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/current/share/lzo/0.6.0/lib/hadoop-lzo-0.6.0.jar:/usr/hdp/current/hadoop-client/lib/*:/etc/hadoop/conf/secure"
  for property in [
    'yarn.app.mapreduce.am.staging-dir'
    'mapreduce.jobhistory.address'
    'mapreduce.jobhistory.webapp.address'
    'mapreduce.jobhistory.webapp.https.address'
    'mapreduce.jobhistory.done-dir'
    'mapreduce.jobhistory.intermediate-done-dir'
    'mapreduce.jobhistory.principal'
  ]
    options.mapred_site[property] ?= if service.deps.mapred_jhs then service.deps.mapred_jhs.options.mapred_site[property] else null
  # The value is set by the client app and the iptables are enforced on the worker nodes
  options.mapred_site['yarn.app.mapreduce.am.job.client.port-range'] ?= '59100-59200'
  options.mapred_site['mapreduce.framework.name'] ?= 'yarn' # Execution framework set to Hadoop YARN.
  # Deprecated properties
  options.mapred_site['mapreduce.cluster.local.dir'] = null # Now "yarn.nodemanager.local-dirs"
  options.mapred_site['mapreduce.jobtracker.system.dir'] = null # JobTracker no longer used
  # The replication level for submitted job files should be around the square root of the number of nodes.
  # see https://issues.apache.org/jira/browse/MAPREDUCE-2845
  options.mapred_site['mapreduce.client.submit.file.replication'] ?= Math.min (Math.round Math.sqrt service.deps.yarn_nm.length), 10

Configuration for Resource Allocation

There are three aspects to consider:

  • Physical RAM limit for each Map And Reduce task
  • The JVM heap size limit for each task
  • The amount of virtual memory each task will get

The total size of the memory given to the JVM available to each map/reduce container is defined by the properties "mapreduce.map.memory.mb" and "mapreduce.reduce.memory.mb" in megabytes (MB). This includes both heap memory (which many of us Java developers always are thinking about) and non-heap memory. Non-heap memory includes the stack and the PermGen space. It should be at least equal to or more than the YARN minimum Container allocation.

For this reason, the maximum size of the heap (Java -Xmx parameter) is set to an inferior value, commonly 80% of the maximum available memory. The heap size parameter is defined inside the "mapreduce.map.java.opts" and "mapreduce.reduce.java.opts" properties.

Resources:

  • Understanding YARN MapReduce Memory Allocation

    memory_per_container = 512 rm_memory_min_mb = service.deps.yarn_rm[0].options.yarn_site['yarn.scheduler.minimum-allocation-mb'] rm_memory_max_mb = service.deps.yarn_rm[0].options.yarn_site['yarn.scheduler.maximum-allocation-mb'] rm_cpu_min = service.deps.yarn_rm[0].options.yarn_site['yarn.scheduler.minimum-allocation-vcores'] rm_cpu_max = service.deps.yarn_rm[0].options.yarn_site['yarn.scheduler.maximum-allocation-mb'] yarn_mapred_am_memory_mb = options.mapred_site['yarn.app.mapreduce.am.resource.mb'] or if memory_per_container > 1024 then 2 * memory_per_container else memory_per_container yarn_mapred_am_memory_mb = Math.min rm_memory_max_mb, yarn_mapred_am_memory_mb options.mapred_site['yarn.app.mapreduce.am.resource.mb'] = "#{yarn_mapred_am_memory_mb}"

    yarn_mapred_opts = /-Xmx(.*?)m/.exec(options.mapred_site['yarn.app.mapreduce.am.command-opts'])?[1] or Math.floor(.8 * yarn_mapred_am_memory_mb) yarn_mapred_opts = Math.min rm_memory_max_mb, yarn_mapred_opts options.mapred_site['yarn.app.mapreduce.am.command-opts'] = "-Xmx#{yarn_mapred_opts}m"

    map_memory_mb = options.mapred_site['mapreduce.map.memory.mb'] or memory_per_container map_memory_mb = Math.min rm_memory_max_mb, map_memory_mb map_memory_mb = Math.max rm_memory_min_mb, map_memory_mb options.mapred_site['mapreduce.map.memory.mb'] = "#{map_memory_mb}"

    reduce_memory_mb = options.mapred_site['mapreduce.reduce.memory.mb'] or memory_per_container #2 * memory_per_container reduce_memory_mb = Math.min rm_memory_max_mb, reduce_memory_mb reduce_memory_mb = Math.max rm_memory_min_mb, reduce_memory_mb options.mapred_site['mapreduce.reduce.memory.mb'] = "#{reduce_memory_mb}"

    map_memory_xmx = /-Xmx(.*?)m/.exec(options.mapred_site['mapreduce.map.java.opts'])?[1] or Math.floor .8 * map_memory_mb map_memory_xmx = Math.min rm_memory_max_mb, map_memory_xmx options.mapred_site['mapreduce.map.java.opts'] ?= "-Xmx#{map_memory_xmx}m"

    reduce_memory_xmx = /-Xmx(.*?)m/.exec(options.mapred_site['mapreduce.reduce.java.opts'])?[1] or Math.floor .8 * reduce_memory_mb reduce_memory_xmx = Math.min rm_memory_max_mb, reduce_memory_xmx options.mapred_site['mapreduce.reduce.java.opts'] ?= "-Xmx#{reduce_memory_xmx}m"

    options.mapred_site['mapreduce.task.io.sort.mb'] ?= "#{Math.floor .4 * memory_per_container}"

    map_cpu = options.mapred_site['mapreduce.map.cpu.vcores'] or 1 map_cpu = Math.min rm_cpu_max, map_cpu map_cpu = Math.max rm_cpu_min, map_cpu options.mapred_site['mapreduce.map.cpu.vcores'] = "#{map_cpu}"

    reduce_cpu = options.mapred_site['mapreduce.reduce.cpu.vcores'] or 1 reduce_cpu = Math.min rm_cpu_max, reduce_cpu reduce_cpu = Math.max rm_cpu_min, reduce_cpu options.mapred_site['mapreduce.reduce.cpu.vcores'] = "#{reduce_cpu}"

Write Data To YARN TimelineV2

  if service.deps.yarn_tr?[0].options.yarn_site['yarn.timeline-service.version'] is '2.0'
    options.mapred_site['mapreduce.job.emit-timeline-data'] ?= 'true'

Wait

  options.wait_mapred_jhs = service.deps.mapred_jhs.options.wait
  options.wait_yarn_ts = service.deps.yarn_ts?.options?.wait if service.deps.yarn_ts
  options.wait_yarn_tr = service.deps.yarn_tr?.options?.wait if service.deps.yarn_tr
  options.wait_yarn_nm = service.deps.yarn_nm[0].options.wait
  options.wait_yarn_rm = service.deps.yarn_rm[0].options.wait

Dependencies

{merge} = require '@nikitajs/core/lib/misc'