Menu

NiFi Configure

module.exports = (service) ->
  options = service.options

  # Set it to true if both hdf and hdp are installed on the cluster
  options.hdf_hdp ?= false
  zk_hosts = service.deps.zookeeper_server.filter( (srv) -> srv.options.config['peerType'] is 'participant')

Environment

  options.conf_dir ?= '/etc/nifi/conf'
  options.log_dir ?= '/var/log/nifi'

Identities

  # Group
  options.group = name: options.group if typeof options.group is 'string'
  options.group ?= {}
  options.group.name ?= 'nifi'
  options.group.system ?= true
  # User
  options.user = name: options.user if typeof options.user is 'string'
  options.user ?= {}
  options.user.name ?= 'nifi'
  options.user.gid = options.group.name
  options.user.system ?= true
  options.user.comment ?= 'NiFi User'
  options.user.home ?= '/var/lib/nifi'
  options.user.limits ?= {}
  options.user.limits.nofile ?= 64000
  options.user.limits.nproc ?= 10000

Configuration

  #Misc
  options.fqdn ?= service.node.fqdn
  options.shortname ?= service.node.hostname
  options.iptables ?= !!service.deps.iptables and service.deps.iptables.action is 'start'
  options.properties ?= {}
  options.properties['nifi.version'] ?= '1.2.0.3.0.0.0-453'
  options.properties['nifi.flow.configuration.file'] ?= "#{options.user.home}/flow.xml.gz"
  options.properties['nifi.flow.configuration.archive.dir'] ?= "#{options.user.home}/archive"
  options.properties['nifi.flowcontroller.autoResumeState'] ?= 'true'
  options.properties['nifi.flowcontroller.graceful.shutdown.period'] ?= '10 sec'
  options.properties['nifi.flowservice.writedelay.interval'] ?= '500 ms'
  options.properties['nifi.administrative.yield.duration'] ?= '30 sec'
  # If a component has no work to do (is "bored"), how long should we wait before checking again for work?'
  options.properties['nifi.bored.yield.duration'] ?= '10 millis'
  # timeout [properties][nifi-properties] before node disconnect
  options.properties['nifi.cluster.node.read.timeout'] ?= '15 sec'
  options.properties['nifi.authorizer.configuration.file'] ?= "#{options.conf_dir}/authorizers.xml"
  options.properties['nifi.login.identity.provider.configuration.file'] ?= "#{options.conf_dir}/login-identity-providers.xml"
  options.properties['nifi.templates.directory'] ?= "#{options.user.home}/templates"
  options.properties['nifi.ui.banner.text'] ?= ''
  options.properties['nifi.ui.autorefresh.interval'] ?= '30 sec'
  options.properties['nifi.nar.library.directory'] ?= '/usr/hdf/current/nifi/lib'
  options.properties['nifi.nar.working.directory'] ?= "#{options.user.home}/work/nar/"
  options.properties['nifi.documentation.working.directory'] ?= "#{options.user.home}/work/docs/components"

State Management

  options.properties['nifi.state.management.configuration.file'] ?= "#{options.conf_dir}/state-management.xml"
  # The ID of the local state provider
  options.properties['nifi.state.management.provider.local'] ?= 'local-provider'
  # The ID of the cluster-wide state provider. This will be ignored if NiFi is not clustered but must be populated if running in a cluster.
  options.properties['nifi.state.management.provider.cluster'] ?= 'zk-provider'
  # Specifies whether or not this instance of NiFi should run an embedded ZooKeeper server
  options.properties['nifi.state.management.embedded.zookeeper.start'] ?= 'false'
  # Properties file that provides the ZooKeeper properties to use if <nifi.state.management.embedded.zookeeper.start> is set to true
  options.properties['nifi.state.management.embedded.zookeeper.properties'] ?= "#{options.conf_dir}/zookeeper.properties"

H2 Settings

  options.properties['nifi.database.directory'] ?= "#{options.user.home}/database_repository"
  options.properties['nifi.h2.url.append'] ?= ';LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE'

