Configure
module.exports = (service) ->
options = service.options
Identities
Merge group and user from the Kafka broker configuration.
options.group = merge {}, service.deps.kafka_broker[0].options.group, options.group
options.user = merge {}, service.deps.kafka_broker[0].options.user, options.user
# Admin principal
options.admin ?= {}
options.admin.principal ?= service.deps.kafka_broker[0].options.admin.principal
options.admin.password ?= service.deps.kafka_broker[0].options.admin.password
options.superusers ?= service.deps.kafka_broker[0].options.superusers
# Ranger
options.ranger_admin ?= service.deps.ranger_admin.options.admin if service.deps.ranger_admin
Environment
# Layout
options.conf_dir ?= '/etc/kafka/conf'
# Env
options.env ?= {}
# Misc
options.hostname = service.node.hostname
Kerberos
# JAAS
if service.deps.kafka_broker[0].options.config['zookeeper.set.acl'] is 'true'
options.env['KAFKA_KERBEROS_PARAMS'] ?= "-Djava.security.auth.login.config=#{options.conf_dir}/kafka-client.jaas"
# Kerberos Test Principal
options.test_krb5_user ?= service.deps.test_user.options.krb5.user
Configuration
options.config ?= {}
# Consumer
options.consumer ?= {}
options.consumer.config ?= {}
options.consumer.config['zookeeper.connect'] ?= service.deps.kafka_broker[0].options.config['zookeeper.connect']
# Producer
options.producer ?= {}
options.producer.config ?= {}
options.producer.config['compression.codec'] ?= 'snappy'
# for now the prop 'sasl.kerberos.service.name' has to be deleted because of
# https://issues.apache.org/jira/browse/KAFKA-2974
# http://mail-archives.apache.org/mod_mbox/kafka-commits/201512.mbox/%3Cacb73f26d3bd440ab8a9f33686db0020@git.apache.org%3E
# which result with the error:
# Conflicting serviceName values found in JAAS and Kafka configs value in JAAS file kafka, value in Kafka config kafka
# fixed in 0.9.0.1
# kafka.consumer.config['sasl.kerberos.service.name'] = service.deps.kafka_broker[0].options.config['sasl.kerberos.service.name']
delete options.consumer.config['sasl.kerberos.service.name']
Brokers and protocols
Producer config does not support several protocol like kafka/broker (for example the 'listeners' property), this is why we make dynamic discovery of the best protocol available and pass needed protocol to command line in the checks.
options.protocols = service.deps.kafka_broker[0].options.protocols
options.brokers = {}
for protocol in options.protocols
options.brokers[protocol] = for srv in service.deps.kafka_broker
"#{srv.node.fqdn}:#{srv.options.ports[protocol]}"
ssl_enabled = if service.deps.kafka_broker[0].options.config['ssl.keystore.location'] then true else false
sasl_enabled = if service.deps.kafka_broker[0].options.kerberos then true else false
recommended_protocol = if sasl_enabled
if ssl_enabled then 'SASL_SSL' else 'SASL_PLAINTEXT'
else
if ssl_enabled then 'SSL' else 'PLAINTEXT'
# Producer
options.producer.config['security.protocol'] ?= options.protocols
options.producer.config['metadata.broker.list'] ?= options.brokers[recommended_protocol].join ','
options.producer.config['bootstrap.servers'] ?= options.brokers[recommended_protocol].join ','
# Consumer
options.consumer.config['bootstrap.servers'] ?= options.brokers[recommended_protocol].join ','
options.consumer.config['security.protocol'] ?= recommended_protocol
Log4j
# Producer
options.log4j ?= {}
options.log4j['log4j.rootLogger'] ?= 'INFO, stdout'
options.log4j['log4j.appender.stdout'] ?= 'org.apache.log4j.ConsoleAppender'
options.log4j['log4j.appender.stdout.layout'] ?= 'org.apache.log4j.PatternLayout'
options.log4j['log4j.appender.stdout.layout.ConversionPattern'] ?= '[%d] %p %m (%c)%n'
SSL
options.ssl = merge {}, service.deps.ssl?.options, options.ssl
# Configuration
ssl_enabled = service.deps.kafka_broker[0].options.protocols.some (protocol) ->
protocol in ['SASL_SSL', 'SSL']
if ssl_enabled
options.config['ssl.truststore.location'] ?= "#{options.conf_dir}/truststore"
options.config['ssl.truststore.password'] ?= 'ryba123'
options.consumer.config['ssl.truststore.location'] ?= options.config['ssl.truststore.location']
options.consumer.config['ssl.truststore.password'] ?= options.config['ssl.truststore.password']
options.producer.config['ssl.truststore.location'] ?= options.config['ssl.truststore.location']
options.producer.config['ssl.truststore.password'] ?= options.config['ssl.truststore.password']
Test
options.ranger_install = service.deps.ranger_kafka[0].options.install if service.deps.ranger_kafka
options.test = merge {}, service.deps.test_user.options, options.test
Wait
options.wait_krb5_client = service.deps.krb5_client.options.wait
options.wait_zookeeper_server = service.deps.zookeeper_server[0].options.wait
options.wait_kafka_broker = service.deps.kafka_broker[0].options.wait
options.wait_ranger_admin = service.deps.ranger_admin.options.wait if service.deps.ranger_admin
Dependencies
{merge} = require '@nikitajs/core/lib/misc'