Configure Kafka Broker
Example:
{
"ryba": {
"kafka": {
"broker": {
"heapsize": 1024
}
}
}
}
# Configure entry point: every default below is written into `service.options` in place.
module.exports = (service) ->
{options} = service
Kerberos
# Kerberos realm and admin credentials; both fall back to the krb5 client service settings
options.krb5 = {} unless options.krb5?
unless options.krb5.realm?
  options.krb5.realm = service.deps.krb5_client.options.etc_krb5_conf?.libdefaults?.default_realm
throw Error 'Required Options: "realm"' if not options.krb5.realm
unless options.krb5.admin?
  options.krb5.admin = service.deps.krb5_client.options.admin[options.krb5.realm]
Identities
# Group: a plain string is shorthand for {name: <string>}
grp = options.group
grp = {name: grp} if typeof grp is 'string'
grp = {} unless grp?
options.group = grp
grp.name = 'kafka' unless grp.name?
grp.system = true unless grp.system?
# User: same string shorthand; the name defaults to the group name
usr = options.user
usr = {name: usr} if typeof usr is 'string'
usr = {} unless usr?
options.user = usr
usr.name = grp.name unless usr.name?
# gid and home are always recomputed, even when provided by the caller
usr.gid = grp.name
usr.system = true unless usr.system?
usr.comment = 'Kafka User' unless usr.comment?
usr.home = "/var/lib/#{usr.name}"
usr.limits = {} unless usr.limits?
usr.limits.nofile = 64000 unless usr.limits.nofile?
usr.limits.nproc = 32000 unless usr.limits.nproc?
# Admin: principal defaults to "<user>@<realm>"; the password is mandatory
options.admin = {} unless options.admin?
options.admin.principal = "#{usr.name}@#{options.krb5.realm}" unless options.admin.principal?
throw Error "Required Option: admin.password" if not options.admin.password
# Kafka super users default to the admin principal's primary component,
# i.e. the text before the first "/" or "@"
# match = /^(.+?)[@\/]/.exec options.admin.principal
# throw Error 'Invalid kafka.broker.admin.principal' unless match
# options.superusers ?= [match[0]]
# throw Error 'Kafka admin_principal must be in kafka superusers' unless match[0] in options.superusers
options.superusers ?= [options.admin.principal.split(/[@\/]/)[0]]
Environment
# Layout: configuration, log and run directories
for key, value of {conf_dir: '/etc/kafka-broker/conf', log_dir: '/var/log/kafka', run_dir: '/var/run/kafka'}
  options[key] ?= value
# JVM heap (used as megabytes in -Xmx/-Xms) and environment exported to the broker
options.heapsize ?= '1024'
options.env ?= {}
env = options.env
# A more aggressive configuration for production is provided here:
# http://docs.confluent.io/1.0.1/kafka-rest/docs/deployment.html#jvm
env['KAFKA_HEAP_OPTS'] ?= "-Xmx#{options.heapsize}m -Xms#{options.heapsize}m"
# Use the generated log4j.properties to avoid verbose console output in a non-rotated kafka.out file
# options.env['KAFKA_LOG4J_OPTS'] ?= "-Dlog4j.configuration=file:$base_dir/../config/log4j.properties -Dkafka.root.logger=INFO, kafkaAppender"
env['KAFKA_LOG4J_OPTS'] ?= "-Dlog4j.configuration=file:#{options.conf_dir}/log4j.properties"
env['KAFKA_GC_LOG_OPTS'] ?= "-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps "
# Misc: truthy only when an iptables service is deployed with action 'start'
options.iptables ?= service.deps.iptables and service.deps.iptables.options.action is 'start'
ZooKeeper Quorum
# "<fqdn>:<clientPort>" for each ZooKeeper server whose peerType is 'participant'
options.zookeeper_quorum ?= for zk in service.deps.zookeeper_server when zk.options.config['peerType'] is 'participant'
  "#{zk.node.fqdn}:#{zk.options.config['clientPort']}"
