
Apache Atlas Hive Plugin Configuration

module.exports = (service) ->
  options = service.options

Environment

  options.conf_dir = service.deps.hive_server2.options.conf_dir

Libs

  options.oozie = service.deps.oozie
  options.oozie_user = options.oozie.user if options.oozie
  options.atlas = service.deps.atlas[0]

Configuration

Mechanism: the Apache Atlas Hive hook runs directly inside the HiveServer2 JVM. As a consequence, the Kafka client configuration must be done on the HiveServer2 side. Ryba performs the following steps:

  • Configure HiveServer2's hive-site.xml file with the Atlas properties.

  • Write the Atlas configuration file for the Hive bridge. The Atlas Hive hook needs the Atlas application file in the HiveServer2 configuration directory. Note: the file must be named atlas-application.properties, as this name is hard coded. A minimal serialization sketch follows the list.
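
The properties set below form a flat map: each key is the final property name and each value its string representation. As a sketch only (the render_properties helper is hypothetical and the actual file is written elsewhere by Ryba), the map translates into atlas-application.properties lines like this:

  # Hypothetical helper, for illustration only: serialize a flat key/value map
  # into the "key=value" lines expected in atlas-application.properties
  render_properties = (properties) ->
    ("#{k}=#{v}" for own k, v of properties).join '\n'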

  service.deps.hive_server2.options.atlas ?= {}
  service.deps.hive_server2.options.atlas.client ?= {}
  options.client ?= {}
  options.client.properties ?= {}
  options.client.properties['atlas.http.authentication.enabled'] ?= options.atlas.options.application.properties['atlas.http.authentication.enabled']
  options.client.properties['atlas.http.authentication.type'] ?= options.atlas.options.application.properties['atlas.http.authentication.type']
  options.application ?= {}
  options.application.properties ?= {}
  options.application.properties['atlas.hook.hive.synchronous'] ?= 'false'
  options.application.properties['atlas.hook.hive.numRetries'] ?= '3'
  options.application.properties['atlas.hook.hive.minThreads'] ?= '5'
  options.application.properties['atlas.hook.hive.maxThreads'] ?= '5'
  options.application.properties['atlas.hook.hive.keepAliveTime'] ?= '10'
  options.application.properties['atlas.hook.hive.queueSize'] ?= '10000'
  service.deps.hive_server2.options.hive_site['atlas.cluster.name'] ?= "#{options.atlas.options.cluster_name}"
  # Step 1 - check whether the atlas.rest.address property already has a value
  # Step 2 (only if it does) - append the Atlas URLs unless they are already present
  service.deps.hive_server2.options.hive_site['atlas.rest.address'] = add_prop service.deps.hive_server2.options.hive_site['atlas.rest.address'], options.atlas.options.application.urls, ','
  service.deps.hive_server2.options.hive_site['hive.exec.post.hooks'] = add_prop service.deps.hive_server2.options.hive_site['hive.exec.post.hooks'], 'org.apache.atlas.hive.hook.HiveHook', ','
  # server2.aux_jars = add_prop server2.aux_jars, "/usr/hdp/current/atlas-client/hook/hive", ':'
  # Notifications
  # Pick the most secure Kafka channel available, in order of preference
  channels = []
  channels.push 'SASL_SSL' if service.deps.hadoop_core.options.core_site['hadoop.security.authentication'] is 'kerberos' and service.deps.hive_server2.options.hive_site['hive.server2.use.SSL'] is 'true'
  channels.push 'SASL_PLAINTEXT' if service.deps.hadoop_core.options.core_site['hadoop.security.authentication'] is 'kerberos'
  channels.push 'SSL' if service.deps.hive_server2.options.hive_site['hive.server2.use.SSL'] is 'true'
  channels.push 'PLAINTEXT'
  options.application.properties['atlas.kafka.security.protocol'] ?= channels[0]
  options.application.properties['atlas.notification.topics'] ?= options.atlas.options.application.properties['atlas.notification.topics']
  options.application.properties['atlas.kafka.bootstrap.servers'] ?= options.atlas.options.application.properties['atlas.kafka.bootstrap.servers']
  # Configure Hive Server2 JAAS Properties for posting notifications to Kafka
  if options.application.properties['atlas.kafka.security.protocol'] in ['SASL_PLAINTEXT','SASL_SSL']
    options.application.properties['atlas.jaas.KafkaClient.loginModuleControlFlag'] ?= 'required'
    options.application.properties['atlas.jaas.KafkaClient.loginModuleName'] ?= 'com.sun.security.auth.module.Krb5LoginModule'
    options.application.properties['atlas.jaas.KafkaClient.option.keyTab'] ?= service.deps.hive_server2.options.hive_site['hive.server2.authentication.kerberos.keytab']
    options.application.properties['atlas.jaas.KafkaClient.option.principal'] ?= service.deps.hive_server2.options.hive_site['hive.server2.authentication.kerberos.principal']
    options.application.properties['atlas.jaas.KafkaClient.option.serviceName'] ?= 'kafka'
    options.application.properties['atlas.jaas.KafkaClient.option.storeKey'] ?= 'true'
    options.application.properties['atlas.jaas.KafkaClient.option.useKeyTab'] ?= 'true'
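    # For reference, the atlas.jaas.KafkaClient.* properties above are the
    # properties-file equivalent of a JAAS "KafkaClient" login section. The
    # keytab and principal shown below are illustrative assumptions; the real
    # values come from hive-site.xml as configured above:
    # KafkaClient {
    #   com.sun.security.auth.module.Krb5LoginModule required
    #   useKeyTab=true
    #   storeKey=true
    #   serviceName="kafka"
    #   keyTab="/etc/security/keytabs/hive.service.keytab"
    #   principal="hive/_HOST@EXAMPLE.COM";
    # };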
  if options.application.properties['atlas.kafka.security.protocol'] in ['SSL','SASL_SSL']
    # note: service.deps.hadoop_core.options should retrieve the configuration of the hive_server2 node
    options.application.properties['atlas.kafka.ssl.truststore.location'] ?= service.deps.hadoop_core.options.ssl_client['ssl.client.truststore.location']
    options.application.properties['atlas.kafka.ssl.truststore.password'] ?= service.deps.hadoop_core.options.ssl_client['ssl.client.truststore.password']
  # Administrators can choose a different protocol for Atlas Kafka notifications
  protocol = options.application.properties['atlas.kafka.security.protocol']
  if protocol in service.deps.kafka_broker[0].options.protocols
    brokers = service.deps.kafka_broker.map( (srv) =>
      "#{srv.node.fqdn}:#{srv.options.ports[protocol]}"
    ).join ','
    # construct the bootstrap servers string based on the selected channel
    # e.g.: SASL_SSL://master1.ryba:9096,master2.ryba:9096,master3.ryba:9096
    options.application.properties['atlas.kafka.bootstrap.servers'] ?= "#{protocol}://#{brokers}"
  else
    throw Error "Atlas Hive Bridge Hook Selected Protocol #{options.application.kafka_chanel} is not allowed by Kafka Brokers configuration"
  # Kafka Ranger plugin authorization
  if options.application.properties['atlas.kafka.security.protocol'] in ['SASL_PLAINTEXT','SASL_SSL']
    options.atlas.options.kafka_policy.policyItems[0].users.push "#{service.deps.hive_server2.options.user.name}"
  if options.application.properties['atlas.kafka.security.protocol'] in ['PLAINTEXT','SSL']
    options.atlas.options.kafka_policy.policyItems[0].users.push 'ANONYMOUS'

Utility function

add_prop = (value, add, separator) ->
  # Append `add` to `value` using `separator`, unless `add` is already present
  throw Error 'No separator provided' unless separator?
  value ?= ''
  return add if value.length is 0
  return if value.indexOf(add) is -1 then "#{value}#{separator}#{add}" else value
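
For example (an illustration only, not used by the module), appending the Atlas hook class to an existing hive.exec.post.hooks value only adds it once:

example_hooks = add_prop 'org.example.SomeHook', 'org.apache.atlas.hive.hook.HiveHook', ','
# example_hooks is 'org.example.SomeHook,org.apache.atlas.hive.hook.HiveHook'
example_hooks = add_prop example_hooks, 'org.apache.atlas.hive.hook.HiveHook', ','
# unchanged: the hook class is not appended a second time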