
Apache Spark Configure

module.exports = (service) ->
  options = service.options

Identities

  # Group
  options.group ?= {}
  options.group = name: options.group if typeof options.group is 'string'
  options.group.name ?= 'spark'
  options.group.system ?= true
  # User
  options.user ?= {}
  options.user = name: options.user if typeof options.user is 'string'
  options.user.name ?= 'spark'
  options.user.system ?= true
  options.user.comment ?= 'Spark User'
  options.user.home ?= '/var/lib/spark'
  options.user.groups ?= 'hadoop'
  options.user.gid ?= options.group.name

Kerberos

  # Kerberos HDFS Admin
  options.hdfs_krb5_user ?= service.deps.hadoop_core.options.hdfs.krb5_user
  # Kerberos Test Principal
  options.test_krb5_user ?= service.deps.test_user.options.krb5.user

Environment

  # Layout
  options.client_dir ?= '/usr/hdp/current/spark2-client'
  options.conf_dir ?= '/etc/spark2/conf'
  # Misc
  options.hostname = service.node.hostname
  # options.hdfs_defaultfs = service.deps.hdfs_nn[0].options.core_site['fs.defaultFS']

Tez

The Tez configuration directory is injected into "spark-env.sh".

  options.tez_conf_dir = service.deps.tez.options.env['TEZ_CONF_DIR'] if service.deps.tez
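
For illustration only (this snippet is not compiled as part of the module), the rendered "spark-env.sh" then carries the exported variable; the path below is an assumed example value, the real one comes from the Tez dependency.

```coffee
# Sketch: the line injected into "spark-env.sh".
# '/etc/tez/conf' is an assumed example for the Tez configuration directory.
tez_conf_dir = '/etc/tez/conf'
console.log "export TEZ_CONF_DIR=#{tez_conf_dir}"
```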

Hive server2

  if service.deps.hive_server2
    options.hive_server2 = for srv in service.deps.hive_server2
      fqdn: srv.options.fqdn
      hostname: srv.options.hostname
      hive_site: srv.options.hive_site

Test

  options.ranger_admin ?= service.deps.ranger_admin.options.admin if service.deps.ranger_admin
  options.ranger_install = service.deps.ranger_hive[0].options.install if service.deps.ranger_hive
  options.test = merge {}, service.deps.test_user.options, options.test

Configuration

  options.conf ?= {}
  options.conf['spark.master'] ?= "local[*]"
  # For [Spark on YARN deployments][[secu]], configuring spark.authenticate to true
  # will automatically handle generating and distributing the shared secret.
  # Each application will use a unique shared secret. 
  # http://spark.apache.org/docs/1.6.0/configuration.html#security
  options.conf['spark.authenticate'] ?= "true"
  if options.conf['spark.authenticate'] is 'true'
    options.conf['spark.authenticate.secret'] ?= 'my-secret-key'
    throw Error 'spark.authenticate.secret is needed when spark.authenticate is true' unless options.conf['spark.authenticate.secret']
  # This causes Spark applications running on this client to write their history to the directory that the history server reads.
  options.conf['spark.eventLog.enabled'] ?= "true"
  options.conf['spark.yarn.services'] ?= "org.apache.spark.deploy.yarn.history.YarnHistoryService"
  # Set to the only supported provider, see http://spark.apache.org/docs/1.6.0/monitoring.html#viewing-after-the-fact
  # https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_upgrading_hdp_manually/content/upgrade-spark-23.html
  options.conf['spark.history.provider'] ?= 'org.apache.spark.deploy.history.FsHistoryProvider'
  # Base directory in which Spark events are logged, if spark.eventLog.enabled is true.
  # Within this base directory, Spark creates a sub-directory for each application, and logs the events specific to the application in this directory.
  # Users may want to set this to a unified location like an HDFS directory so history files can be read by the history server.
  options.conf['spark.eventLog.dir'] ?= "#{service.deps.hdfs_nn[0].options.core_site['fs.defaultFS']}/user/#{options.user.name}/applicationHistory"
  options.conf['spark.history.fs.logDirectory'] ?= "#{options.conf['spark.eventLog.dir']}"
  options.conf['spark.eventLog.overwrite'] ?= 'true'
  options.conf['spark.yarn.jar'] ?= "hdfs:///apps/#{options.user.name}/spark-assembly.jar"
  # options.conf['spark.yarn.applicationMaster.waitTries'] = null Deprecated in favor of "spark.yarn.am.waitTime"
  options.conf['spark.yarn.am.waitTime'] ?= '10'
  options.conf['spark.yarn.containerLauncherMaxThreads'] ?= '25'
  options.conf['spark.yarn.driver.memoryOverhead'] ?= '384'
  options.conf['spark.yarn.executor.memoryOverhead'] ?= '384'
  options.conf['spark.yarn.max.executor.failures'] ?= '3'
  options.conf['spark.yarn.preserve.staging.files'] ?= 'false'
  options.conf['spark.yarn.queue'] ?= 'default'
  options.conf['spark.yarn.scheduler.heartbeat.interval-ms'] ?= '5000'
  options.conf['spark.yarn.submit.file.replication'] ?= '3'
  options.dist_files ?= []

SSL

  options.ssl = merge {}, service.deps.ssl.options, options.ssl
  options.conf['spark.ssl.enabled'] ?= "false" # `!!service.deps.ssl`
  # options.conf['spark.ssl.enabledAlgorithms'] ?= "MD5"
  options.conf['spark.ssl.keyPassword'] ?= service.deps.ssl.options.keystore.password
  options.conf['spark.ssl.keyStore'] ?= "#{options.conf_dir}/keystore"
  options.conf['spark.ssl.keyStorePassword'] ?= service.deps.ssl.options.keystore.password
  options.conf['spark.ssl.protocol'] ?= "SSLv3"
  options.conf['spark.ssl.trustStore'] ?= "#{options.conf_dir}/truststore"
  options.conf['spark.ssl.trustStorePassword'] ?= service.deps.ssl.options.truststore.password

Client Metastore Configuration

Spark needs "hive-site.xml" in order to create a Hive Spark context. Spark acts as a client towards Hive/HCatalog and therefore needs its client configuration. The "hive-site.xml" file is placed inside the Spark configuration directory (by default "/etc/spark2/conf").

  options.hive_site ?= {}
  for property in [
    'hive.metastore.uris'
    'hive.security.authorization.enabled'
    'hive.security.authorization.manager'
    'hive.security.metastore.authorization.manager'
    'hive.security.authenticator.manager'
    # Transaction, read/write locks
    'hive.support.concurrency'
    'hive.enforce.bucketing'
    'hive.exec.dynamic.partition.mode'
    'hive.txn.manager'
    'hive.txn.timeout'
    'hive.txn.max.open.batch'
    'hive.cluster.delegation.token.store.zookeeper.connectString'
    # 'hive.cluster.delegation.token.store.class'
    # 'hive.metastore.local'
    # 'fs.hdfs.impl.disable.cache'
    'hive.metastore.sasl.enabled'
    # 'hive.metastore.cache.pinobjtypes'
    # 'hive.metastore.kerberos.keytab.file'
    'hive.metastore.kerberos.principal'
    # 'hive.metastore.pre.event.listeners'
    'hive.optimize.mapjoin.mapreduce'
    'hive.heapsize'
    'hive.auto.convert.sortmerge.join.noconditionaltask'
    'hive.exec.max.created.files'
    'hive.metastore.warehouse.dir'
    # Transaction, read/write locks
  ] then options.hive_site[property] ?= service.deps.hive_hcatalog[0].options.hive_site[property]
  options.hive_site['hive.execution.engine'] ?= 'mr'
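
As an illustration only (this sketch is not part of the module), the collected "options.hive_site" properties end up as standard Hadoop-style XML entries in the "hive-site.xml" written to the Spark configuration directory. The metastore URI below is an assumed placeholder.

```coffee
# Sketch: render a few hive_site properties as hive-site.xml entries.
# The values are assumed examples, not actual cluster settings.
hive_site =
  'hive.metastore.uris': 'thrift://metastore.example.com:9083'
  'hive.execution.engine': 'mr'
properties = for name, value of hive_site
  "  <property><name>#{name}</name><value>#{value}</value></property>"
console.log """
<configuration>
#{properties.join '\n'}
</configuration>
"""
```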

Metrics

Configure the "metrics.properties" to connect Spark to a metrics collector like Ganglia or Graphite. The metrics.properties file needs to be sent to every executor, and spark.metrics.conf=metrics.properties will tell all executors to load that file when initializing their respective MetricsSystems

  # options.conf['spark.metrics.conf'] ?= 'metrics.properties'
  options.conf_metrics ?= false
  if options.conf_metrics
    options.conf['spark.metrics.conf'] ?= "metrics.properties" # Error: Spark complains it cannot find the file if the value is 'metrics.properties'
    if options.conf['spark.metrics.conf']?
      metrics_file = "file://#{options.conf_dir}/metrics.properties"
      options.dist_files.push metrics_file unless metrics_file in options.dist_files
    options.metrics =
      'master.source.jvm.class':'org.apache.spark.metrics.source.JvmSource'
      'worker.source.jvm.class':'org.apache.spark.metrics.source.JvmSource'
      'driver.source.jvm.class':'org.apache.spark.metrics.source.JvmSource'
      'executor.source.jvm.class':'org.apache.spark.metrics.source.JvmSource'

    if service.deps.graphite
      options.metrics['*.sink.graphite.class'] = 'org.apache.spark.metrics.sink.GraphiteSink'
      options.metrics['*.sink.graphite.host'] = service.deps.graphite[0].instances.map( (instance) -> instance.node.fqdn ).join ','
      options.metrics['*.sink.graphite.port'] = service.deps.graphite[0].options.carbon_aggregator_port
      options.metrics['*.sink.graphite.prefix'] = "#{service.deps.graphite[0].options.metrics_prefix}.spark"

    # TODO : metrics.MetricsSystem: Sink class org.apache.spark.metrics.sink.GangliaSink cannot be instantialized
    if service.deps.ganglia_collector
      options.metrics['*.sink.ganglia.class'] = 'org.apache.spark.metrics.sink.GangliaSink'
      options.metrics['*.sink.ganglia.host'] = service.deps.ganglia_collector[0].instances.map( (instance) -> instance.node.fqdn ).join ','
      options.metrics['*.sink.ganglia.port'] = service.deps.ganglia_collector[0].options.spark_port
  options.conf['spark.yarn.dist.files'] ?= options.dist_files.join(',') if options.dist_files.length > 0
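
For illustration only (not executed by this module), here is how a client could submit an application so that every executor receives "metrics.properties" and loads it at start-up. The example class and jar path are placeholders; only the "--files" and "spark.metrics.conf" arguments reflect the options set above.

```coffee
# Sketch: a spark-submit invocation shipping metrics.properties to executors.
# The application class and jar below are placeholder examples.
conf_dir = '/etc/spark2/conf'
console.log [
  'spark-submit'
  '--master', 'yarn'
  '--files', "file://#{conf_dir}/metrics.properties"
  '--conf', 'spark.metrics.conf=metrics.properties'
  '--class', 'org.apache.spark.examples.SparkPi'
  '/usr/hdp/current/spark2-client/lib/spark-examples.jar'
].join ' '
```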

Dynamic Resource Allocation

Spark mechanism to scale the resources allocated to an application based on cluster availability and workload.

  # http://spark.apache.org/docs/1.6.0/job-scheduling.html#dynamic-resource-allocation
  options.conf['spark.dynamicAllocation.enabled'] ?= 'false' # disabled by default
  options.conf['spark.shuffle.service.enabled'] ?= options.conf['spark.dynamicAllocation.enabled']
  if options.conf['spark.dynamicAllocation.enabled'] is 'true'
    options.conf['spark.shuffle.service.port'] ?= '56789'
    for srv in service.deps.yarn_nm
      aux_services  = srv.options.yarn_site['yarn.nodemanager.aux-services'].split ','
      aux_services.push 'spark_shuffle' unless 'spark_shuffle' in aux_services
      srv.options.yarn_site['yarn.nodemanager.aux-services'] = aux_services.join ','
      srv.options.yarn_site['yarn.nodemanager.aux-services.spark_shuffle.class'] ?= 'org.apache.spark.network.yarn.YarnShuffleService'
      srv.options.yarn_site['spark.shuffle.service.enabled'] ?= 'true'
      srv.options.iptables_rules.push { chain: 'INPUT', jump: 'ACCEPT', dport: options.conf['spark.shuffle.service.port'], protocol: 'tcp', state: 'NEW', comment: "Spark YARN Shuffle Service" }
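
As a hedged illustration (these properties are not applied by this module), applications typically pair dynamic allocation with per-application executor bounds. The property names are standard Spark settings; the values below are arbitrary examples, not recommendations.

```coffee
# Sketch: per-application bounds commonly tuned alongside dynamic allocation.
example_conf =
  'spark.dynamicAllocation.enabled': 'true'
  'spark.shuffle.service.enabled': 'true'
  'spark.dynamicAllocation.minExecutors': '1'
  'spark.dynamicAllocation.initialExecutors': '2'
  'spark.dynamicAllocation.maxExecutors': '10'
console.log example_conf
```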

Wait

  options.wait_yarn_rm = service.deps.yarn_rm[0].options.wait
  options.wait_ranger_admin = service.deps.ranger_admin.options.wait if service.deps.ranger_admin

Dependencies

{merge} = require 'nikita/lib/misc'