Configuration
options.config ?= {}
cfg = options.config
# Data directories; an array is flattened to the comma-separated form (Kafka's own default is "/tmp/kafka-logs")
cfg['log.dirs'] ?= '/var/kafka'
cfg['log.dirs'] = cfg['log.dirs'].join ',' if Array.isArray cfg['log.dirs']
cfg['zookeeper.connect'] ?= options.zookeeper_quorum
cfg['log.retention.hours'] ?= '168'
cfg['delete.topic.enable'] ?= 'true'
cfg['zookeeper.set.acl'] ?= 'true'
# Each super user is declared as "User:<name>"
cfg['super.users'] ?= options.superusers.map((name) -> "User:#{name}").join(',')
# Default partition count per topic: one per broker instance
cfg['num.partitions'] ?= service.instances.length
cfg['auto.create.topics.enable'] ?= 'false'
# broker.id defaults to this host's index among the service instances
for inst, idx in service.instances when inst.node.fqdn is service.node.fqdn
  cfg['broker.id'] ?= "#{idx}"
Metrics
# Metrics: merge the shared metrics service options with the broker ones, then set sink defaults
options.metrics = merge {}, service.deps.metrics?.options, options.metrics
options.metrics.sinks ?= {}
options.metrics.sinks.file_enabled ?= true
options.metrics.sinks.ganglia_enabled ?= false
options.metrics.sinks.graphite_enabled ?= false
# Graphite Sink: requires server_host and server_port, then wires the Criteo Graphite reporter
if options.metrics.sinks.graphite_enabled
  # `graphite` itself may be missing: report the required option instead of crashing on `.config`
  graphite = options.metrics.sinks.graphite?.config
  throw Error 'Missing remote_host ryba.kafka.broker.metrics.sinks.graphite.config.server_host' unless graphite?.server_host?
  throw Error 'Missing remote_port ryba.kafka.broker.metrics.sinks.graphite.config.server_port' unless graphite?.server_port?
  options.config['kafka.metrics.reporters'] ?= 'com.criteo.kafka.KafkaGraphiteMetricsReporter'
  options.config['kafka.graphite.metrics.reporter.enabled'] ?= 'true'
  options.config['kafka.graphite.metrics.host'] ?= graphite.server_host
  options.config['kafka.graphite.metrics.port'] ?= graphite.server_port
  # Without a metrics_prefix, use the bare FQDN rather than "undefined.<fqdn>"
  options.config['kafka.graphite.metrics.group'] ?= if graphite.metrics_prefix? then "#{graphite.metrics_prefix}.#{service.node.fqdn}" else service.node.fqdn
Log4J
# Log4J: merge the shared log4j service options with the broker ones, then fill default properties
options.log4j = merge {}, service.deps.log4j?.options, options.log4j
options.log4j.properties ?= {}
props = options.log4j.properties
layout_class = 'org.apache.log4j.PatternLayout'
layout_pattern = '[%d] %p %m (%c)%n'
# Console appender
props['log4j.appender.stdout'] ?= 'org.apache.log4j.ConsoleAppender'
props['log4j.appender.stdout.layout'] ?= layout_class
props['log4j.appender.stdout.layout.ConversionPattern'] ?= layout_pattern
# Rolling file appenders: [appender name, MaxBackupIndex, file name under ${kafka.logs.dir}]
rolling_appenders = [
  ['kafkaAppender', '10', 'server.log']
  ['stateChangeAppender', '1', 'state-change.log']
  ['requestAppender', '1', 'kafka-request.log']
  ['cleanerAppender', '1', 'log-cleaner.log']
  ['controllerAppender', '1', 'controller.log']
  ['authorizerAppender', '1', 'kafka-authorizer.log']
]
for spec in rolling_appenders
  [appender, backups, logfile] = spec
  prefix = "log4j.appender.#{appender}"
  props[prefix] ?= 'org.apache.log4j.RollingFileAppender'
  props["#{prefix}.MaxFileSize"] ?= '100MB'
  props["#{prefix}.MaxBackupIndex"] ?= backups
  props["#{prefix}.File"] ?= '${kafka.logs.dir}/' + logfile
  props["#{prefix}.layout"] ?= layout_class
  props["#{prefix}.layout.ConversionPattern"] ?= layout_pattern
# Optional remote SocketAppender, attached to every logger below when both host and port are set
send_remote = options.log4j.remote_host and options.log4j.remote_port
options.log4j.extra_appender = if send_remote then ',socketAppender' else ''
if send_remote
  props['log4j.appender.socketAppender'] ?= 'org.apache.log4j.net.SocketAppender'
  props['log4j.appender.socketAppender.Application'] ?= 'kafka'
  props['log4j.appender.socketAppender.RemoteHost'] ?= options.log4j.remote_host
  props['log4j.appender.socketAppender.Port'] ?= options.log4j.remote_port
  props['log4j.appender.socketAppender.ReconnectionDelay'] ?= '10000'
