Menu

Capacity Planning for Hadoop Cluster

Capacity planning is the science and art of estimating the space, computer hardware, software and connection infrastructure resources that will be needed over some future period of time.

In the Hadoop context of this script, Capacity Planning is about discovering the Memory, Disk and CPU resources for every node of the cluster and estimating default setting for Yarn and its client application such as MapReduce or Tez.

Source Code

exports = module.exports = (params, config, callback) ->
  # construct the clusters' graph for capacity planning
  clusters = []
  for cluster, conf of config.clusters
    nodes = {}
    for node, c of config.nodes
      for srv in c.services
        if srv.cluster is cluster
          nodes[node] = c
          break;
        else
          continue;
    clusters.push
      name: cluster
      nodes: nodes
    do_cluster_capacity = (index, callback) ->
      cluster = clusters[index]
      config.cluster = cluster.name
      config.nodes = cluster.nodes
      exports.nodes params, config, (err, nodes) ->
        cluster.nodes = nodes
        return callback err if err
        # note: lucasbak
        # note sure to always do remote as the inital cluster may have no file configured
        nikita.each [
          'configure', 'disks', 'cores', 'memory'
          'yarn_nm', 'yarn_rm'
          'hdfs_client', 'hdfs_nn', 'hdfs_dn'
          'hbase_m', 'hbase_rs',
          'mapred_client',
          'nifi','tez_client'
          'hive_client', 'kafka_broker'
          # 'remote'
          ]
        , (opts, cb) ->
          handler = opts.key
          console.log "#{handler}: ok"
          handler = exports[handler]
          if handler.length is 2
            handler nodes, cb
          else
            handler nodes
            cb()
        .next (err) ->
          # ctx.emit 'end' for ctx in contexts
          return console.log 'ERROR', err.message, err.stack if err
          if index is clusters.length-1
            callback err
          else
            index++
            do_cluster_capacity index, callback
    do_cluster_capacity 0, (err) ->
      # ctx.emit 'end' for ctx in contexts
      return console.log 'ERROR', err.message, err.stack if err
      exports.write params, config, clusters, (err) ->
        return console.log 'ERROR', err if err
        console.log 'SUCCESS'

SSH

exports.nodes = (params, config, callback) ->
  # params.end ?= true
  nodes = []
  engine = require('nikita/lib/core/kv/engines/memory')()
  config.nikita.no_ssh = true
  # context.call -> console.log opts.value.ip
  nikita
  .each config.nodes, (opts, cb) ->
    # node was context before migration
    fqdn = opts.key
    node = nikita merge {}, config.nikita
    node.log.cli host: fqdn, pad: host: 20, header: 60
    node.ssh.open header: 'SSH Open', host: opts.value.ip
    node.call 'masson/core/info'
    .next (err) ->
      n = {}
      n.ssh = node.options.ssh
      n.fqdn = fqdn
      n.params = params
      n[prop] = node[prop] for prop in ['cpuinfo', 'meminfo','diskinfo','kernel_name',
      'nodename','kernel_release', 'kernel_version', 'processor', 'operating_system']
      n.services = config.nodes[fqdn].services.map( (srv) -> srv.service )
      n.config ?= {}
      n.config.capacity ?= {}
      # migration: lucasbak
      # prepare options read from configuration ie
      # n.config.capacity.memory_datanode should be read from service.instance.hdfs.dn.heapsize
      # n.config.capacity.memory_nodemanager should be read from service.instance.yarn.nm.heapsize
      n.config.capacity.total_memory ?=  if params.total_memory_gb? then  params.total_memory_gb * 1024 * 1024 * 1024 else null
      n.config.capacity.memory_system ?= if params.reserved_memory_gb? then  params.reserved_memory_gb * 1024 * 1024 * 1024 else null
      n.config.capacity.memory_hbase ?= if params.regionserver_memory_gb? then  params.regionserver_memory_gb * 1024 * 1024 * 1024 else null
      n.config.capacity.memory_datanode ?= if params.datanode_memory_gb? then  params.datanode_memory_gb * 1024 * 1024 * 1024 else null
      n.config.capacity.memory_nodemanager ?= if params.nodemanager_memory_gb? then  params.nodemanager_memory_gb * 1024 * 1024 * 1024 else null
      n.config.capacity.memory_yarn ?= if params.yarn_memory_gb? then  params.yarn_memory_gb * 1024 * 1024 * 1024 else null
      n.config.capacity ?= {}
      n.config.capacity.remote ?= {}
      n.has_service = ->
        has = false
        has = has || (n.services.indexOf(arg) isnt -1) for arg in arguments
        return has
      nodes.push n
      cb err
  .next (err) ->
    callback null, nodes

Configuration

Normalize configuration.

exports.configure = (nodes) ->
  for node in nodes
    mapred_client_services = [
      'ryba/hadoop/mapred_client'
      'ryba/hadoop/yarn_nm'
      'ryba/hbase/regionserver'
      'ryba/hbase/client'
      'ryba/hive/client'
      'ryba/hive/beeline'
      'ryba/hive/server2'
      'ryba/tez'
    ]
    should_configure_mapred_client = false
    for service in mapred_client_services
      should_configure_mapred_client = should_configure_mapred_client or node.has_service(service)
    node.should_configure_mapred_client =  should_configure_mapred_client
    for conf in ['nn_hdfs_site', 'hdfs_site' , 'dn_hdfs_site', 'rm_yarn_site', 'yarn_site', 'mapred_site', 'tez_site', 'hive_site', 'capacity_scheduler', 'hbase_site', 'kafka_broker','nifi_properties']
      node.config.capacity[conf] ?= {}
      node.config.capacity.remote[conf] ?= {}
    node.config.capacity.capacity_scheduler['yarn.scheduler.capacity.resource-calculator'] ?= 'org.apache.hadoop.yarn.util.resource.DominantResourceCalculator'

Capacity Planning for Disks

Discover the most relevant partitions on each node.