Flow Configuration

  options.properties['nifi.flow.configuration.archive.enabled'] ?= 'true'
  options.properties['nifi.flow.configuration.archive.max.time'] ?= '30 days'
  options.properties['nifi.flow.configuration.archive.max.storage'] ?= '500 MB'
  # FlowFile Repository
  options.properties['nifi.flowfile.repository.implementation'] ?= 'org.apache.nifi.controller.repository.WriteAheadFlowFileRepository'
  options.properties['nifi.flowfile.repository.directory'] ?= "#{options.user.home}/flowfile_repository"
  options.properties['nifi.flowfile.repository.partitions'] ?= '256'
  options.properties['nifi.flowfile.repository.checkpoint.interval'] ?= '2 mins'
  options.properties['nifi.flowfile.repository.always.sync'] ?= 'false'

Swap Configuration

  options.properties['nifi.swap.manager.implementation'] ?= 'org.apache.nifi.controller.FileSystemSwapManager'
  options.properties['nifi.queue.swap.threshold'] ?= '20000'
  options.properties['nifi.swap.in.period'] ?= '5 sec'
  options.properties['nifi.swap.in.threads'] ?= '1'
  options.properties['nifi.swap.out.period'] ?= '5 sec'
  options.properties['nifi.swap.out.threads'] ?= '4'

Content Configuration

  options.properties['nifi.content.repository.implementation'] ?= 'org.apache.nifi.controller.repository.FileSystemRepository'
  # the content repository should be in dedicated folders.
  # if some content repositories are already configured, ryba considers that the default is disabled
  # administrator can still enable it using 'nifi.content.repository.directory.default' property
  options.use_content_default ?= true
  for k in Object.keys options.properties
    options.use_content_default = false if k.indexOf 'nifi.content.repository.directory.' isnt -1
  options.properties['nifi.content.repository.directory.default'] ?= "#{options.user.home}/content_repository" if options.use_content_default
  options.properties['nifi.content.claim.max.appendable.size'] ?= '10 MB'
  options.properties['nifi.content.claim.max.flow.files'] ?= '100'
  options.properties['nifi.content.repository.archive.max.retention.period'] ?= '12 hours'
  options.properties['nifi.content.repository.archive.max.usage.percentage'] ?= '50%'
  options.properties['nifi.content.repository.archive.enabled'] ?= 'true'
  options.properties['nifi.content.repository.always.sync'] ?= 'false'
  options.properties['nifi.content.viewer.url'] ?= '/nifi-content-viewer/'

Provenance Configuration

  options.properties['nifi.provenance.repository.implementation'] ?= 'org.apache.nifi.provenance.PersistentProvenanceRepository'
  # the content repository should be in dedicated folders.
  # if some content repositories are already configured, ryba considers that the default is disabled
  # administrator can still enable it using 'nifi.content.repository.directory.default' property
  options.use_provenance_default ?= true
  for k in Object.keys options.properties
    options.use_provenance_default = false if k.indexOf 'nifi.provenance.repository.directory.' isnt -1
  options.properties['nifi.provenance.repository.directory.default'] ?= "#{options.user.home}/provenance_repository" if options.use_provenance_default
  options.properties['nifi.provenance.repository.max.storage.time'] ?= '24 hours'
  options.properties['nifi.provenance.repository.max.storage.size'] ?= '1 GB'
  options.properties['nifi.provenance.repository.rollover.time'] ?= '30 secs'
  options.properties['nifi.provenance.repository.rollover.size'] ?= '100 MB'
  options.properties['nifi.provenance.repository.query.threads'] ?= '2'
  options.properties['nifi.provenance.repository.index.threads'] ?= '1'
  options.properties['nifi.provenance.repository.compress.on.rollover'] ?= 'true'
  options.properties['nifi.provenance.repository.always.sync'] ?= 'false'
  options.properties['nifi.provenance.repository.journal.count'] ?= '16'
  # Comma-separated list of fields. Fields that are not indexed will not be searchable. Valid fields are: 
  # EventType, FlowFileUUID, Filename, TransitURI, ProcessorID, AlternateIdentifierURI, Relationship, Details
  options.properties['nifi.provenance.repository.indexed.fields'] ?= 'EventType, FlowFileUUID, Filename, ProcessorID, Relationship'
  # FlowFile Attributes that should be indexed and made searchable.  Some examples to consider are filename, uuid, mime.type
  options.properties['nifi.provenance.repository.indexed.attributes'] ?= ''
  # Large values for the shard size will result in more Java heap usage when searching the Provenance Repository
  # but should provide better performance
  options.properties['nifi.provenance.repository.index.shard.size'] ?= '500 MB'
  # Indicates the maximum length that a FlowFile attribute can be when retrieving a Provenance Event from
  # the repository. If the length of any attribute exceeds this value, it will be truncated when the event is retrieved.
  options.properties['nifi.provenance.repository.max.attribute.length'] ?= '65536'
  # Volatile Provenance Respository Properties
  options.properties['nifi.provenance.repository.buffer.size'] ?= '100000'

