Kafka Check
module.exports = header: 'Kafka Client Check', handler: ({options}) ->
Register
@registry.register 'ranger_policy', 'ryba/ranger/actions/ranger_policy'
Wait
@call once: true, 'masson/core/krb5_client/wait', options.wait_krb5_client
@call once: true, 'ryba/zookeeper/server/wait', options.wait_zookeeper_server
@call once: true, 'ryba/kafka/broker/wait', options.wait_kafka_broker
Add Ranger Policy
@call
header: 'Ranger'
if: !!options.ranger_admin
, ->
@call 'ryba/ranger/admin/wait', once: true, options.wait_ranger_admin
topics = options.protocols.map (prot) =>
"check-#{options.hostname}-client-#{prot.toLowerCase().split('_').join('-')}-topic"
users = ["#{options.test.user.name}", options.superusers...]
users.push 'ANONYMOUS' if ('PLAINTEXT' in options.protocols) or ('SSL' in options.protocols)
@wait.execute
header: 'Wait Service'
cmd: """
curl --fail -H "Content-Type: application/json" -k -X GET \
-u #{options.ranger_admin.username}:#{options.ranger_admin.password} \
"#{options.ranger_install['POLICY_MGR_URL']}/service/public/v2/api/service/name/#{options.ranger_install['REPOSITORY_NAME']}"
"""
code_skipped: [1,7,22] #22 is for 404 not found,7 is for not connected to host
@ranger_policy
header: 'Create Policy'
username: options.ranger_admin.username
password: options.ranger_admin.password
url: options.ranger_install['POLICY_MGR_URL']
policy:
service: "#{options.ranger_install['REPOSITORY_NAME']}"
name: "test-ryba-client-#{options.hostname}"
description: "Policy for ryba kafka Client test"
isAuditEnabled: true
resources:
topic:
values: topics
isExcludes: false
isRecursive: false
policyItems: [
"accesses": [
'type': 'publish'
'isAllowed': true
,
'type': 'consume'
'isAllowed': true
,
'type': 'configure'
'isAllowed': true
,
'type': 'describe'
'isAllowed': true
,
'type': 'create'
'isAllowed': true
,
'type': 'delete'
'isAllowed': true
,
'type': 'kafka_admin'
'isAllowed': true
],
'users': users
'groups': []
'conditions': []
'delegateAdmin': false
]
@wait
time: 10000
if: -> @status -1
PLAINTEXT Protocol
Check Message by writing to a test topic on the PLAINTEXT channel. Since new API (0.8-0.9) and security features, kafka broker are able to deal with multiple channel with different protocols. For its internal functionment, it associates an not authenticated user to ANONYMOUS name when client communicates on PLAINTEXT-SSL protocols.
@call
header: 'PLAINTEXT'
if: 'PLAINTEXT' in options.protocols
retry: 3
, ->
test_topic = "check-#{options.hostname}-client-plaintext-topic"
@system.execute
if: options.env['KAFKA_KERBEROS_PARAMS']?
cmd: mkcmd.kafka options.admin, """
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
--zookeeper #{options.consumer.config['zookeeper.connect']} \
--partitions #{Math.min(options.brokers['PLAINTEXT'].length-1,1)} \
--replication-factor #{Math.min(options.brokers['PLAINTEXT'].length-1,0)} \
--topic #{test_topic}
"""
unless_exec: mkcmd.kafka options.admin, """
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list \
--zookeeper #{options.consumer.config['zookeeper.connect']} \
| grep #{test_topic}
"""
@system.execute
unless: options.env['KAFKA_KERBEROS_PARAMS']? or !!options.ranger_admin
cmd: """
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
--zookeeper #{options.consumer.config['zookeeper.connect']} \
--partitions #{Math.min(options.brokers['PLAINTEXT'].length-1,1)} \
--replication-factor #{Math.min(options.brokers['PLAINTEXT'].length-1,0)} \
--topic #{test_topic}
"""
unless_exec: """
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list \
--zookeeper #{options.consumer.config['zookeeper.connect']} \
| grep #{test_topic}
"""
@system.execute
if: options.env['KAFKA_KERBEROS_PARAMS']?
cmd: mkcmd.kafka options.admin, """
(
sleep 1
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--add \
--allow-principal User:ANONYMOUS \
--operation Read \
--operation Write \
--topic #{test_topic}
)&
(
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--add \
--allow-principal User:ANONYMOUS \
--consumer \
--producer \
--group #{options.consumer.config['group.id']} \
--topic #{test_topic}
)
"""
unless_exec: mkcmd.kafka options.admin, """
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh --list \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--topic #{test_topic} \
| grep 'User:ANONYMOUS has Allow permission for operations: Write from hosts: *'
"""
@system.execute
unless: options.env['KAFKA_KERBEROS_PARAMS']? or !!options.ranger_admin
cmd: """
(
sleep 1
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--add --allow-principal User:ANONYMOUS \
--operation Read \
--operation Write \
--topic #{test_topic}
)&
(
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--add \
--allow-principal User:ANONYMOUS \
--consumer \
--producer \
--group #{options.consumer.config['group.id']} \
--topic #{test_topic}
)
"""
unless_exec: """
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh --list \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--topic #{test_topic} \
| grep 'User:ANONYMOUS has Allow permission for operations: Write from hosts: *'
"""
@system.execute
cmd: """
(
sleep 5
echo 'hello #{options.hostname}' | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
--producer-property security.protocol=PLAINTEXT \
--broker-list #{options.brokers['PLAINTEXT'].join ','} \
--security-protocol PLAINTEXT \
--producer.config #{options.conf_dir}/producer.properties \
--topic #{test_topic}
)&
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
--delete-consumer-offsets \
--bootstrap-server #{options.brokers['PLAINTEXT'].join ','} \
--topic #{test_topic} \
--security-protocol PLAINTEXT \
--property security.protocol=PLAINTEXT \
--consumer.config #{options.conf_dir}/consumer.properties \
--from-beginning \
--max-messages 1 \
| grep 'hello #{options.hostname}'
"""
SSL Protocol
Check Message by writing to a test topic on the SSL channel. Truststore location and password given to line command because if executed before producer install '/etc/kafka/conf/producer.properties' might be empty.
@call
header: 'SSL'
retry: 3
if: 'SSL' in options.protocols
, ->
test_topic = "check-#{options.hostname}-client-ssl-topic"
@system.execute
cmd: mkcmd.kafka options.admin, """
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
--zookeeper #{options.consumer.config['zookeeper.connect']} \
--partitions #{Math.min(options.brokers['SSL'].length-1,1)} \
--replication-factor #{Math.min(options.brokers['SSL'].length-1,0)} \
--topic #{test_topic}
"""
unless_exec: mkcmd.kafka options.admin, """
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list \
--zookeeper #{options.consumer.config['zookeeper.connect']} \
| grep #{test_topic}
"""
@system.execute
unless: !!options.ranger_admin
cmd: mkcmd.kafka options.admin, """
(
sleep 1
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--add --allow-principal User:ANONYMOUS \
--operation Read \
--operation Write \
--topic #{test_topic}
)&
(
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--add \
--allow-principal User:ANONYMOUS \
--consumer \
--producer \
--group #{options.consumer.config['group.id']} \
--topic #{test_topic}
)
"""
unless_exec: mkcmd.kafka options.admin, """
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh --list \
--authorizer-properties \
zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--topic #{test_topic} \
| grep 'User:ANONYMOUS has Allow permission for operations: Write from hosts: *'
"""
@system.execute
cmd: """
(
sleep 5
echo 'hello #{options.hostname}' | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
--producer-property security.protocol=SSL \
--broker-list #{options.brokers['SSL'].join ','} \
--security-protocol SSL \
--producer-property ssl.truststore.location=#{options.config['ssl.truststore.location']} \
--producer-property ssl.truststore.password=#{options.config['ssl.truststore.password']} \
--producer.config #{options.conf_dir}/producer.properties \
--topic #{test_topic}
)&
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
--delete-consumer-offsets \
--bootstrap-server #{options.brokers['SSL'].join ','} \
--topic #{test_topic} \
--security-protocol SSL \
--property security.protocol=SSL \
--property ssl.truststore.location=#{options.config['ssl.truststore.location']} \
--property ssl.truststore.password=#{options.config['ssl.truststore.password']} \
--consumer.config #{options.conf_dir}/consumer.properties \
--from-beginning \
--max-messages 1 \
| grep 'hello #{options.hostname}'
"""
SASL_PLAINTEXT Protocol
Check Message by writing to a test topic on the SASL_PLAINTEXT channel.
@call
header: 'SASL_PLAINTEXT'
retry: 3
if: 'SASL_PLAINTEXT' in options.protocols
, ->
test_topic = "check-#{options.hostname}-client-sasl-plaintext-topic"
@system.execute
cmd: mkcmd.kafka options.admin, """
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
--zookeeper #{options.consumer.config['zookeeper.connect']} \
--partitions #{Math.min(options.brokers['SASL_PLAINTEXT'].length-1,1)} \
--replication-factor #{Math.min(options.brokers['SASL_PLAINTEXT'].length-1,0)} \
--topic #{test_topic}
"""
unless_exec: mkcmd.kafka options.admin, """
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list \
--zookeeper #{options.consumer.config['zookeeper.connect']} \
| grep #{test_topic}
"""
@system.execute
unless: !!options.ranger_admin
cmd: mkcmd.kafka options.admin, """
(
sleep 1
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--add \
--allow-principal User:#{options.test.user.name} \
--operation Read \
--operation Write \
--topic #{test_topic}
)&
(
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--add \
--allow-principal User:#{options.test.user.name} \
--consumer --group #{options.config['group.id']} \
--topic #{test_topic}
)
"""
unless_exec: mkcmd.kafka options.admin, """
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh --list \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--topic #{test_topic} \
| grep 'User:#{options.test.user.name} has Allow permission for operations: Write from hosts: *'
"""
@system.execute
cmd: mkcmd.test options.test_krb5_user, """
(
sleep 5
echo 'hello #{options.hostname}' | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
--producer-property security.protocol=SASL_PLAINTEXT \
--broker-list #{options.brokers['SASL_PLAINTEXT'].join ','} \
--security-protocol SASL_PLAINTEXT \
--producer.config #{options.conf_dir}/producer.properties \
--topic #{test_topic}
)&
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
--delete-consumer-offsets \
--bootstrap-server #{options.brokers['SASL_PLAINTEXT'].join ','} \
--topic #{test_topic} \
--security-protocol SASL_PLAINTEXT \
--consumer.config #{options.conf_dir}/consumer.properties \
--from-beginning \
--max-messages 1 \
| grep 'hello #{options.hostname}'
"""
SASL_SSL Protocol
Check Message by writing to a test topic on the SASL_SSL channel. Trustore location and password given to line command because if executed before producer install '/etc/kafka/conf/producer.properties' might be empty.
@call
header: 'SASL_SSL'
retry: 3
if: 'SASL_SSL' in options.protocols
, ->
test_topic = "check-#{options.hostname}-client-sasl-ssl-topic"
@system.execute
cmd: mkcmd.kafka options.admin, """
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
--zookeeper #{options.consumer.config['zookeeper.connect']} \
--partitions #{Math.min(options.brokers['SASL_SSL'].length-1,1)} \
--replication-factor #{Math.min(options.brokers['SASL_SSL'].length-1,0)} \
--topic #{test_topic}
"""
unless_exec: mkcmd.kafka options.admin, """
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh \
--list \
--zookeeper #{options.consumer.config['zookeeper.connect']} \
| grep #{test_topic}
"""
@system.execute
unless: !!options.ranger_admin
cmd: mkcmd.kafka options.admin, """
(
sleep 1
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--add \
--allow-principal User:#{options.test.user.name} \
--operation Read \
--operation Write \
--topic #{test_topic}
)&
(
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--add \
--allow-principal User:#{options.test.user.name} \
--consumer \
--producer \
--group #{options.consumer.config['group.id']} \
--topic #{test_topic}
)
"""
unless_exec: mkcmd.kafka options.admin, """
/usr/hdp/current/kafka-broker/bin/kafka-acls.sh --list \
--authorizer-properties zookeeper.connect=#{options.consumer.config['zookeeper.connect']} \
--topic #{test_topic} \
| grep 'User:#{options.test.user.name} has Allow permission for operations: Write from hosts: *'
"""
@system.execute
cmd: mkcmd.test options.test_krb5_user, """
(
sleep 5
echo 'hello #{options.hostname}' | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh \
--producer-property security.protocol=SASL_SSL \
--broker-list #{options.brokers['SASL_SSL'].join ','} \
--security-protocol SASL_SSL \
--producer-property ssl.truststore.location=#{options.config['ssl.truststore.location']} \
--producer-property ssl.truststore.password=#{options.config['ssl.truststore.password']} \
--producer.config #{options.conf_dir}/producer.properties \
--topic #{test_topic}
)&
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
--delete-consumer-offsets \
--bootstrap-server #{options.brokers['SASL_SSL'].join ','} \
--topic #{test_topic} \
--security-protocol SASL_SSL \
--property security.protocol=SASL_SSL \
--property ssl.truststore.location=#{options.config['ssl.truststore.location']} \
--property ssl.truststore.password=#{options.config['ssl.truststore.password']} \
--consumer.config #{options.conf_dir}/consumer.properties \
--from-beginning \
--max-messages 1 \
| grep 'hello #{options.hostname}'
"""
Dependencies
mkcmd = require '../../lib/mkcmd'