exports.disks = (nodes) ->
  for node in nodes
    # continue unless ctx.has_service 'ryba/hadoop/yarn_nm'
    continue if node.config.capacity.disks
    # Provided by user
    if node.params.partitions
      node.config.capacity.disks = node.params.partitions
      continue
    # Search common partition names
    found_common_names = []
    for disk in node.diskinfo
      found_common_names.push disk if /^\/data\/\d+/.test disk.mountpoint # Cloudera style, eg /data/1
      found_common_names.push disk if /^\/grid\/\d+/.test disk.mountpoint # HDP style, eg /data/1
    found_large_partitions_with_spaces = []
    found_large_partitions_with_spaces_is_root = null
    for disk in node.diskinfo
      available = disk.available / 1024 / 1024 / 1024 # Go
      found_large_partitions_with_spaces.push disk if available > prink.filesize.parse '200 MB'
      found_large_partitions_with_spaces_and_root = disk if available > 200 and disk.mountpoint is '/'
    found_root = null
    for disk in node.diskinfo
      found_root = disk if disk.mountpoint is '/'
    # Choose
    if found_common_names.length
      node.config.capacity.disks = found_common_names
    else if found_large_partitions_with_spaces > 4 and found_large_partitions_with_spaces_and_root
      # Exclude root partition
      node.config.capacity.disks = for disk in found_large_partitions_with_spaces
        continue if disk.mountpoint is '/'
        disk
    else if found_large_partitions_with_spaces > 2
      node.config.capacity.disks = found_large_partitions_with_spaces
    else if found_root
      node.config.capacity.disks = [found_root]
    else next Error 'No Appropriate Disk Found'
    node.config.capacity.disks = node.config.capacity.disks.map (disk) -> disk.mountpoint

Capacity Planning for CPU

exports.cores = (nodes) ->
  for node in nodes
    node.config.capacity.cores ?= node.cpuinfo.length
    node.config.capacity.cores_yarn ?= 100

Capacity Planning for Memory

Estimates the memory available to the system, YARN and HBase. The ratio vary depending on the total amout of memory. December 2017: The memory thw following order:

  • total memory avaible on the host

  • memory reserved for the os

  • memory allocated for hbase regionserver

  • memory allocated for yarn_nodemanager process

  • memory allocated for hdfs_datanode process

  • memory available for yarn running containers The calculus are made in this order to guaranty the reserverd memory for the os and for hbase. All the different amount can be overrided by parameters.

    exports.memory_system_gb = [[1,.2], [2,.2], [4,1], [7,2], [8,2], [16,2], [24,4], [48,6], [64,8], [72,8], [96,12], [128,24], [256,32], [512,64]] exports.memory_hbase_gb = [[1,.2], [2,.4], [4,1], [8,1], [16,2], [24,4], [48,8], [64,8], [72,8], [96,16], [128,24], [256,32], [512,64]] exports.memory = (nodes) -> for node in nodes # Total Memory available node.config.capacity.total_memory ?= node.meminfo.MemTotal continue unless node.has_service 'ryba/hadoop/yarn_nm' {total_memory} = node.config.capacity total_memory_gb = total_memory / 1024 / 1024 / 1024 # Reserved Memory memory_system_gb = 0 if total_memory_gb < exports.memory_system_gb[0][0] # Memory less than first item (1GB) memory_system_gb += exports.memory_system_gb[0][1] else if total_memory_gb >= exports.memory_system_gb[exports.memory_system_gb.length-1][0] # Memory greater than last item (512GB) memory_system_gb += exports.memory_system_gb[exports.memory_system_gb.length-1][1] else for mem in exports.memory_system_gb [total, reserved] = mem break if total_memory_gb < total memory_system_gb = reserved node.config.capacity.memory_system ?= exports.rounded_memory memory_system_gb * 1024 * 1024 * 1024 memory_hbase_gb = 0 # HBase regionserver memory if node.has_service 'ryba/hbase/regionserver' if total_memory_gb < exports.memory_hbase_gb[0][0] memory_hbase_gb += exports.memory_hbase_gb[0][1] # Memory less than minimal expectation else if total_memory_gb >= exports.memory_hbase_gb[exports.memory_hbase_gb.length-1][0] memory_hbase_gb += exports.memory_hbase_gb[exports.memory_hbase_gb.length-1][1] else for mem in exports.memory_hbase_gb [total, reserved] = mem break if total_memory_gb < total memory_hbase_gb = reserved node.config.capacity.memory_hbase ?= memory_hbase = exports.rounded_memory memory_hbase_gb * 1024 * 1024 * 1024 # Yarn Nodemanager memory if total_memory_gb < exports.memory_system_gb[4][0] node.config.capacity.memory_nodemanager ?= 256 * 1024 * 1024 #default to 256m if memory less than 8GB else if total_memory_gb < exports.memory_system_gb[8][0] and total_memory_gb >= exports.memory_system_gb[4][0] node.config.capacity.memory_nodemanager ?= 512 * 1024 * 1024 #default to 512m if memory less than 16GB else if total_memory_gb >= exports.memory_system_gb[exports.memory_system_gb.length-1][0] node.config.capacity.memory_nodemanager ?= 8 * 1024 * 1024 * 1024 else node.config.capacity.memory_nodemanager ?= 1 * 1024 * 1024 * 1024 # HDFS datanode memory if total_memory_gb < exports.memory_system_gb[4][0] node.config.capacity.memory_datanode ?= 256 * 1024 * 1024 #default to 256m if memory less than 8GB else if total_memory_gb < exports.memory_system_gb[8][0] and total_memory_gb >= exports.memory_system_gb[4][0] node.config.capacity.memory_datanode ?= 512 * 1024 * 1024 #default to 512m if memory less than 16GB else if total_memory_gb >= exports.memory_system_gb[exports.memory_system_gb.length-1][0] node.config.capacity.memory_datanode ?= 8 * 1024 * 1024 * 1024 else node.config.capacity.memory_datanode ?= 1 * 1024 * 1024 * 1024 # Yarn Containers dedicated memory node.config.capacity.memory_yarn ?= memory_yarn = exports.rounded_memory ( node.config.capacity.total_memory - (node.config.capacity.memory_nodemanager + node.config.capacity.memory_datanode + node.config.capacity.memory_hbase + node.config.capacity.memory_system) )

Yarn NodeManager

# helper function which takes a memory in Bytes and rounded down it to nearest 1024 number
exports.round_down_two_power_ten = (memory, power_in_Bytes) ->
  power_in_Bytes ?= 1024 * (1024 * 1024)
  return if memory >= 0 then Math.floor(memory / power_in_Bytes) * power_in_Bytes else (Math.floor(memory - power_in_Bytes + 1) / power_in_Bytes) * power_in_Bytes