Component Status Repository

  options.properties['nifi.components.status.repository.implementation'] ?= 'org.apache.nifi.controller.status.history.VolatileComponentStatusRepository'
  options.properties['nifi.components.status.repository.buffer.size'] ?= '1440'
  options.properties['nifi.components.status.snapshot.frequency'] ?= '1 min'

Site to site properties

  options.properties['nifi.remote.input.socket.host'] ?= service.node.fqdn
  # Set a specific port in order to use RAW socket as transport protocol for Site-to-Site
  options.properties['nifi.remote.input.socket.port'] ?= ''

Web Properties

  options.properties['nifi.web.war.directory'] ?= '/usr/hdf/current/nifi/lib'
  options.properties['nifi.web.jetty.working.directory'] ?= "#{options.user.home}/work/jetty"
  options.properties['nifi.web.jetty.threads'] ?= '200'

Common Properties

  # cluster common properties (cluster manager and nodes must have same values) #
  options.properties['nifi.cluster.protocol.heartbeat.interval'] ?= '5 sec'
  options.properties['nifi.cluster.protocol.socket.timeout'] ?= '30 sec'
  options.properties['nifi.cluster.protocol.connection.handshake.timeout'] ?= '45 sec'
  # if multicast is used, then nifi.cluster.protocol.multicast.xxx properties must be configured #
  options.properties['nifi.cluster.protocol.use.multicast'] ?= 'false'
  options.properties['nifi.cluster.protocol.multicast.address'] ?= ''
  options.properties['nifi.cluster.protocol.multicast.port'] ?= '9872'
  options.properties['nifi.cluster.protocol.multicast.service.broadcast.delay'] ?= '500 ms'
  options.properties['nifi.cluster.protocol.multicast.service.locator.attempts'] ?= '3'
  options.properties['nifi.cluster.protocol.multicast.service.locator.attempts.delay'] ?= '1 sec'
  # cluster node properties (only configure for cluster nodes) #
  options.properties['nifi.cluster.is.node'] ?= 'true'
  if options.properties['nifi.cluster.is.node'] is 'true'
    options.properties['nifi.cluster.node.address'] ?= service.node.fqdn
    options.properties['nifi.cluster.node.protocol.port'] ?= '9870'
    options.properties['nifi.cluster.node.protocol.threads'] ?= '10'
    options.properties['nifi.zookeeper.connect.string'] ?= zk_hosts.map( (srv) -> "#{srv.node.fqdn}:#{srv.options.config['clientPort']}").join ','
    options.properties['nifi.zookeeper.root.node'] ?= '/nifi'
    options.properties['nifi.cluster.request.replication.claim.timeout'] ?= '15 sec'
  options.properties['nifi.cluster.is.manager'] ?= 'false'
  if options.properties['nifi.cluster.is.manager'] is 'true'
    options.properties['nifi.cluster.manager.address'] ?= service.node.fqdn
    options.properties['nifi.cluster.manager.protocol.port'] ?= '9871'
    options.properties['nifi.cluster.manager.node.firewall.file'] ?= ''
    options.properties['nifi.cluster.manager.node.event.history.size'] ?= '10'
    options.properties['nifi.cluster.manager.node.api.connection.timeout'] ?= '30 sec'
    options.properties['nifi.cluster.manager.node.api.read.timeout'] ?= '30 sec'
    options.properties['nifi.cluster.manager.node.api.request.threads'] ?= '10'
    options.properties['nifi.cluster.manager.flow.retrieval.delay'] ?= '5 sec'
    options.properties['nifi.cluster.manager.protocol.threads'] ?= '10'
    options.properties['nifi.cluster.manager.safemode.duration'] ?= '0 sec'
  options.properties['nifi.cluster.flow.election.max.wait.time'] ?= '5 mins'
  options.properties['nifi.cluster.flow.election.max.candidates'] ?= "#{service.deps.nifi.length}"

