Menu

Apache Spark Configure

module.exports = (service) ->
  options = service.options

Identities

  # Group
  options.group ?= {}
  options.group = name: options.group if typeof options.group is 'string'
  options.group.name ?= 'spark'
  options.group.system ?= true
  # User
  options.user ?= {}
  options.user = name: options.user if typeof options.user is 'string'
  options.user.name ?= 'spark'
  options.user.system ?= true
  options.user.comment ?= 'Spark User'
  options.user.home ?= '/var/lib/spark'
  options.user.groups ?= 'hadoop'
  options.user.gid ?= options.group.name

Kerberos

  # Kerberos HDFS Admin
  options.hdfs_krb5_user ?= service.deps.hadoop_core.options.hdfs.krb5_user
  # Kerberos Test Principal
  options.test_krb5_user ?= service.deps.test_user.options.krb5.user

Environment

  # Layout
  options.client_dir ?= '/usr/hdp/current/spark-client'
  options.conf_dir ?= '/etc/spark/conf'
  # Misc
  options.hostname = service.node.hostname
  # options.hdfs_defaultfs = service.deps.hdfs_nn[0].options.core_site['fs.defaultFS']

Tez

Tez configuration directory is injected into "spark-env.sh".

  options.tez_conf_dir = service.deps.tez.options.env['TEZ_CONF_DIR'] if service.deps.tez

Hive server2

Test

  options.ranger_admin ?= service.deps.ranger_admin.options.admin if service.deps.ranger_admin
  options.ranger_install = service.deps.ranger_hive[0].options.install if service.deps.ranger_hive
  options.test = merge {}, service.deps.test_user.options, options.test
  # Hive Server2
  if service.deps.hive_server2
    options.hive_server2 = for srv in service.deps.hive_server2
      fqdn: srv.options.fqdn
      hostname: srv.options.hostname
      hive_site: srv.options.hive_site

Configuration

  options.conf ?= {}
  options.conf['spark.master'] ?= "local[*]"
  # For [Spark on YARN deployments][[secu]], configuring spark.authenticate to true
  # will automatically handle generating and distributing the shared secret.
  # Each application will use a unique shared secret. 
  # http://spark.apache.org/docs/1.6.0/configuration.html#security
  options.conf['spark.authenticate'] ?= "true"
  if options.conf['spark.authenticate']
    options.conf['spark.authenticate.secret'] ?= 'my-secret-key' 
    throw Error 'spark.authenticate.secret is needed when spark.authenticate is true' unless options.conf['spark.authenticate.secret']
  # This causes Spark applications running on this client to write their history to the directory that the history server reads.
  options.conf['spark.eventLog.enabled'] ?= "true"
  options.conf['spark.yarn.services'] ?= "org.apache.spark.deploy.yarn.history.YarnHistoryService"
  # set to only supported one http://spark.apache.org/docs/1.6.0/monitoring.html#viewing-after-the-fact
  # https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.4.0/bk_upgrading_hdp_manually/content/upgrade-spark-23.html
  options.conf['spark.history.provider'] ?= 'org.apache.spark.deploy.history.FsHistoryProvider'
  # Base directory in which Spark events are logged, if spark.eventLog.enabled is true.
  # Within this base directory, Spark creates a sub-directory for each application, and logs the events specific to the application in this directory.
  # Users may want to set this to a unified location like an HDFS directory so history files can be read by the history server.
  options.conf['spark.eventLog.dir'] ?= "#{service.deps.hdfs_nn[0].options.core_site['fs.defaultFS']}/user/#{options.user.name}/applicationHistory"
  options.conf['spark.history.fs.logDirectory'] ?= "#{options.conf['spark.eventLog.dir']}"
  options.conf['spark.eventLog.overwrite'] ?= 'true'
  options.conf['spark.yarn.jar'] ?= "hdfs:///apps/#{options.user.name}/spark-assembly.jar"
  # options.conf['spark.yarn.applicationMaster.waitTries'] = null Deprecated in favor of "spark.yarn.am.waitTime"
  options.conf['spark.yarn.am.waitTime'] ?= '10'
  options.conf['spark.yarn.containerLauncherMaxThreads'] ?= '25'
  options.conf['spark.yarn.driver.memoryOverhead'] ?= '384'
  options.conf['spark.yarn.executor.memoryOverhead'] ?= '384'
  options.conf['spark.yarn.max.executor.failures'] ?= '3'
  options.conf['spark.yarn.preserve.staging.files'] ?= 'false'
  options.conf['spark.yarn.queue'] ?= 'default'
  options.conf['spark.yarn.scheduler.heartbeat.interval-ms'] ?= '5000'
  options.conf['spark.yarn.services'] ?= 'org.apache.spark.deploy.yarn.history.YarnHistoryService'
  options.conf['spark.yarn.submit.file.replication'] ?= '3'
  options.dist_files ?= []