exports.yarn_nm = (nodes) ->
  minimum_allocation_mb = null
  maximum_allocation_mb = 0
  maximum_allocation_vcores = 0
  return if nodes.filter( (node) -> node.has_service 'ryba/hadoop/yarn_nm').length is 0
  for node in nodes
    continue unless node.has_service 'ryba/hadoop/yarn_nm'
    {cores, disks, cores_yarn, memory_yarn, rm_yarn_site, yarn_site} = node.config.capacity
    minimum_container_size = if memory_yarn <= 2*1024*1024*1024 then 128*1024*1024 # 128 MB
    else if memory_yarn <= 4*1024*1024*1024 then 256*1024*1024 # 256 MB
    else if memory_yarn <= 8*1024*1024*1024 then 512*1024*1024 # 512 MB
    else if memory_yarn <= 24*1024*1024*1024 then 1024*1024*1024 # 1 GB
    else 2*1024*1024*1024 # 2 GB
    # min (2*CORES, 1.8*DISKS, (Total available RAM / MIN_CONTAINER_SIZE) )
    unless max_number_of_containers = node.config.capacity.max_number_of_containers
      # Possible incoherence, here we multiply number of cores by 2 while
      # NodeManager vcores is set to number of cores only
      max_number_of_containers = Math.floor Math.min cores * 2, Math.ceil(disks.length * 1.8), (memory_yarn / minimum_container_size)
    # Amount of RAM per container
    # max(MIN_CONTAINER_SIZE, (Total Available RAM) / containers))
    unless memory_per_container = node.config.capacity.memory_per_container
      memory_per_container = Math.max minimum_container_size, exports.round_down_two_power_ten( ( memory_yarn / max_number_of_containers), minimum_container_size)
    # # Work with small VM
    # if memory_per_container < 512 * 1024 * 1024
    #   unless max_number_of_containers = ctx.config.capacity.max_number_of_containers
    #     max_number_of_containers = Math.floor Math.min cores, disks.length, (memory_yarn / minimum_container_size)
    #   unless memory_per_container = ctx.config.capacity.memory_per_container
    #     memory_per_container = Math.floor Math.max minimum_container_size, memory_yarn / max_number_of_containers
    # Export number of containers
    node.config.capacity.max_number_of_containers = max_number_of_containers
    # Export RAM per container
    memory_per_container = exports.rounded_memory memory_per_container
    node.config.capacity.memory_per_container = memory_per_container

    minimum_allocation_mb ?= Math.round memory_per_container / 1024 / 1024
    minimum_allocation_mb = Math.round Math.min minimum_allocation_mb, memory_per_container / 1024 / 1024

Pourcent of CPU dedicated to yarn

    yarn_site['yarn.nodemanager.resource.percentage-physical-cpu-limit'] ?="#{cores_yarn}"

Amount of physical memory, in MB, dedicated by the node and that can be allocated for containers.

    yarn_site['yarn.nodemanager.resource.memory-mb'] ?= Math.max Math.round( memory_per_container * max_number_of_containers / 1024 / 1024),  Math.round( node.config.capacity.memory_yarn / 1024 / 1024)

    maximum_allocation_mb = Math.max maximum_allocation_mb, yarn_site['yarn.nodemanager.resource.memory-mb']

The property "yarn.nodemanager.vmem-pmem-rati" defines the virtual memory (physical + paged memory) upper limit for each Map and Reduce task is determined by the virtual memory ratio each YARN Container is allowed. The default value "2.1" means virtual memory will be double the size of physical memory.

    yarn_site['yarn.nodemanager.vmem-pmem-ratio'] ?= '2.1'

Number of Virtual Cores dedicated by the node and that can be allocated for containers.

    yarn_site['yarn.nodemanager.resource.cpu-vcores'] ?= cores

The property "yarn.nodemanager.local-dirs" defines multiple disks for localization. It enforces fail-over, preventing one disk to affect the containers, and load-balancing by spliting the access to the disks.

    {yarn_nm_local_dir, yarn_nm_log_dir} = node.params
    if /^\//.test yarn_nm_local_dir
      yarn_site['yarn.nodemanager.local-dirs'] ?= yarn_nm_local_dir.split ','
    else
      yarn_site['yarn.nodemanager.local-dirs'] ?= disks.map (disk) ->
        path.resolve disk, yarn_nm_local_dir or './yarn/local'
    if /^\//.test yarn_nm_log_dir
      yarn_site['dfs.datanode.data.dir'] ?= yarn_nm_log_dir.split ','
    else
      yarn_site['yarn.nodemanager.log-dirs'] ?= disks.map (disk) ->
        path.resolve disk, yarn_nm_log_dir or './yarn/log'

Raise the number of vcores later allocated for the ResourceManager.

    maximum_allocation_vcores = Math.max maximum_allocation_vcores, yarn_site['yarn.nodemanager.resource.cpu-vcores']

  for node in nodes
    node.config.capacity.minimum_allocation_mb = minimum_allocation_mb
    node.config.capacity.maximum_allocation_mb = maximum_allocation_mb
    node.config.capacity.maximum_allocation_vcores = maximum_allocation_vcores

Yarn ResourceManager

exports.yarn_rm = (nodes) ->
  for node in nodes
    continue unless node.has_service 'ryba/hadoop/yarn_rm'
    {minimum_allocation_mb, maximum_allocation_mb, maximum_allocation_vcores, rm_yarn_site} = node.config.capacity
    rm_yarn_site['yarn.scheduler.minimum-allocation-mb'] ?= minimum_allocation_mb
    rm_yarn_site['yarn.scheduler.maximum-allocation-mb'] ?= maximum_allocation_mb
    rm_yarn_site['yarn.scheduler.minimum-allocation-vcores'] ?= 1

The property "yarn.scheduler.maximum-allocation-vcores" should not be larger than the value for the yarn.nodemanager.resource.cpu-vcores parameter on any NodeManager. Document states that resource requests are capped at the maximum allocation limit and a container is eventually granted. Tests in version 2.4 instead shows that the containers are never granted, and no progress is made by the application (zombie state).

    rm_yarn_site['yarn.scheduler.maximum-allocation-vcores'] ?= maximum_allocation_vcores

HDFS Client

exports.hdfs_client = (nodes) ->
  replication = 0
  for node in nodes
    replication++ if node.has_service 'ryba/hadoop/hdfs_dn'
  for node in nodes
    continue unless node.has_service 'ryba/hadoop/hdfs_nn', 'ryba/hadoop/hdfs_dn', 'ryba/hadoop/hdfs_client'
    {hdfs_site} = node.config.capacity
    hdfs_site['dfs.replication'] ?= Math.min 3, replication # Not sure if this really is a client property

HDFS DataNode