Security

  options.krb5 ?= {}
  options.krb5.realm ?= service.deps.krb5_client.options.etc_krb5_conf?.libdefaults?.default_realm
  throw Error 'Required Options: "realm"' unless options.krb5.realm
  options.krb5.admin ?= service.deps.krb5_client.options.admin[options.krb5.realm]
  #Sensitive value encryption
  options.properties['nifi.sensitive.props.key'] ?= '' #'nifi_master_secret_123'
  options.properties['nifi.sensitive.props.algorithm'] ?= 'PBEWITHMD5AND256BITAES-CBC-OPENSSL'
  options.properties['nifi.sensitive.props.provider'] ?= 'BC'
  # Kerberos
  if service.deps.hadoop_core[0].options.core_site['hadoop.security.authentication'] is 'kerberos'
    options.properties['nifi.kerberos.krb5.file'] ?= '/etc/krb5.conf'

SSL

    options.ssl = merge {}, service.deps.ssl?.options, options.ssl
    options.ssl.enabled ?= !!service.deps.ssl
    options.truststore ?= {}
    options.keystore ?= {}
    if options.ssl.enabled
      throw Error "Required Option: ssl.cert" if  not options.ssl.cert
      throw Error "Required Option: ssl.key" if not options.ssl.key
      throw Error "Required Option: ssl.cacert" if not options.ssl.cacert
      options.truststore.target ?= "#{options.conf_dir}/truststore"
      throw Error "Required Property: truststore.password" if not options.truststore.password
      options.keystore.target ?= "#{options.conf_dir}/keystore"
      throw Error "Required Property: keystore.password" if not options.keystore.password
      options.truststore.caname ?= 'hadoop_root_ca'
  options.properties['nifi.cluster.protocol.is.secure'] ?= 'true'
  if options.properties['nifi.cluster.protocol.is.secure'] is 'true'
    options.properties['nifi.web.http.host'] = ''
    options.properties['nifi.web.http.port'] = ''
    options.properties['nifi.web.https.host'] ?= service.node.fqdn
    options.properties['nifi.web.https.port'] ?= '9760'
    options.properties['nifi.security.keystore'] ?= options.keystore.target
    options.properties['nifi.security.keystoreType'] ?= 'JKS'
    options.properties['nifi.security.keystorePasswd'] ?= options.keystore.password
    options.properties['nifi.security.keyPasswd'] ?= 'nifi123'
    options.properties['nifi.security.truststore'] ?= options.truststore.target
    options.properties['nifi.security.truststoreType'] ?= 'JKS'
    options.properties['nifi.security.truststorePasswd'] ?= options.truststore.password
    options.properties['nifi.security.needClientAuth'] ?= 'true'
    # Valid Authorities include: ROLE_MONITOR,ROLE_DFM,ROLE_ADMIN,ROLE_PROVENANCE,ROLE_NIFI
    # role given to anonymous users
    options.properties['nifi.security.anonymous.authorities'] ?= '' # no role given to anonymous
    # secure inner connection and remote
    options.properties['nifi.remote.input.secure'] ?= 'true'
  else
    options.properties['nifi.web.http.host'] ?= service.node.fqdn
    options.properties['nifi.web.http.port'] ?= '9750'
    options.properties['nifi.web.https.host'] = ''
    options.properties['nifi.web.https.port'] = ''

