Configuration
This module extends the settings defined by the "ryba/hadoop/hdfs" module.
Unless specified otherwise, the number of tolerated failed volumes is set to "1" when at least 4 disks are used for storage, and to "0" otherwise.
datanode_opts
(string) DataNode Java options.
Example:
{
  "ryba": {
    "hdfs": {
      "datanode_opts": "-Xmx1024m",
      "sysctl": {
        "vm.swappiness": 0,
        "vm.overcommit_memory": 1,
        "vm.overcommit_ratio": 100,
        "net.core.somaxconn": 1024
      }
    }
  }
}
module.exports = (service) ->
options = service.options
Environment
Set up the Java heap size as in ryba/hadoop/hdfs_nn.
options.pid_dir ?= service.deps.hadoop_core.options.hdfs.pid_dir
options.secure_dn_pid_dir ?= service.deps.hadoop_core.options.hdfs.secure_dn_pid_dir
options.log_dir ?= service.deps.hadoop_core.options.hdfs.log_dir
options.conf_dir ?= '/etc/hadoop-hdfs-datanode/conf'
# Java
options.java_home ?= service.deps.java.options.java_home
options.newsize ?= '200m'
options.heapsize ?= '1024m'
options.hadoop_heap ?= service.deps.hadoop_core.options.hadoop_heap
# Misc
options.clean_logs ?= false
options.hadoop_opts ?= service.deps.hadoop_core.options.hadoop_opts
options.sysctl ?= {}
options.iptables ?= service.deps.iptables and service.deps.iptables.options.action is 'start'
options.fqdn = service.node.fqdn
Identities
options.hadoop_group = merge {}, service.deps.hadoop_core.options.hadoop_group, options.hadoop_group
options.group = merge {}, service.deps.hadoop_core.options.hdfs.group, options.group
options.user = merge {}, service.deps.hadoop_core.options.hdfs.user, options.user
Kerberos
# Kerberos HDFS Admin
options.hdfs_krb5_user = service.deps.hadoop_core.options.hdfs.krb5_user
System Options
options.opts ?= {}
options.opts.base ?= ''
options.opts.java_properties ?= {}
options.opts.jvm ?= {}
options.opts.jvm['-Xms'] ?= options.heapsize
options.opts.jvm['-Xmx'] ?= options.heapsize
options.opts.jvm['-XX:NewSize='] ?= options.newsize # should be 1/8 of the DataNode heap size
options.opts.jvm['-XX:MaxNewSize='] ?= options.newsize # should be 1/8 of the DataNode heap size
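The comments above assume a new generation sized at 1/8 of the DataNode heap, while the defaults are '200m' and '1024m'. A sketch of that rule, assuming heap sizes expressed in megabytes (illustrative only, not applied by the module):
# e.g. heapsize '1024m' -> newsize '128m'
# newsize_mb = Math.round parseInt(options.heapsize, 10) / 8
# options.opts.jvm['-XX:NewSize='] = "#{newsize_mb}m"
# options.opts.jvm['-XX:MaxNewSize='] = "#{newsize_mb}m"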
Configuration
options.core_site = merge {}, service.deps.hadoop_core.options.core_site, options.core_site or {}
# Note: moved during masson migration from nn to dn
options.core_site['io.compression.codecs'] ?= "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.SnappyCodec,com.hadoop.compression.lzo.LzoCodec"
options.hdfs_site ?= {}
# Comma separated list of paths. Use the list of directories from $DFS_DATA_DIR.
# For example, /grid/hadoop/hdfs/dn,/grid1/hadoop/hdfs/dn.
options.hdfs_site['dfs.http.policy'] ?= 'HTTPS_ONLY'
options.hdfs_site['dfs.datanode.data.dir'] ?= ['file:///var/hdfs/data']
options.hdfs_site['dfs.datanode.data.dir'] = options.hdfs_site['dfs.datanode.data.dir'].join ',' if Array.isArray options.hdfs_site['dfs.datanode.data.dir']
# options.hdfs_site['dfs.datanode.data.dir.perm'] ?= '750'
options.hdfs_site['dfs.datanode.data.dir.perm'] ?= '700'
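For example, a DataNode with several physical disks would typically override the default with one directory per mount point (the paths below follow the example in the comment above and are illustrative):
# options.hdfs_site['dfs.datanode.data.dir'] = [
#   'file:///grid/hadoop/hdfs/dn'
#   'file:///grid1/hadoop/hdfs/dn'
# ]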
if options.core_site['hadoop.security.authentication'] is 'kerberos'
  # Default values are retrieved from the official HDFS page called
  # ["SecureMode"][hdfs_secure].
  # Ports must be below 1024, because this provides part of the security
  # mechanism to make it impossible for a user to run a map task which
  # impersonates a DataNode.
  # TODO: Move this to 'ryba/hadoop/hdfs_dn'
  options.hdfs_site['dfs.datanode.address'] ?= '0.0.0.0:1004'
  options.hdfs_site['dfs.datanode.ipc.address'] ?= '0.0.0.0:50020'
  options.hdfs_site['dfs.datanode.http.address'] ?= '0.0.0.0:1006'
  options.hdfs_site['dfs.datanode.https.address'] ?= '0.0.0.0:9865'
else
  options.hdfs_site['dfs.datanode.address'] ?= '0.0.0.0:50010'
  options.hdfs_site['dfs.datanode.ipc.address'] ?= '0.0.0.0:50020'
  options.hdfs_site['dfs.datanode.http.address'] ?= '0.0.0.0:9864'
  options.hdfs_site['dfs.datanode.https.address'] ?= '0.0.0.0:9865'
Centralized Cache Management
Centralized cache management in HDFS is an explicit caching mechanism that enables you to specify paths to directories or files that will be cached by HDFS.
If you get the error "Cannot start datanode because the configured max locked memory size... is more than the datanode's available RLIMIT_MEMLOCK ulimit", it means the operating system imposes a lower limit on the amount of memory that can be locked than what has been configured; raise the "memlock" ulimit of the HDFS user or lower the configured value.
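No caching-related property is set by this module. As a sketch, the amount of memory a DataNode may lock for caching is controlled by the standard "dfs.datanode.max.locked.memory" property; the 256 MB value below is purely illustrative and must stay below the "memlock" ulimit granted to the HDFS user:
# options.hdfs_site['dfs.datanode.max.locked.memory'] ?= '268435456' # 256 MB, illustrative value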
Kerberos
options.krb5 ?= {}
options.krb5.realm ?= service.deps.krb5_client.options.etc_krb5_conf?.libdefaults?.default_realm
options.krb5.principal ?= "dn/#{service.node.fqdn}@#{options.krb5.realm}"
options.krb5.keytab ?= '/etc/security/keytabs/dn.service.keytab'
throw Error 'Required Options: "realm"' unless options.krb5.realm
options.krb5.admin ?= service.deps.krb5_client.options.admin[options.krb5.realm]
# Configuration in "hdfs-site.xml"
options.hdfs_site['dfs.datanode.kerberos.principal'] ?= options.krb5.principal.replace service.node.fqdn, '_HOST'
options.hdfs_site['dfs.datanode.keytab.file'] ?= options.krb5.keytab
SSL
options.ssl = merge {}, service.deps.hadoop_core.options.ssl, options.ssl
options.ssl_server = merge {}, service.deps.hadoop_core.options.ssl_server, options.ssl_server or {}
options.ssl_client = merge {}, service.deps.hadoop_core.options.ssl_client, options.ssl_client or {}
Tuning
dataDirs = options.hdfs_site['dfs.datanode.data.dir'].split(',')
if dataDirs.length > 3
  options.hdfs_site['dfs.datanode.failed.volumes.tolerated'] ?= '1'
else
  options.hdfs_site['dfs.datanode.failed.volumes.tolerated'] ?= '0'
# Validation
if options.hdfs_site['dfs.datanode.failed.volumes.tolerated'] >= dataDirs.length
  throw Error 'Number of failed volumes must be less than total volumes'
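To tolerate more failures than the computed default, the property can be overridden explicitly, as long as the value stays below the number of configured volumes (the value below is illustrative):
# options.hdfs_site['dfs.datanode.failed.volumes.tolerated'] = '2' # requires at least 3 data directories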
options.datanode_opts ?= ''
Storage-Balancing Policy
# http://gbif.blogspot.fr/2015/05/dont-fill-your-hdfs-disks-upgrading-to.html
# http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/admin_dn_storage_balancing.html
options.hdfs_site['dfs.datanode.fsdataset.volume.choosing.policy'] ?= 'org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy'
options.hdfs_site['dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold'] ?= '10737418240' # 10GB
options.hdfs_site['dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction'] ?= '1.0'
# Note: a better estimation of du.reserved based on the disk capacity may be needed;
# currently, 50GB throws DataXceiver exceptions inside the Vagrant VM
options.hdfs_site['dfs.datanode.du.reserved'] ?= '1073741824' # 1GB, also default in ambari
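As a sketch of the estimation mentioned in the note above, assuming a hypothetical "disk_capacity" option holding the size in bytes of each data disk, the reservation could scale with capacity instead of being fixed:
# if options.disk_capacity? # hypothetical option, bytes per data disk
#   options.hdfs_site['dfs.datanode.du.reserved'] ?= Math.round options.disk_capacity * 0.05 # reserve 5% of each disk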
HDFS Balancer Performance Increase (Fast Mode)
# https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.0/bk_hdfs-administration/content/configuring_balancer.html
# https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.5.0/bk_hdfs-administration/content/recommended_configurations.html
options.hdfs_site['dfs.datanode.balance.max.concurrent.moves'] ?= Math.max 5, dataDirs.length * 4
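# e.g. with 6 data directories this resolves to Math.max(5, 24) = 24 concurrent moves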
options.hdfs_site['dfs.datanode.balance.bandwidthPerSec'] ?= 10737418240 # 10 GB/s, default is 1048576 (1 MB/s)
HDFS Short-Circuit Local Reads
Short-circuit reads must be configured on both the DataNode and the client.
options.hdfs_site['dfs.client.read.shortcircuit'] ?= if (service.node.services.some (srv) -> srv.module is 'ryba/hadoop/hdfs_dn') then 'true' else 'false'
options.hdfs_site['dfs.domain.socket.path'] ?= '/var/lib/hadoop-hdfs/dn_socket'
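HDFS clients reading from this DataNode need matching client-side settings in their own hdfs-site.xml, mirroring the two properties above (listed here as a reminder, not set by this module):
# dfs.client.read.shortcircuit -> true
# dfs.domain.socket.path       -> /var/lib/hadoop-hdfs/dn_socket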
Metrics
options.metrics = merge {}, service.deps.metrics?.options, options.metrics
options.metrics.config ?= {}
options.metrics.sinks ?= {}
options.metrics.sinks.file_enabled ?= true
options.metrics.sinks.ganglia_enabled ?= false
options.metrics.sinks.graphite_enabled ?= false
# File sink
if options.metrics.sinks.file_enabled
  options.metrics.config["datanode.sink.file.class"] ?= 'org.apache.hadoop.metrics2.sink.FileSink'
  options.metrics.config['datanode.sink.file.filename'] ?= 'datanode-metrics.out'
# Ganglia sink, accepted properties are "servers" and "supportsparse"
if options.metrics.sinks.ganglia_enabled
  options.metrics.config["datanode.sink.ganglia.class"] ?= 'org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31'
  options.metrics.config["*.sink.ganglia.#{k}"] ?= v for k, v of service.deps.metrics.options.sinks.ganglia.config if service.deps.metrics?.options?.sinks?.ganglia_enabled
# Graphite Sink
if options.metrics.sinks.graphite_enabled
  throw Error 'Missing remote_host ryba.hdfs.dn.metrics.sinks.graphite.config.server_host' unless options.metrics.sinks.graphite.config.server_host?
  throw Error 'Missing remote_port ryba.hdfs.dn.metrics.sinks.graphite.config.server_port' unless options.metrics.sinks.graphite.config.server_port?
  options.metrics.config["datanode.sink.graphite.class"] ?= 'org.apache.hadoop.metrics2.sink.GraphiteSink'
  options.metrics.config["*.sink.graphite.#{k}"] ?= v for k, v of service.deps.metrics.options.sinks.graphite.config if service.deps.metrics?.options?.sinks?.graphite_enabled
Configuration for Log4J
Inherits the log4j configuration from the ryba/log4j module. The rendered file uses the "options.log4j.properties" variable.
options.log4j = merge {}, service.deps.log4j?.options, options.log4j
options.log4j.properties ?= {}
options.log4j.root_logger ?= 'INFO,RFA'
options.log4j.security_logger ?= 'INFO,RFAS'
options.log4j.audit_logger ?= 'INFO,RFAAUDIT'
if options.log4j.remote_host? and options.log4j.remote_port?
  # Adding SOCKET appender
  options.log4j.socket_client ?= "SOCKET"
  # Root logger
  if options.log4j.root_logger.indexOf(options.log4j.socket_client) is -1
    options.log4j.root_logger += ",#{options.log4j.socket_client}"
  # Security Logger
  if options.log4j.security_logger.indexOf(options.log4j.socket_client) is -1
    options.log4j.security_logger += ",#{options.log4j.socket_client}"
  # Audit Logger
  if options.log4j.audit_logger.indexOf(options.log4j.socket_client) is -1
    options.log4j.audit_logger += ",#{options.log4j.socket_client}"
  # Adding application name, remote host and port values to the datanode's opts
  options.opts['hadoop.log.application'] ?= 'datanode'
  options.opts['hadoop.log.remote_host'] ?= options.log4j.remote_host
  options.opts['hadoop.log.remote_port'] ?= options.log4j.remote_port
  options.log4j.socket_opts ?=
    Application: '${hadoop.log.application}'
    RemoteHost: '${hadoop.log.remote_host}'
    Port: '${hadoop.log.remote_port}'
    ReconnectionDelay: '10000'
  options.log4j.properties = merge options.log4j.properties, appender
    type: 'org.apache.log4j.net.SocketAppender'
    name: options.log4j.socket_client
    logj4: options.log4j.properties
    properties: options.log4j.socket_opts
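The SOCKET appender above is only activated when a remote log collector is declared; a minimal sketch with placeholder host and port:
# options.log4j.remote_host = 'log-collector.example.com' # placeholder
# options.log4j.remote_port = 4560 # placeholder, port commonly used by the log4j SocketServer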
Wait
options.wait_krb5_client = service.deps.krb5_client.options.wait
options.wait_zookeeper_server = service.deps.zookeeper_server[0].options.wait
options.wait = {}
options.wait.tcp = for srv in service.deps.hdfs_dn
  is_krb5 = options.core_site['hadoop.security.authentication'] is 'kerberos'
  addr = srv.options.hdfs_site?['dfs.datanode.address']
  addr ?= if is_krb5 then '0.0.0.0:1004' else '0.0.0.0:50010'
  [_, port] = addr.split ':'
  host: srv.node.fqdn, port: port
options.wait.ipc = for srv in service.deps.hdfs_dn
  addr = srv.options.hdfs_site?['dfs.datanode.ipc.address'] ? '0.0.0.0:50020'
  [_, port] = addr.split ':'
  host: srv.node.fqdn, port: port
options.wait.http = for srv in service.deps.hdfs_dn
  policy = srv.options.hdfs_site?['dfs.http.policy'] ? options.hdfs_site['dfs.http.policy']
  protocol = if policy is 'HTTP_ONLY' then 'http' else 'https'
  addr = srv.options.hdfs_site?["dfs.datanode.#{protocol}.address"] ? options.hdfs_site["dfs.datanode.#{protocol}.address"]
  [_, port] = addr.split ':'
  host: srv.node.fqdn, port: port
# Current DataNode wait (local one)
is_krb5 = options.core_site['hadoop.security.authentication'] is 'kerberos'
policy = options.hdfs_site['dfs.http.policy']
protocol = if policy is 'HTTP_ONLY' then 'http' else 'https'
http_addr = options.hdfs_site["dfs.datanode.#{protocol}.address"]
tcp_addr = if is_krb5 then '0.0.0.0:1004' else '0.0.0.0:50010'
ipc_addr = '0.0.0.0:50020'
options.wait.tcp_local =
  host: tcp_addr.split(':')[0], port: tcp_addr.split(':')[1]
options.wait.ipc_local =
  host: ipc_addr.split(':')[0], port: ipc_addr.split(':')[1]
options.wait.http_local =
  host: http_addr.split(':')[0], port: http_addr.split(':')[1]
Dependencies
{merge} = require '@nikitajs/core/lib/misc'
appender = require '../../lib/appender'