exports.hdfs_dn = (nodes) ->
  for node in nodes
    continue unless node.has_service 'ryba/hadoop/hdfs_dn'
    {disks, hdfs_site, dn_hdfs_site} = node.config.capacity
    {hdfs_dn_data_dir} = node.params
    if /^\//.test hdfs_dn_data_dir
      dn_hdfs_site['dfs.datanode.data.dir'] ?= hdfs_dn_data_dir.split ','
    else
      dn_hdfs_site['dfs.datanode.data.dir'] ?= disks.map (disk) ->
        path.resolve disk, hdfs_dn_data_dir or './hdfs/data'

HDFS NameNode

In HDFS High Availabity (HA) mode, we only set one name directory by default located inside "/var/hdfs/name" because the Journal Node are responsible for distributing logs into the passive NameNode (please get back to us if this isnt safe enough). In non-HA mode, we store as many copies as partitions inside the partition "./hdfs/name" directory.

This behavior may be altered with the "hdfs_nn_name_dir" parameter.

exports.hdfs_nn = (nodes) ->
  nns = 0
  for node in nodes
    nns++ if node.has_service 'ryba/hadoop/hdfs_nn'
  for node in nodes
    continue unless node.has_service 'ryba/hadoop/hdfs_nn'
    {disks, nn_hdfs_site} = node.config.capacity
    {hdfs_nn_name_dir} = node.params
    if /^\//.test hdfs_nn_name_dir
      nn_hdfs_site['dfs.namenode.name.dir'] ?= hdfs_nn_name_dir.split ','
    else
      if nns > 1
        nn_hdfs_site['dfs.namenode.name.dir'] ?= ['file://' + path.resolve '/var', hdfs_nn_name_dir or './hdfs/name']
      else
        nn_hdfs_site['dfs.namenode.name.dir'] ?= disks.map (disk) ->
          disk = '/var' if disk is '/'
          'file://' + path.resolve disk, hdfs_nn_name_dir or './hdfs/name'

HBase Master

exports.hbase_m = (nodes) ->
  for node in nodes
    continue unless node.has_service 'ryba/hbase/master'
    # Nothing to do for now, eg 'ryba.hbase.master_opts="..."'

HBase RegionServer

exports.hbase_rs = (nodes) ->
  for node in nodes
    continue unless node.has_service 'ryba/hbase/regionserver'
    {memory_hbase} = node.config.capacity
    memory_hbase_mb = Math.floor memory_hbase / 1024 / 1024
    node.config.capacity.regionserver_opts ?= "#{memory_hbase_mb}m" #i.e. 256m

MapReduce Client

exports.mapred_client = (nodes) ->
  # migration: lucasbak 171117
  # the properties should be read from a node_manager configuration
  # and not calculated based on the local node
  # TODO: take in count the different nodemanager is heterogenous config
  nm_capacity = null
  for node in nodes
    if node.has_service 'ryba/hadoop/yarn_nm'
      nm_capacity = node.config.capacity
      break;
    else
      continue;
  for node in nodes
    continue unless node.should_configure_mapred_client
    {minimum_allocation_mb, maximum_allocation_mb} = nm_capacity
    {mapred_site} = node.config.capacity ?= {}

The property "yarn.app.mapreduce.am.resource.mb" defines the amount of memory that the Application Master for MR framework would need. This needs to be set with care as a larger allocation for the AM would mean lesser concurrency, as you can spin up only so many AMs before exhausting the containers on a busy system. This value also needs to be less than what is defined in "yarn.scheduler.maximum-allocation-mb", if not, it will create an error condition. Can be set at site level with "mapred-site.xml", or can be set at the job level. This change does not require a service restart.

    map_memory_mb = mapred_site['mapreduce.map.memory.mb'] or minimum_allocation_mb
    map_memory_mb = Math.min map_memory_mb, maximum_allocation_mb
    mapred_site['mapreduce.map.memory.mb'] = "#{map_memory_mb}"

    reduce_memory_mb = mapred_site['mapreduce.reduce.memory.mb'] or (if map_memory_mb < 2048 then 2 * minimum_allocation_mb else 2 * map_memory_mb)
    reduce_memory_mb = Math.min reduce_memory_mb, maximum_allocation_mb
    mapred_site['mapreduce.reduce.memory.mb'] = "#{reduce_memory_mb}"

    mapreduce_am_memory_mb = mapred_site['yarn.app.mapreduce.am.resource.mb'] or Math.max map_memory_mb, reduce_memory_mb
    mapreduce_am_memory_mb = Math.min mapreduce_am_memory_mb, maximum_allocation_mb
    mapred_site['yarn.app.mapreduce.am.resource.mb'] = mapreduce_am_memory_mb

    mapreduce_am_opts = /-Xmx(.*?)m/.exec(mapred_site['yarn.app.mapreduce.am.command-opts'])?[1] or Math.floor .8 * mapreduce_am_memory_mb
    mapreduce_am_opts = Math.min mapreduce_am_opts, maximum_allocation_mb
    mapred_site['yarn.app.mapreduce.am.command-opts'] = "-Xmx#{mapreduce_am_opts}m"

The value of "mapreduce.map.java.opts" and "mapreduce.reduce.java.opts" are used to configure the maximum and minimum JVM heap size with respectively the java options "-Xmx" and "-Xms". The values must be less than their "mapreduce.map.memory.mb" and "mapreduce.reduce.memory.mb" counterpart.

    mapred_site['mapreduce.map.java.opts'] ?= "-Xmx#{Math.floor .8 * map_memory_mb}m" # 0.8 * RAM-per-container
    mapred_site['mapreduce.reduce.java.opts'] ?= "-Xmx#{Math.floor .8 * reduce_memory_mb}m" # 0.8 * 2 * RAM-per-container

    mapred_site['mapreduce.task.io.sort.mb'] = "#{Math.floor .4 * map_memory_mb}"

    # The number of virtual CPU cores allocated for each map task of a job
    mapred_site['mapreduce.map.cpu.vcores'] ?= 1
    #  The number of virtual CPU cores for each reduce task of a job
    mapred_site['mapreduce.reduce.cpu.vcores'] ?= 1

Tez