User Authentication

  options.properties['nifi.security.user.login.identity.provider'] ?= 'kerberos-provider'
  options.properties['nifi.login.identity.provider.configuration.file'] ?= "#{options.conf_dir}/login-identity-providers.xml"
  options.login_providers ?= {}
  switch options.properties['nifi.security.user.login.identity.provider']
    when 'ldap-provider'
      ldap_provider = options.login_providers.ldap_provider ?= {}
      ldap_provider['auth_strategy'] ?= 'SIMPLE'
      throw Error 'ldap_provider.auth_strategy must be "ANONYMOUS", "SIMPLE", or "START_TLS"' if ldap_provider['auth_strategy'] not in ['ANONYMOUS', 'SIMPLE', 'START_TLS']
      ldap_provider['tls_keystore'] ?= "#{options.properties['nifi.security.keystore']}"
      ldap_provider['tls_keystore_pwd'] ?= "#{options.properties['nifi.security.keystorePasswd']}"
      ldap_provider['tls_keystore_type'] ?= "#{options.properties['nifi.security.keystoreType']}"
      ldap_provider['tls_truststore'] ?= "#{options.properties['nifi.security.truststore']}"
      ldap_provider['tls_truststore_pwd'] ?= "#{options.properties['nifi.security.truststorePasswd']}"
      ldap_provider['tls_truststore_type'] ?= "#{options.properties['nifi.security.truststoreType']}"
      ldap_provider['tls_truststore_protocol'] ?= 'TLS'
      ldap_provider['tls_client_auth'] ?= 'NONE'
      ldap_provider['ref_strategy'] ?= 'FOLLOW'
      unless ldap_provider['manager_dn']?
        throw Error 'no openldap server configured' unless service.deps.openldap_server.length?
        ldap_provider['manager_dn'] ?= "#{service.deps.openldap_server[0].options.root_dn}"
        ldap_provider['manager_pwd'] ?= "#{service.deps.openldap_server[0].options.root_password}"
        ldap_provider['url'] ?= "#{service.deps.openldap_server[0].options.uri}:636"
        ldap_provider['usr_search_base'] ?= service.deps.openldap_server[0].options.users_dn
        ldap_provider['usr_search_filter'] ?= 'uid={0}'#'ou=groups,dc=ryba'
    when 'kerberos-provider'
      krb5_provider = options.login_providers.krb5_provider ?= {}
      krb5_provider['realm'] ?= options.krb5.realm
      options.properties['nifi.kerberos.service.principal'] ?= "HTTP/#{service.node.fqdn}@#{options.krb5.realm}"
      options.properties['nifi.kerberos.keytab.location'] ?= '/etc/security/keytabs/spnego.service.keytab'
      options.admin ?= {}
      options.admin.krb5_principal ?= "#{options.user.name}@#{options.krb5.realm}"
      options.admin.krb5_password ?= 'nifi123'
    else
      throw Error 'login provider is not supported'
  options.properties['nifi.security.identity.mapping.pattern.dn'] ?= '^CN=(.*?),(.*)$'
  options.properties['nifi.security.identity.mapping.value.dn'] ?= '$1'
  options.properties['nifi.security.identity.mapping.pattern.kerb'] ?= '^(.*?)@(.*?)$'
  options.properties['nifi.security.identity.mapping.value.kerb'] ?= '$1'