SSL

For now on spark 1.3, SSL is falling even after distributing keystore and truststore on worker nodes as suggested in official documentation. Maybe we shall share and deploy public keys instead of just the cacert Disabling for now

Note: 20160928, wdavidw, there was some issue where truststore and keystore usage was messed up, the code in install is fixed but ssl is still disable because I have no time to test it.

  options.ssl = merge {}, service.deps.ssl.options, options.ssl
  options.conf['spark.ssl.enabled'] ?= "false" # `!!service.deps.ssl`
  options.conf['spark.ssl.enabledAlgorithms'] ?= "MD5"
  options.conf['spark.ssl.keyPassword'] ?= service.deps.ssl.options.keystore.password
  options.conf['spark.ssl.keyStore'] ?= "#{options.conf_dir}/keystore"
  options.conf['spark.ssl.keyStorePassword'] ?= service.deps.ssl.options.keystore.password
  options.conf['spark.ssl.protocol'] ?= "SSLv3"
  options.conf['spark.ssl.trustStore'] ?= "#{options.conf_dir}/truststore"
  options.conf['spark.ssl.trustStorePassword'] ?= service.deps.ssl.options.truststore.password

Client Metastore Configuration

Spark needs hive-site.xml in order to create hive spark context. Spark is client towards hive/hcatalog and needs its client configuration. the hive-site.xml is set inside /etc/spark/conf/ dir.

  options.hive_site ?= {}
  for property in [
    'hive.metastore.uris'
    'hive.security.authorization.enabled'
    'hive.security.authorization.manager'
    'hive.security.metastore.authorization.manager'
    'hive.security.authenticator.manager'
    # Transaction, read/write locks
    'hive.support.concurrency'
    'hive.enforce.bucketing'
    'hive.exec.dynamic.partition.mode'
    'hive.txn.manager'
    'hive.txn.timeout'
    'hive.txn.max.open.batch'
    'hive.cluster.delegation.token.store.zookeeper.connectString'
    # 'hive.cluster.delegation.token.store.class'
    # 'hive.metastore.local'
    # 'fs.hdfs.impl.disable.cache'
    'hive.metastore.sasl.enabled'
    # 'hive.metastore.cache.pinobjtypes'
    # 'hive.metastore.kerberos.keytab.file'
    'hive.metastore.kerberos.principal'
    # 'hive.metastore.pre.event.listeners'
    'hive.optimize.mapjoin.mapreduce'
    'hive.heapsize'
    'hive.auto.convert.sortmerge.join.noconditionaltask'
    'hive.exec.max.created.files'
    'hive.metastore.warehouse.dir'
    # Transaction, read/write locks
  ] then options.hive_site[property] ?= service.deps.hive_hcatalog[0].options.hive_site[property]
  options.hive_site['hive.execution.engine'] ?= 'mr'

Metrics