# Debug loggers, disabled by default:
#options.log4j.properties['log4j.logger.kafka.producer.async.DefaultEventHandler'] ?= 'DEBUG, kafkaAppender' + options.log4j.extra_appender
#options.log4j.properties['log4j.logger.kafka.client.ClientUtils'] ?= 'DEBUG, kafkaAppender' + options.log4j.extra_appender
#options.log4j.properties['log4j.logger.kafka.perf'] ?= 'DEBUG, kafkaAppender' + ' socketAppender' + options.log4j.extra_appender
#options.log4j.properties['log4j.logger.kafka.perf.ProducerPerformance$ProducerThread'] ?= 'DEBUG, kafkaAppender' + options.log4j.extra_appender
#options.log4j.properties['log4j.logger.org.I0Itec.zkclient.ZkClient'] ?= 'DEBUG'
#options.log4j.properties['log4j.logger.kafka.network.Processor'] ?= 'TRACE, requestAppender' + options.log4j.extra_appender
#options.log4j.properties['log4j.logger.kafka.server.KafkaApis'] ?= 'TRACE, requestAppender' + options.log4j.extra_appender
#options.log4j.properties['log4j.additivity.kafka.server.KafkaApis'] ?= 'false'
extra = options.log4j.extra_appender
props['log4j.rootLogger'] ?= 'INFO, kafkaAppender' + extra
# Dedicated loggers: [logger name, "LEVEL, appender"]; additivity is disabled for each of them
dedicated_loggers = [
  ['kafka', 'INFO, kafkaAppender']
  ['kafka.network.RequestChannel$', 'WARN, requestAppender']
  ['kafka.request.logger', 'WARN, requestAppender']
  ['kafka.controller', 'TRACE, controllerAppender']
  ['kafka.log.LogCleaner', 'INFO, cleanerAppender']
  ['state.change.logger', 'TRACE, stateChangeAppender']
  ['kafka.authorizer.logger', 'WARN, authorizerAppender']
]
for spec in dedicated_loggers
  [logger, level] = spec
  props["log4j.logger.#{logger}"] ?= level + extra
  props["log4j.additivity.#{logger}"] ?= 'false'
# Push user and group configuration to consumer and producer
# for csm_ctx in ctx.contexts ['ryba/kafka/consumer', 'ryba/kafka/producer']
# csm_ctx.config.ryba ?= {}
# csm_ctx.config.ryba.kafka ?= {}
# csm_ctx.config.ryba.kafka.user ?= kafka.user
# csm_ctx.config.ryba.kafka.group ?= kafka.group
Kafka Broker Protocols
Starting from version 0.9, the Kafka broker supports multiple secured and unsecured protocols for broker/broker and client/broker communication: PLAINTEXT, SSL, SASL_PLAINTEXT and SASL_SSL. By default, at least SSL is enabled for both broker/broker and client/broker communication. All protocols are supported for client/broker communication, while broker/broker communication only allows SSL or SASL_SSL. The required protocols can be set at the cluster configuration level.
Example only PLAINTEXT: { "ryba": { "kafka": { "broker": { "protocols" : "PLAINTEXT" } } } } Example PLAINTEXT and SSL: { "ryba": { "kafka": { "broker": { "protocols" : ["PLAINTEXT","SSL"] } } } }
# Protocols exposed by the broker: SASL_SSL on a Kerberized Hadoop cluster, SSL otherwise.
# A single protocol may be given as a plain string (e.g. "PLAINTEXT").
options.protocols ?= if service.deps.hadoop_core.options.core_site['hadoop.security.authentication'] is 'kerberos' then ['SASL_SSL'] else ['SSL']
options.protocols = [options.protocols] if typeof options.protocols is 'string'
# Throw (not return) so an invalid setup stops the configuration instead of leaving it half-done
throw Error 'No protocol specified' unless Array.isArray(options.protocols) and options.protocols.length > 0
# Default listening port per protocol
options.ports ?= {}
options.ports['PLAINTEXT'] ?= '9092'
options.ports['SSL'] ?= '9093'
options.ports['SASL_PLAINTEXT'] ?= '9094'
options.ports['SASL_SSL'] ?= '9096'
# Every enabled protocol needs a port, otherwise its listener would end in ":undefined"
for protocol in options.protocols
  throw Error "No port defined for protocol: #{protocol}" unless options.ports[protocol]?