User Authorization

  options.properties['nifi.authorizer.configuration.file'] ?= "#{options.conf_dir}/authorizers.xml"
  options.properties['nifi.security.user.authorizer'] ?= 'file-provider'
  options.authorizers ?= {}
  switch options.properties['nifi.security.user.authorizer']
    when 'file-provider'
      file_provider = options.authorizers.file_provider ?= {}
      file_provider['authorizations_file'] ?= "#{options.conf_dir}/authorizations.xml"
      file_provider['users_file'] ?= "#{options.conf_dir}/users.xml"
      file_provider['initial_admin_identity'] ?= options.user.name
      file_provider['nodes_identities'] ?= service.deps.nifi.map( (srv) -> srv.node.fqdn)
    else
      throw Error 'Authorizer is not supported'

Cluster Management

  switch options.properties['nifi.state.management.provider.cluster']
    when 'zk-provider'
      throw Error 'No zookeeper quorum configured' unless zk_hosts.length
      # used for nifi to authenticate to kerberos sucurized zookeeper ensemble
      if service.deps.hadoop_core[0].options.core_site['hadoop.security.authentication'] is 'kerberos'
        options.krb5_principal ?=  "#{options.user.name}/#{service.node.fqdn}@#{options.krb5.realm}"
        options.krb5_keytab ?=  '/etc/security/keytabs/nifi.service.keytab'
    else
      throw Error 'No other cluster state provider is supported for now'

Java Opts

  options.java_home ?= service.deps.java.options.java_home if service.deps.java?
  options.java_opts ?= [
    '-Dorg.apache.jasper.compiler.disablejsr199=true'
    '-Xms512m'
    '-Xmx512m'
    '-Djava.net.preferIPv4Stack=true'
    '-Dsun.net.http.allowRestrictedHeaders=true'
    '-Djava.protocol.handler.pkgs=sun.net.www.protocol'
    '-XX:+UseG1GC'
    '-Djava.awt.headless=true'
  ]
  options.java_opts.push "-Djava.security.auth.login.config=#{options.conf_dir}/nifi-zookeeper.jaas"

Log4J

  options.log4j = merge {}, service.deps.log4j?.options, options.log4j
  options.log4j.properties ?= {}

  options.logback ?= {}
  options.logback.version ?= "1.1.7"
  options.logback.socketappender ?= {}
  options.logback.socketappender.version ?= "4.8"
  options.logback.socketappender.source ?= "http://central.maven.org/maven2/net/logstash/logback/logstash-logback-encoder/#{options.logback.socketappender.version}/logstash-logback-encoder-#{options.logback.socketappender.version}.jar"
  options.logback.core ?= {}
  options.logback.core.source ?= "http://central.maven.org/maven2/ch/qos/logback/logback-core/#{options.logback.version}/logback-core-#{options.logback.version}.jar"
  options.logback.classic ?= {}
  options.logback.classic.source ?= "http://central.maven.org/maven2/ch/qos/logback/logback-core/#{options.logback.version}/logback-classic-#{options.logback.version}.jar"
  options.logback.access ?= {}
  options.logback.access.source ?= "http://central.maven.org/maven2/ch/qos/logback/logback-core/#{options.logback.version}/logback-access-#{options.logback.version}.jar"

Additional Libs

Set local path of additional libs (for custom processors) in this array.

  options.custom_libs_dir ?= []

Data Directories Layout

  props = Object.keys(options.properties).filter (prop) ->
   prop.indexOf('nifi.content.repository.directory') > -1 or prop.indexOf('nifi.provenance.repository.directory') > -1
  options.data_dirs ?= []
  options.data_dirs.push options.properties[prop] for prop in props

Wait

  protocol = if options.properties['nifi.cluster.protocol.is.secure'] is 'true' then 'https' else 'http'
  options.wait ?= {}
  options.wait.webui = for srv in service.deps.nifi
    host: srv.node.fqdn
    port: srv.options.properties["nifi.web.#{protocol}.port"] or options.properties["nifi.web.#{protocol}.port"] or '9760'

Dependencies

{merge} = require '@nikitajs/core/lib/misc'