exports.tez_client = (nodes) ->
  # migration: lucasbak 171117
  # the properties should be read from a mapred_client configuration
  # all te using host should have the tez_site properties it its enabled on the cluster
  # for example clients hostm server2 host, workers nodes
  # mapred_client must be identical on every host
  is_tez_enabled = nodes.filter( (node) -> node.has_service 'ryba/tez' ).length > 0
  for node in nodes
    continue unless node.has_service 'ryba/tez' ,'ryba/hive/server2', 'ryba/hadoop/mapred_client', 'ryba/hadoop/yarn_nm'
    {mapred_site, tez_site} = node.config.capacity
    # Memory allocated for the Application Master
    tez_site['tez.am.resource.memory.mb'] ?= mapred_site['yarn.app.mapreduce.am.resource.mb']
    # Memory allocated for the task
    tez_site['tez.task.resource.memory.mb'] ?= mapred_site['mapreduce.map.memory.mb']
    tez_site['tez.runtime.io.sort.mb'] ?= mapred_site['mapreduce.task.io.sort.mb']

Hive Client

exports.hive_client = (nodes) ->
  nm_capacity = null
  for node in nodes
    if node.has_service 'ryba/hadoop/yarn_nm'
      nm_capacity = node.config.capacity
      break;
    else
      continue;
  for node in nodes
    continue unless node.has_service 'ryba/hive/client', 'ryba/hive/server2', 'ryba/tez', 'ryba/hive/beeline'
    {minimum_allocation_mb, maximum_allocation_mb} = nm_capacity
    {hive_site} = node.config.capacity ?= {}

The memory (in MB) to be used for Tez tasks. If this is not specified (-1), the memory settings from the MapReduce configurations (mapreduce.map.memory.mb) will be used by default for map tasks.

    tez_memory_mb = hive_site['hive.tez.container.size'] or minimum_allocation_mb
    tez_memory_mb = Math.min tez_memory_mb, maximum_allocation_mb
    hive_site['hive.tez.container.size'] = "#{tez_memory_mb}"

Java command line options for Tez. If this is not specified, the MapReduce java opts settings (mapreduce.map.java.opts) will be used by default for map tasks.

    hive_site['hive.tez.java.opts'] ?= "-Xmx#{Math.floor .8 * tez_memory_mb}m" # 0.8 * RAM-per-container

Kafka Broker

exports.kafka_broker = (nodes) ->
  for node in nodes
    continue unless node.has_service 'ryba/kafka/broker'
    {disks, kafka_broker} = node.config.capacity
    {kafka_data_dir} = node.params
    if /^\//.test kafka_data_dir
      kafka_broker['log.dirs'] ?= kafka_data_dir.split(',').map((target) -> path.resolve target, 'kafka' )
    else
      kafka_broker['log.dirs'] ?= disks.map (disk) ->
        path.resolve disk, kafka_data_dir or './kafka'

Nifi

exports.nifi = (nodes) ->
  for node in nodes
    continue unless node.has_service 'ryba/nifi'
    {disks, nifi_properties} = node.config.capacity
    {nifi_content_dir,nifi_provenance_dir} = node.params
    if (nifi_content_dir? and nifi_max_partition?) or (nifi_content_dir? and nifi_max_partition?)
      throw Error 'Can not use conjointly nifi content/provenance dir and nifi_max_partition options'
    nifi_max_partition = node.params.nifi_max_partition ?= disks.length
    #Content Repository directories
    if /^\//.test nifi_content_dir
      for k,dir of nifi_content_dir.split ','
        nifi_properties["nifi.content.repository.directory.cr#{k+1}"] = dir
    else
      target_dirs = disks.map (disk) ->
        path.resolve disk, nifi_content_dir or './nifi/content_repository'
      for k in [0..Math.min(target_dirs.length,nifi_max_partition-1)]
        dir = target_dirs[k]
        nifi_properties["nifi.content.repository.directory.cr#{k+1}"] = dir
    #Provenance Repository directories
    if /^\//.test nifi_provenance_dir
      for k,dir of nifi_provenance_dir.split ','
        nifi_properties["nifi.provenance.repository.directory.pr#{k+1}"] = dir
    else
      target_dirs = disks.map (disk) ->
        path.resolve disk, nifi_provenance_dir or './nifi/provenance_repository'
      for k in [0..Math.min(target_dirs.length,nifi_max_partition-1)]
        dir = target_dirs[k]
        nifi_properties["nifi.provenance.repository.directory.pr#{k+1}"] = dir

exports.remote = (nodes, next) ->
  each(nodes)
  .parallel(true)
  .call (ctx, next) ->
    do_hdfs = ->
      return do_yarn_capacity_scheduler() unless ctx.has_service 'ryba/hadoop/hdfs_nn', 'ryba/hadoop/hdfs_dn'
      properties.read ctx.ssh, '/etc/hadoop/conf/hdfs-site.xml', (err, hdfs_site) ->
        ctx.config.capacity.remote.hdfs_site = hdfs_site unless err
        do_yarn_capacity_scheduler()
    do_yarn_capacity_scheduler = ->
      return do_yarn() unless ctx.has_service 'ryba/hadoop/yarn_nm'
      properties.read ctx.ssh, '/etc/hadoop/conf/yarn-site.xml', (err, capacity_scheduler) ->
        ctx.config.capacity.remote.capacity_scheduler = capacity_scheduler unless err
        do_yarn()
    do_yarn = ->
      return do_mapred() unless ctx.has_service 'ryba/hadoop/yarn_rm', 'ryba/hadoop/yarn_nm'
      properties.read ctx.ssh, '/etc/hadoop/conf/yarn-site.xml', (err, yarn_site) ->
        ctx.config.capacity.remote.yarn_site = yarn_site unless err
        do_mapred()
    do_mapred = ->
      return do_tez() unless ctx.should_configure_mapred_client
      properties.read ctx.ssh, '/etc/hadoop/conf/mapred-site.xml', (err, mapred_site) ->
        ctx.config.capacity.remote.mapred_site = mapred_site unless err
        do_tez()
    do_tez = ->
      return do_hive() unless ctx.has_service 'ryba/tez'
      properties.read ctx.ssh, '/etc/tez/conf/tez-site.xml', (err, tez_site) ->
        ctx.config.capacity.remote.tez_site = tez_site unless err
        do_hive()
    do_hive = ->
      return do_kafka_broker() unless ctx.has_service 'ryba/hive/client', 'ryba/hive/beeline'
      properties.read ctx.ssh, '/etc/hive/conf/hive-site.xml', (err, hive_site) ->
        ctx.config.capacity.remote.hive_site = hive_site unless err
        do_kafka_broker()
    do_kafka_broker = ->
      return do_nifi() unless ctx.has_service 'ryba/kafka/broker'
      ssh2fs.readFile ctx.ssh, '/etc/kafka-broker/conf/broker.properties', 'ascii', (err, content) ->
        return do_end() if err
        properties = {}
        for line in string.lines content
          continue if /^#.*$/.test line
          continue unless /.+=.+/.test line
          [key, value] = line.split '='
          properties[key.trim()] = value.trim()
        properties
        ctx.config.capacity.remote.kafka_broker = properties
        do_nifi()
    do_nifi = ->
      return do_end() unless ctx.has_service 'ryba/nifi'
      ssh2fs.readFile ctx.ssh, '/etc/nifi/conf/nifi.properties', 'ascii', (err, content) ->
        return do_end() if err
        properties = {}
        for line in string.lines content
          continue if /^#.*$/.test line
          continue unless /.+=.+/.test line
          [key, value] = line.split '='
          properties[key.trim()] = value.trim()
        properties
        ctx.config.capacity.remote.nifi_properties = properties
    # do_hbase = ->
    #   return do_end() unless ctx.has_service 'ryba/hbase/regionserver'
    #   properties.read ctx.ssh, '/etc/hive/conf/hbase-site.xml', (err, hive_site) ->
    #     ctx.config.capacity.remote.hive_site = hive_site unless err
    #     do_end()
    do_end = ->
      next()
      # ctx.options.ssh.end()
      # ctx.options.ssh.on 'end', next
    do_hdfs()
  .next next