Configure the "metrics.properties" to connect Spark to a metrics collector like Ganglia or Graphite. The metrics.properties file needs to be sent to every executor, and spark.metrics.conf=metrics.properties will tell all executors to load that file when initializing their respective MetricsSystems

  # options.conf['spark.metrics.conf'] ?= 'metrics.properties'
  options.conf_metrics ?= false
  if options.conf_metrics
    options.conf['spark.metrics.conf'] ?= "metrics.properties" # Error, spark complain it cant find if value is 'metrics.properties'
    if options.conf['spark.metrics.conf']?
      options.dist_files.push "file://#{options.conf_dir}/metrics.properties" unless options.dist_files.indexOf "file://#{options.conf_dir}/metrics.properties" is -1
    options.metrics =
      'master.source.jvm.class':'org.apache.spark.metrics.source.JvmSource'
      'worker.source.jvm.class':'org.apache.spark.metrics.source.JvmSource'
      'driver.source.jvm.class':'org.apache.spark.metrics.source.JvmSource'
      'executor.source.jvm.class':'org.apache.spark.metrics.source.JvmSource'

    if service.deps.graphite
      options.metrics['*.sink.graphite.class'] = 'org.apache.spark.metrics.sink.GraphiteSink'
      options.metrics['*.sink.graphite.host'] = service.deps.graphite[0].instances.map( (instance) -> instance.node.fqdn ).join ','
      options.metrics['*.sink.graphite.port'] = graphite_ctxs[0].config.ryba.graphite[0].options.carbon_aggregator_port
      options.metrics['*.sink.graphite.prefix'] = "#{graphite_ctxs[0].config.ryba.graphite[0].options.metrics_prefix}.spark"

    # TODO : metrics.MetricsSystem: Sink class org.apache.spark.metrics.sink.GangliaSink cannot be instantialized
    if service.deps.ganglia_collector
      options.metrics['*.sink.ganglia.class'] = 'org.apache.spark.metrics.sink.GangliaSink'
      options.metrics['*.sink.ganglia.host'] = service.deps.ganglia_collector[0].instances.map( (instance) -> instance.node.fqdn ).join ','
      options.metrics['*.sink.ganglia.port'] = service.deps.ganglia_collector[0].options.spark_port
  options.conf['spark.yarn.dist.files'] ?= options.dist_files.join(',') if options.dist_files.length > 0

Dynamic Resource Allocation

Spark mecanism to set up resources based on cluster availability

  #http://spark.apache.org/docs/1.6.0/job-scheduling.html#dynamic-resource-allocation
  options.conf['spark.dynamicAllocation.enabled'] ?= 'false' #disable by default
  options.conf['spark.shuffle.service.enabled'] ?= options.conf['spark.dynamicAllocation.enabled']
  if options.conf['spark.dynamicAllocation.enabled'] is 'true'
    options.conf['spark.shuffle.service.port'] ?= '56789'
    for srv in service.deps.yarn_nm
      aux_services  = srv.options.yarn_site['yarn.nodemanager.aux-services'].split ','
      aux_services.push 'spark_shuffle' unless 'spark_shuffle' in aux_services
      srv.options.yarn_site['yarn.nodemanager.aux-services'] = aux_services.join ','
      srv.options.yarn_site['yarn.nodemanager.aux-services.spark_shuffle.class'] ?= 'org.apache.spark.network.yarn.YarnShuffleService'
      srv.options.yarn_site['spark.shuffle.service.enabled'] ?= 'true'
      srv.options.iptables_rules.push { chain: 'INPUT', jump: 'ACCEPT', dport: options.conf['spark.shuffle.service.port'], protocol: 'tcp', state: 'NEW', comment: "Spark YARN Shuffle Service" }

Wait

  options.wait_yarn_rm = service.deps.yarn_rm[0].options.wait
  options.wait_ranger_admin = service.deps.ranger_admin.options.wait if service.deps.ranger_admin

Dependencies

{merge} = require '@nikitajs/core/lib/misc'