Kafka Broker Install

module.exports = header: 'Kafka Broker Install', handler: ({options}) ->

Register

  @registry.register 'hdp_select', 'ryba/lib/hdp_select'
  @registry.register ['file', 'jaas'], 'ryba/lib/file_jaas'

Wait

  @call 'masson/core/krb5_client/wait', once: true, options.wait_krb5_client

Identities

By default, the "kafka" package creates the following entries:

cat /etc/passwd | grep kafka
kafka:x:496:496:KAFKA:/home/kafka:/bin/bash
cat /etc/group | grep kafka
kafka:x:496:kafka

  @system.group header: 'Group', options.group
  @system.user header: 'User', options.user
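
The "group" and "user" options describe these system identities. A minimal sketch of what they might contain (illustrative values, not the module defaults):

group: name: 'kafka', system: true
user: name: 'kafka', system: true, gid: 'kafka', comment: 'Kafka User', home: '/home/kafka'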

IPTables

| Service      | Port | Proto      | Parameter |
|--------------|------|------------|-----------|
| Kafka Broker | 9092 | http       | port      |
| Kafka Broker | 9093 | https      | port      |
| Kafka Broker | 9094 | sasl_http  | port      |
| Kafka Broker | 9096 | sasl_https | port      |

IPTables rules are only inserted if the parameter "iptables.action" is set to "start" (default value).

  @tools.iptables
    header: 'IPTables'
    if: options.iptables
    rules: for protocol in options.protocols
      chain: 'INPUT', jump: 'ACCEPT', dport: options.ports[protocol], protocol: 'tcp', state: 'NEW', comment: "Kafka Broker #{protocol}"
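
For example, assuming the default "http" port 9092, the inserted rule is along these lines:

iptables -A INPUT -p tcp -m state --state NEW --dport 9092 -m comment --comment "Kafka Broker http" -j ACCEPT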

Package

Install the "kafka" package and set it to the latest version. Note, we select the "kafka-broker" HDP directory with hdp-select.

  @call header: 'Packages', handler: ->
    @service
      name: 'kafka'
    @hdp_select
      name: 'kafka-broker'
    @system.mkdir
      target: '/var/lib/kafka'
      uid: options.user.name
      gid: options.user.name
    @service.init
      if_os: name: ['redhat','centos'], version: '6'
      header: 'Init Script'
      target: '/etc/init.d/kafka-broker'
      source: "#{__dirname}/../resources/kafka-broker.j2"
      local: true
      mode: 0o0755
      context: options: options
    @call
      if_os: name: ['redhat','centos'], version: '7'
    , ->
      @service.init
        header: 'Systemd Script'
        target: '/usr/lib/systemd/system/kafka-broker.service'
        source: "#{__dirname}/../resources/kafka-broker-systemd.j2"
        local: true
        context: options: options
        mode: 0o0640
      @system.tmpfs
        mount: options.run_dir
        uid: options.user.name
        gid: options.group.name
        perm: '0750'
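
On RHEL/CentOS 7, the rendered unit lands in /usr/lib/systemd/system/kafka-broker.service. The template ships with ryba and is not reproduced here; purely as an illustration, with every value below an assumption, a rendered unit could look like:

[Unit]
Description=Kafka Broker
After=network.target

[Service]
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/java/default
ExecStart=/usr/hdp/current/kafka-broker/bin/kafka-server-start.sh /etc/kafka-broker/conf/server.properties
Restart=on-failure

[Install]
WantedBy=multi-user.target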

Configure

Update the file "server.properties" with the properties defined in the "config" option.

  @file
    header: 'Server properties'
    target: "#{options.conf_dir}/server.properties"
    write: for k, v of options.config
      match: RegExp "^#{quote k}=.*$", 'mg'
      replace: "#{k}=#{v}"
      append: true
    backup: true
    eof: true
    mode: 0o0750
    uid: options.user.name
    gid: options.group.name
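
For example, assuming "config" carries a "broker.id" and a "zookeeper.connect" entry (illustrative values), each property is rewritten in place when a matching line exists and appended otherwise:

broker.id=0
zookeeper.connect=master01.example.com:2181,master02.example.com:2181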

Metrics

Upload the *.properties files into the "/etc/kafka-broker/conf" directory.

  @call header: 'Metrics', handler: ->
    @call (_, callback) ->
      glob "#{__dirname}/../resources/*.properties", (err, files) =>
        return callback err if err
        for file in files
          # Skip hidden files such as editor artifacts
          continue if /^\./.test path.basename file
          @file
            source: file
            local: true
            target: "#{options.conf_dir}/#{path.basename file}"
            binary: true
        @next callback
    @file
      source: "#{__dirname}/../resources/connect-console-sink.properties"
      local: true
      target: "#{options.conf_dir}/connect-console-sink.properties"
      binary: true

Env

Update the kafka-env.sh file (/etc/kafka-broker/conf/kafka-env.sh). Note: with a systemd environment, JAVA_HOME needs to be defined.

  @file
    header: 'Environment'
    source: "#{__dirname}/../resources/kafka-env.sh"
    target: "#{options.conf_dir}/kafka-env.sh"
    write: for k, v of options.env
      match: RegExp "export #{k}=.*", 'm'
      replace: "export #{k}=\"#{v}\" # RYBA, DONT OVERWRITE"
      append: true
    backup: true
    local: true
    eof: true
    mode: 0o0750
    uid: options.user.name
    gid: options.group.name
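
For example, assuming "env" defines JAVA_HOME (the path is illustrative), the managed line in kafka-env.sh reads:

export JAVA_HOME="/usr/java/default" # RYBA, DONT OVERWRITE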

Logging

Set Log4j properties

  @file.properties
    header: 'Broker Log4j'
    target: "#{options.conf_dir}/log4j.properties"
    content: options.log4j.properties
    backup: true
  @file.properties
    header: 'Common Log4j'
    target: "/etc/kafka/conf/log4j.properties"
    content: options.log4j.properties
    backup: true
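
As an illustration, the "log4j.properties" option typically carries standard Kafka Log4j settings such as the following (example values, not the module defaults):

log4j.rootLogger=INFO, kafkaAppender
log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout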

Modify the bin scripts to set the $KAFKA_HOME variable to match /etc/kafka-broker/conf and to replace KAFKA_BROKER_CMD with the kafka-broker conf directory path. These fixes are needed to isolate the broker and client configurations.

  @call header: 'Fix Startup Script', handler: ->
    # @file
    #   target: "/usr/hdp/current/kafka-broker/bin/kafka"
    #   write: [
    #     match: /^KAFKA_BROKER_CMD=(.*)/m
    #     replace: "KAFKA_BROKER_CMD=\"$KAFKA_HOME/bin/kafka-server-broker-start.sh #{options.conf_dir}/server.properties\""
    #   ]
    #   backup: true
    #   eof: true
    @file
      target: '/usr/hdp/current/kafka-broker/bin/kafka-server-start.sh'
      write: [
        match: RegExp "^exec.*$", 'mg'
        replace: "exec /usr/hdp/current/kafka-broker/bin/kafka-run-broker-class.sh $EXTRA_ARGS kafka.Kafka #{options.conf_dir}/server.properties # RYBA DON'T OVERWRITE"
      ]
      backup: true
      eof: true
      mode: 0o755
    @system.copy
      source: '/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh'
      target: '/usr/hdp/current/kafka-broker/bin/kafka-run-broker-class.sh'
      mode: 0o0755
    @file
      target: '/usr/hdp/current/kafka-broker/bin/kafka-run-broker-class.sh'
      write: [
        match: RegExp "^KAFKA_ENV=.*$", 'mg'
        replace: "KAFKA_ENV=#{options.conf_dir}/kafka-env.sh # RYBA DON'T OVERWRITE"
      ,
        match: RegExp "KAFKA_GC_LOG_OPTS=\"[^\"]+\"", 'mg'
        replace: """
          if [ -z "$KAFKA_GC_LOG_OPTS" ]; then
            KAFKA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps "
          fi
          """
      ]
      backup: true
      eof: true
      mode: 0o755
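
After the rewrite, and assuming "conf_dir" resolves to /etc/kafka-broker/conf, the exec line in kafka-server-start.sh reads:

exec /usr/hdp/current/kafka-broker/bin/kafka-run-broker-class.sh $EXTRA_ARGS kafka.Kafka /etc/kafka-broker/conf/server.properties # RYBA DON'T OVERWRITE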

Kerberos

Broker Server principal, keytab and JAAS

  @call
    header: 'Kerberos'
    if: options.config['zookeeper.set.acl'] is 'true'
    handler: ->
      @krb5.addprinc options.krb5.admin,
        header: 'Broker Server Kerberos'
        principal: options.kerberos.principal
        randkey: true
        keytab: options.kerberos.keyTab
        uid: options.user.name
        gid: options.group.name
      @file.jaas
        header: 'Broker JAAS'
        target: "#{options.conf_dir}/kafka-server.jaas"
        content:
          KafkaServer:
            principal: options.kerberos.principal
            keyTab: options.kerberos.keyTab
            useKeyTab: true
            storeKey: true
          Client:
            principal: options.kerberos.principal
            keyTab: options.kerberos.keyTab
            useKeyTab: true
            storeKey: true
        uid: options.user.name
        gid: options.group.name
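
For illustration, assuming the principal "kafka/broker1.example.com@EXAMPLE.COM" and a keytab under /etc/security/keytabs (both hypothetical), the generated file is a standard JAAS configuration along these lines, with the exact login module and layout left to ryba/lib/file_jaas:

KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/etc/security/keytabs/kafka.service.keytab"
  principal="kafka/broker1.example.com@EXAMPLE.COM";
};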

Kafka Superuser principal generation

      @krb5.addprinc options.krb5.admin,
        header: 'Kafka Superuser Kerberos'
        principal: options.admin.principal
        password: options.admin.password

SSL Server

Upload and register the SSL certificate and private key. SSL is enabled at least for inter-broker communication.

  @call
    header: 'SSL'
    unless: options.config['replication.security.protocol'] is 'PLAINTEXT'
  , ->
    @java.keystore_add
      keystore: options.config['ssl.keystore.location']
      storepass: options.config['ssl.keystore.password']
      key: options.ssl.key.source
      cert: options.ssl.cert.source
      keypass: options.config['ssl.key.password']
      name: options.ssl.key.name
      local: options.ssl.cert.local
    @java.keystore_add
      keystore: options.config['ssl.keystore.location']
      storepass: options.config['ssl.keystore.password']
      caname: "hadoop_root_ca"
      cacert: options.ssl.cacert.source
      local: options.ssl.cacert.local
    # Import the hadoop_root_ca CA certificate into the Kafka broker truststore
    @java.keystore_add
      keystore: options.config['ssl.truststore.location']
      storepass: options.config['ssl.truststore.password']
      caname: "hadoop_root_ca"
      cacert: options.ssl.cacert.source
      local: options.ssl.cacert.local
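
Once imported, the keystore and truststore entries can be verified with keytool, assuming the path and password configured in the "ssl.*" options (values below are illustrative):

keytool -list -v -keystore /etc/security/ssl/kafka.jks -storepass changeit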

Layout

Directories in which Kafka data is stored. Each new partition that is created will be placed in the directory which currently has the fewest partitions.
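
For example, assuming two data disks (paths are illustrative), the "log.dirs" property reads as follows, and each listed directory is created by the loop below:

log.dirs=/data/1/kafka,/data/2/kafka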

  @call header: 'Layout', handler: ->
    # Create every data directory listed in "log.dirs"
    for dir in options.config['log.dirs'].split ','
      @system.mkdir
        target: dir
        uid: options.user.name
        gid: options.group.name
        mode: 0o0750
        parent: true
    @system.mkdir
      target: options.log_dir
      uid: options.user.name
      gid: options.group.name
      mode: 0o0750
    @system.mkdir
      target: options.run_dir
      uid: options.user.name
      gid: options.group.name
      mode: 0o0750

Dependencies

glob = require 'glob'
path = require 'path'
quote = require 'regexp-quote'