exports.write = (params, config, clusters, next) ->
  # return next() unless params.output
  formats = ['xml', 'json', 'js', 'coffee']
  if params.format is 'text'
    # ok, print to stdout
  else if params.format
    return next Error "Insupported Extension #{extname}" unless params.format in formats
    # unless params.output
    #   # ok, print to stdout
    # else if (basename = path.basename(params.output, ".#{params.format}")) isnt params.output
    #   params.output = "#{basename}.#{params.format}" if params.format in ['json', 'js', 'coffee']
  else if params.output
    extname = path.extname params.output
    format = extname.substr 1
    return next Error "Could not guess format from arguments" unless format in formats
    params.format = format
  else
    params.format = 'text'
  exports["write_#{params.format}"] params, config, clusters, (err, content) ->
    next err

exports.write_text = (params, config, clusters, next) ->
  do_open = ->
    return do_write process.stdout unless params.output
    return next() unless params.output
    fs.stat params.output, (err, stat) ->
      return next err if err and err.code isnt 'ENOENT'
      return next Error 'File Already Exists, use --overwrite' unless err or params.overwrite
      do_write fs.createWriteStream params.output, encoding: 'utf8'
  do_write = (ws) ->
    nodes = {}
    nodes = merge nodes, exports.capacity_to_ryba  params, config, cluster.nodes for cluster in clusters
    print = (config, properties) ->
      {capacity} = ctx.config
      for property in properties
        suggested_value = capacity[config][property]
        remote_value = capacity.remote[config][property]
        if Array.isArray suggested_value
          ws.write "    #{config}['#{property}'] = [\n"
          for v, i in suggested_value
            ws.write "      " if i % 3 is 0
            ws.write "#{v}"
            if i % 3 is 2 and i isnt suggested_value.length - 1
              ws.write "\n"
            else if i isnt suggested_value.length - 1
              ws.write ', '
          ws.write "\n    ]"
          remote_value = remote_value.split(',') if typeof remote_value is 'string'
          if suggested_value.join(',') isnt remote_value?.join(',')
            ws.write " (currently [\n"
            if remote_value then for v, i in remote_value
              ws.write "      " if i % 3 is 0
              ws.write "#{v}"
              if i % 3 is 2 and i isnt suggested_value.length# - 1
                ws.write "\n"
              else if i isnt suggested_value.length# - 1
                ws.write ', '
            else
              ws.write '      undefined'
            ws.write "\n    ])"
          ws.write "\n"
        else
          diff = if remote_value is "#{suggested_value}" then 'identical'
          else if remote_value? then "'#{remote_value}'"
          else 'not defined'
          diff = "(currently #{diff})"
          ws.write "    #{config}['#{property}'] = '#{suggested_value}' #{diff}\n"
    for ctx in nodes
      continue if params.hosts and not multimatch(params.hosts, ctx.config.host).length
      {capacity} = ctx.config
      # mods = [
      #   'ryba/hadoop/hdfs_nn', 'ryba/hadoop/hdfs_dn'
      #   'ryba/hadoop/yarn_rm', 'ryba/hadoop/yarn_nm'
      #   'ryba/hadoop/mapred_client', 'ryba/hadoop/hive_client'
      # ]
      # if ctx.has_service mods
      ws.write "#{ctx.config.host}\n"
      ws.write "  Number of core: #{capacity.cores}\n"
      ws.write "  Number of partitions: #{capacity.disks.length}\n"
      ws.write "  Memory Total: #{prink.filesize capacity.total_memory, 3}\n"
      ws.write "  Memory System: #{prink.filesize capacity.memory_system, 3}\n"
      print_hdfs_client = not params.modules or multimatch(params.modules, 'ryba/hadoop/hdfs_client').length
      if ctx.has_service('ryba/hadoop/hdfs_client') and print_hdfs_client
        ws.write "  HDFS Client\n"
        print 'hdfs_site', ['dfs.replication']
      print_hdfs_nn = not params.modules or multimatch(params.modules, 'ryba/hadoop/hdfs_nn').length
      if ctx.has_service('ryba/hadoop/hdfs_nn') and print_hdfs_nn
        ws.write "  HDFS NameNode\n"
        print 'nn_hdfs_site', ['dfs.namenode.name.dir']
      print_hdfs_dn = not params.modules or multimatch(params.modules, 'ryba/hadoop/hdfs_dn').length
      if ctx.has_service('ryba/hadoop/hdfs_dn') and print_hdfs_dn
        ws.write "  HDFS DataNode\n"
        print 'hdfs_site', ['dfs.datanode.data.dir']
      print_yarn_rm = not params.modules or multimatch(params.modules, 'ryba/hadoop/yarn_rm').length
      if ctx.has_service('ryba/hadoop/yarn_rm') and print_yarn_rm
        ws.write "  YARN ResourceManager\n"
        print 'capacity_scheduler', ['yarn.scheduler.capacity.resource-calculator']
        print 'rm_yarn_site', ['yarn.scheduler.minimum-allocation-mb', 'yarn.scheduler.maximum-allocation-mb', 'yarn.scheduler.minimum-allocation-vcores', 'yarn.scheduler.maximum-allocation-vcores']
      print_yarn_nm = not params.modules or multimatch(params.modules, 'ryba/hadoop/yarn_nm').length
      if ctx.has_service('ryba/hadoop/yarn_nm') and print_yarn_nm
        ws.write "  YARN NodeManager\n"
        ws.write "  Memory YARN: #{prink.filesize capacity.memory_yarn, 3}\n"
        ws.write "  Number of Cores: #{capacity.cores}\n"
        ws.write "  Number of Containers: #{capacity.max_number_of_containers}\n"
        ws.write "  Memory per Containers: #{prink.filesize capacity.memory_per_container, 3}\n"
        print 'yarn_site', ['yarn.nodemanager.resource.memory-mb', 'yarn.nodemanager.vmem-pmem-ratio', 'yarn.nodemanager.resource.cpu-vcores', 'yarn.nodemanager.local-dirs', 'yarn.nodemanager.log-dirs']
      print_mapred_client = not params.modules or multimatch(params.modules, 'ryba/hadoop/mapred_client').length
      if ctx.should_configure_mapred_client and print_mapred_client
        ws.write "  MapReduce Client\n"
        print 'mapred_site', ['yarn.app.mapreduce.am.resource.mb', 'yarn.app.mapreduce.am.command-opts', 'mapreduce.map.memory.mb', 'mapreduce.map.java.opts', 'mapreduce.reduce.memory.mb', 'mapreduce.reduce.java.opts', 'mapreduce.task.io.sort.mb', 'mapreduce.map.cpu.vcores', 'mapreduce.reduce.cpu.vcores']
      print_tez_client = not params.modules or multimatch(params.modules, 'ryba/tez').length
      if ctx.has_service('ryba/tez') and print_tez_client
        ws.write "  Tez Client\n"
        print 'tez_site', ['tez.am.resource.memory.mb', 'tez.task.resource.memory.mb', 'tez.runtime.io.sort.mb']
      print_hive_client = not params.modules or multimatch(params.modules, 'ryba/hadoop/hive_client').length
      if ctx.has_service('ryba/hive/client') and print_hive_client
        ws.write "  Hive Client\n"
        print 'hive_site', ['hive.tez.container.size', 'hive.tez.java.opts']
      print_hbase_regionserver = not params.modules or multimatch(params.modules, 'ryba/hadoop/regionserver').length
      if ctx.has_service('ryba/hbase/regionserver') and print_hbase_regionserver
        ws.write "  Memory HBase: #{prink.filesize capacity.memory_hbase, 3}\n"
        ws.write "  HBase RegionServer\n"
        {regionserver_opts} = ctx.config.capacity
        ws.write "    hbase-env: -Xms#{regionserver_opts} -Xmx#{regionserver_opts}\n"
      print_kafka_broker = not params.modules or multimatch(params.modules, 'ryba/kafka/broker').length
      if ctx.has_service('ryba/kafka/broker') and print_kafka_broker
        ws.write "  Kafka Broker\n"
        print 'kafka_broker', ['log.dirs']
      print_nifi = not params.modules or multimatch(params.modules, 'ryba/nifi').length
      if ctx.has_service('ryba/nifi')
        print 'nifi_properties', 'Content/Provenance Repositories'
        ws.write "  Nifi\n", capacity.nifi_properties
    do_end ws
  do_end = (ws) ->
    ws.end() if params.output
    next()
  do_open()