Security protocol used to communicate between brokers
Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
# Kafka expects a single protocol name here, not a list: SASL_SSL on a Kerberized cluster, SSL otherwise
options.config['security.inter.broker.protocol'] ?= if service.deps.hadoop_core.options.core_site['hadoop.security.authentication'] is 'kerberos' then 'SASL_SSL' else 'SSL'
Security SSL
# SSL: merge the shared SSL service options, default key/trust store locations to the conf dir
options.ssl = merge {}, service.deps.ssl?.options, options.ssl
options.config['ssl.keystore.location'] ?= "#{options.conf_dir}/keystore"
throw Error "Required Option: options.config['ssl.keystore.password']" unless options.config['ssl.keystore.password']
# The private key password defaults to the keystore password
options.config['ssl.key.password'] ?= options.config['ssl.keystore.password']
options.config['ssl.truststore.location'] ?= "#{options.conf_dir}/truststore"
throw Error "Required Option: options.config['ssl.truststore.password']" unless options.config['ssl.truststore.password']
Security Kerberos & ACL
# Kerberos & ACL settings, only applied when ZooKeeper ACLs are enabled
if options.config['zookeeper.set.acl'] is 'true'
  options.kerberos ?= {}
  options.kerberos['principal'] ?= "#{options.user.name}/#{service.node.fqdn}@#{options.krb5.realm}"
  options.kerberos['keyTab'] ?= '/etc/security/keytabs/kafka.service.keytab'
  # The SASL service name is the principal's primary component (text before the first "/" or "@")
  match = /^(.+?)[@\/]/.exec options.kerberos['principal']
  throw Error "Invalid Kerberos principal: #{options.kerberos['principal']}" unless match
  options.config['sasl.kerberos.service.name'] = match[1]
  # set to true to be able to use 9092 if PLAINTEXT only mode is enabled
  options.config['allow.everyone.if.no.acl.found'] ?= 'false'
  options.config['authorizer.class.name'] ?= 'kafka.security.auth.SimpleAclAuthorizer'
  options.env['KAFKA_KERBEROS_PARAMS'] ?= "-Djava.security.auth.login.config=#{options.conf_dir}/kafka-server.jaas"
Brokers internal communication
# Replication protocol between brokers: SASL_SSL when ZooKeeper ACLs are on, SSL otherwise
acl_enabled = options.config['zookeeper.set.acl'] is 'true'
options.config['replication.security.protocol'] ?= if acl_enabled then 'SASL_SSL' else 'SSL'
# Validation: any SASL protocol requires ZooKeeper ACLs
for prot in options.protocols when prot.indexOf('SASL') > -1
  throw Error 'ACL must be activated' unless acl_enabled
Listeners Protocols
# HDP 2.5.0: the replication protocol has to be one of the enabled protocols
unless options.config['replication.security.protocol'] in options.protocols
  throw Error 'security.inter.broker.protocol must be a protocol in the configured set of advertised.listeners'
# One "<PROTOCOL>://<fqdn>:<port>" listener per enabled protocol, comma-separated
to_listener = (protocol) -> "#{protocol}://#{service.node.fqdn}:#{options.ports[protocol]}"
options.config['listeners'] ?= options.protocols.map(to_listener).join(',')
Wait
# Reuse the `wait` options of the krb5 client and of the first ZooKeeper server
options.wait_krb5_client = service.deps.krb5_client.options.wait
[first_zk] = service.deps.zookeeper_server
options.wait_zookeeper_server = first_zk.options.wait
# Per enabled protocol: host/port of every Kafka broker
options.wait = {}
# options.wait.brokers = for srv in service.deps.kafka_broker
# for protocol in options.protocols
# host: srv.node.fqdn
# port: options.ports[protocol]
for protocol in options.protocols
  options.wait[protocol] = ({host: broker.node.fqdn, port: options.ports[protocol]} for broker in service.deps.kafka_broker)
Dependencies
{merge} = require '@nikitajs/core/lib/misc'