exports.write_json = (params, config, clusters, next) ->
  do_open = ->
    return do_write process.stdout unless params.output
    return next() unless params.output
    fs.stat params.output, (err, stat) ->
      return next err if err and err.code isnt 'ENOENT'
      return next Error 'File Already Exists, use --overwrite' unless err or params.overwrite
      do_write fs.createWriteStream params.output, encoding: 'utf8'
  do_write = (ws) ->
    nodes = {}
    nodes = merge nodes, exports.capacity_to_ryba  params, config, cluster.nodes for cluster in clusters
    ws.write JSON.stringify nodes: nodes, null, 2
  do_end = (ws) ->
    ws.end() if params.output
    next()
  do_open()

exports.write_js = (config, clusters, next) ->
  do_open = ->
    return do_write process.stdout unless params.output
    return next() unless params.output
    fs.stat params.output, (err, stat) ->
      return next err if err and err.code isnt 'ENOENT'
      return next Error 'File Already Exists, use --overwrite' unless err or params.overwrite
      do_write fs.createWriteStream params.output, encoding: 'utf8'
  do_write = (ws) ->
    nodes = {}
    nodes = merge nodes, exports.capacity_to_ryba  params, config, cluster.nodes for cluster in clusters
    source = JSON.stringify nodes: nodes, null, 2
    source = "module.exports = #{source};"
    ws.write source
  do_end = (ws) ->
    ws.end() if params.output
    next()
  do_open()

exports.write_coffee = (params, config, clusters, next) ->
  do_open = ->
    return do_write process.stdout unless params.output
    return next() unless params.output
    fs.stat params.output, (err, stat) ->
      return next err if err and err.code isnt 'ENOENT'
      return next Error 'File Already Exists, use --overwrite' unless err or params.overwrite
      do_write fs.createWriteStream params.output, encoding: 'utf8'
  do_write = (ws) ->
    nodes = {}
    nodes = merge nodes, exports.capacity_to_ryba  params, config, cluster.nodes for cluster in clusters
    source = JSON.stringify nodes: nodes
    source = "module.exports = #{source}"
    argv = process.argv
    argv[1] = path.relative process.cwd(), argv[1]
    ws.write "# #{argv.join(' ')}\n"
    ws.write "\n"
    ws.write js2coffee.build(source).code
    for cluster in clusters
      for ctx in cluster.nodes
        {capacity} = ctx.config
        ws.write "\n"
        ws.write "# #{ctx.fqdn}\n"
        ws.write "#   Number of core: #{capacity.cores}\n"
        ws.write "#   Number of partitions: #{capacity.disks.length}\n"
        ws.write "#   Memory Total: #{prink.filesize capacity.total_memory, 3}\n"
        ws.write "#   Memory System: #{prink.filesize capacity.memory_system, 3}\n"
        print_yarn_nm = not params.modules or multimatch(params.modules, 'ryba/hbase/regionserve').length
        if ctx.has_service('ryba/hbase/regionserver') and print_yarn_nm
          ws.write "#   HBase RegionServer\n"
          ws.write "#     Memory HBase: #{prink.filesize capacity.memory_hbase, 3}\n"
        print_yarn_nm = not params.modules or multimatch(params.modules, 'ryba/hadoop/yarn_nm').length
        if ctx.has_service('ryba/hadoop/yarn_nm') and print_yarn_nm
          ws.write "#   YARN NodeManager\n"
          ws.write "#     Memory YARN: #{prink.filesize capacity.memory_yarn, 3}\n"
          ws.write "#     Number of Cores: #{capacity.cores}\n"
          ws.write "#     Number of Containers: #{capacity.max_number_of_containers}\n"
          ws.write "#     Memory per Containers: #{prink.filesize capacity.memory_per_container, 3}\n"
          ws.write "#   YARN NodeManager Process heapsize: #{prink.filesize capacity.memory_nodemanager, 3}\n"
        print_hdfs_dn = not params.modules or multimatch(params.modules, 'ryba/hadoop/hdfs_dn').length
        if ctx.has_service('ryba/hadoop/hdfs_dn') and print_hdfs_dn
          ws.write "#   HDFS Datanode Process heapsize: #{prink.filesize capacity.memory_datanode, 3}\n"
    do_end ws
  do_end = (ws) ->
    # ws.end() if params.output
    ws.end() # TODO, seems like we can close stdout
    next()
  do_open()

exports.write_xml = (params, config, nodes, next) ->
  nodes = exports.capacity_to_ryba params, config,nodes
  next()

exports.capacity_to_ryba = (params, config, ctxs) ->
  nodes = {}
  for ctx in ctxs
    continue if params.hosts and not multimatch(params.hosts, ctx.config.host).length
    {capacity} = ctx.config
    node = {}
    node.services ?= {}
    cluster = config.cluster
    throw Error 'cluster is node defined' unless cluster?
    print_hdfs = not params.modules or multimatch(params.modules, ['ryba/hadoop/hdfs_client', 'ryba/hadoop/hdfs_nn', 'ryba/hadoop/hdfs_dn']).length
    if ctx.has_service('ryba/hadoop/hdfs_nn') and print_hdfs
      service = node.services["#{cluster}:ryba/hadoop/hdfs_nn"] ?= {}
      service.hdfs_site ?= capacity.nn_hdfs_site
    if ctx.has_service('ryba/hadoop/hdfs_client') and print_hdfs
      service = node.services["#{cluster}:ryba/hadoop/hdfs_client"] ?= {}
      service.hdfs_site ?= capacity.hdfs_site
    if ctx.has_service('ryba/hadoop/hdfs_dn') and print_hdfs
      service = node.services["#{cluster}:ryba/hadoop/hdfs_dn"] ?= {}
      service.hdfs_site ?= capacity.dn_hdfs_site
    print_yarn_rm = not params.modules or multimatch(params.modules, 'ryba/hadoop/yarn_rm').length
    if ctx.has_service('ryba/hadoop/yarn_rm') and print_yarn_rm
      service = node.services["#{cluster}:ryba/hadoop/yarn_rm"] ?= {}
      service.yarn_site = capacity.rm_yarn_site
      service.capacity_scheduler = capacity.capacity_scheduler
    print_yarn_nm = not params.modules or multimatch(params.modules, 'ryba/hadoop/yarn_nm').length
    if ctx.has_service('ryba/hadoop/yarn_nm') and print_yarn_nm
      service = node.services["#{cluster}:ryba/hadoop/yarn_nm"] ?= {}
      service.yarn_site = capacity.yarn_site
    print_mapred_client = not params.modules or multimatch(params.modules, 'ryba/hadoop/mapred_client').length
    if ctx.has_service('ryba/hadoop/mapred_client') and print_mapred_client
    # if ctx.should_configure_mapred_client and print_mapred_client
      service = node.services["#{cluster}:ryba/hadoop/mapred_client"] ?= {}
      service.mapred_site = capacity.mapred_site
    print_tez_client = not params.modules or multimatch(params.modules, 'ryba/tez').length
    if ctx.has_service('ryba/tez') and print_tez_client
      service = node.services["#{cluster}:ryba/tez"] ?= {}
      service.tez_site = capacity.tez_site
    print_hive_client = not params.modules or multimatch(params.modules, 'ryba/hive/client').length
    if ctx.has_service('ryba/hive/client') and print_hive_client
      service = node.services["#{cluster}:ryba/hive/client"] ?= {}
      service.hive_site = capacity.hive_site
    if ctx.has_service('ryba/hive/beeline') and print_hive_client
      service = node.services["#{cluster}:ryba/hive/beeline"] ?= {}
      service.hive_site = capacity.hive_site
    print_hbase_regionserver = not params.modules or multimatch(params.modules, 'ryba/hbase/regionserver').length
    if ctx.has_service('ryba/hbase/regionserver') and print_hbase_regionserver
      service = node.services["#{cluster}:ryba/hbase/regionserver"] ?= {}
      service.heapsize ?= capacity.regionserver_opts
    print_kafka_broker = not params.modules or multimatch(params.modules, 'ryba/kafka/broker').length
    if ctx.has_service('ryba/kafka/broker') and print_kafka_broker
      service = node.services["#{cluster}:ryba/kafka/broker"] ?= {}
      service.config = capacity.kafka_broker
    print_nifi = not params.modules or multimatch(params.modules, 'ryba/nifi').length
    if ctx.has_service('ryba/nifi') and print_nifi
      service = node.services["#{cluster}:ryba/nifi"] ?= {}
      service.properties ?= capacity.nifi_properties
    nodes[ctx.fqdn] = node
  nodes

exports.rounded_memory = (memory) ->
  exports.rounded_memory_mb(memory / 1024 / 1024) * 1024 * 1024

exports.rounded_memory_mb = (memory_mb) ->
  denominator_mb = 128
  if memory_mb > 4096
    denominator_mb = 1024
  else if memory_mb > 2048
    denominator_mb = 512
  else if memory_mb > 1024
    denominator_mb = 256
  else
    denominator_mb = 128
  Math.floor( memory_mb / denominator_mb ) * denominator_mb

Resources

Dependencies

fs = require 'fs'
ssh2fs = require 'ssh2-fs'
nikita = require 'nikita'
{merge} = require '@nikitajs/core/lib/misc'
string = require '@nikitajs/core/lib/misc/string'
multimatch = require 'multimatch'
prink = require 'prink'
path = require 'path'
js2coffee = require 'js2coffee'
properties = require '../properties'
each = require 'each'