Menu

HDFS Put

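Upload a file to HDFS using the WebHDFS API. If the target already exists, it is only overwritten when its MD5 checksum differs from the source's.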

Options

  • nn_url (String, array)
    NameNode URL(s), e.g. https://master01.metal.ryba:50470. RYBA queries the metrics of every given NameNode URL to find the active one.
  • krb5_user (obj)
    A principal with sufficient permissions to call the NameNode API.
  • source (string)
    The source file needed to be uploaded. Required.
  • local (boolean)
    Whether the source file is on the local machine (true) or on the remote server (false). Defaults to false.
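  • target (string)
    The target file path in HDFS. Required. A fully qualified hdfs://<authority>/<path> URI is reduced to its path.
  • owner, group (string)
    Optional. When either is set, the hdfs.chown action is run with these options after the upload.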

Source Code

# Upload `options.source` to `options.target` in HDFS through WebHDFS.
# Steps: find the active NameNode, stat the target, and (if it exists)
# compare MD5 checksums of the HDFS copy and the source; upload with
# `overwrite=true` only when the file is missing or the checksums differ;
# finally chown if `owner`/`group` are set.
# Errors for missing `krb5_user`, `source` or `target` are passed to `callback`.
module.exports = ({options}, callback) ->
  return callback Error 'Missing krb5 user' unless options.krb5_user
  return callback Error 'missing source' unless options.source
  return callback Error 'Missing target' unless options.target
  options.local ?= false
  # NOTE(review): `mode` is defaulted here but not used by this action.
  options.mode ?= '644'
  # NOTE(review): paths are interpolated unquoted into shell commands below.
  active_url = null
  file_exists = false
  should_upload = true
  file_checksum = null
  local_checksum = null
  # Extract the hex digest from `openssl md5` output ("MD5(<file>)= <hex>").
  # Anchoring on the trailing hex keeps file names containing "=" working.
  parse_md5 = (stdout) ->
    digest = /=\s*([0-9a-fA-F]+)\s*$/.exec(stdout or '')
    digest?[1]
  # WebHDFS wants only the path, so drop the "hdfs://<authority>" prefix.
  if hdfs_uri = /^hdfs:\/\/[^\/]*(\/.*)$/.exec options.target
    options.target = hdfs_uri[1]
  # Unique name: a plain "/tmp/<basename>" could be the source itself and
  # would be overwritten by the download of the HDFS copy.
  options.tmpfile = "/tmp/hdfs_put_#{Date.now()}_#{path.basename options.target}"
  @registry.register ['hdfs','active'], require './active'
  @registry.register ['hdfs','chown'], require './chown'
  @call ->
    @hdfs.active options
    , (err, obj) ->
      # On failure obj is not usable; surface the real error, not a TypeError.
      throw err if err
      active_url = obj?.active
  # Stat the target to learn whether it already exists (and is a file).
  @call
    shy: true
  , (_, cb) ->
    return cb Error 'No active NameNode found' unless active_url
    @system.execute
      cmd: mkcmd.hdfs options.krb5_user, "/bin/bash -c 'curl --negotiate -X GET -k -u : \"#{active_url}/webhdfs/v1#{options.target}?op=GETFILESTATUS\"'"
    , (err, data) ->
      return cb err if err
      error = null
      try
        response = JSON.parse data.stdout
        if (remote = response['RemoteException'])?
          if remote['exception'] is 'FileNotFoundException'
            @log message: "File #{options.target} does not exist", level: 'INFO'
            file_exists = false
          else
            # Build a readable message; passing the object yields "[object Object]".
            throw Error "#{remote['exception']}: #{remote['message']}"
        else if response['FileStatus']?
          throw Error "#{options.target} is not a file " unless response['FileStatus']['type'] is 'FILE'
          @log message: "File #{options.target} does already exist", level: 'INFO'
          file_exists = true
      catch err
        error = err
      cb error, false
  # Download the existing HDFS file and checksum it, then remove the copy.
  @call
    if: -> file_exists
  , ->
    @system.execute
      shy: true
      cmd: mkcmd.hdfs options.krb5_user, "/bin/bash -c 'curl -L --negotiate -X GET -k -u : \"#{active_url}/webhdfs/v1#{options.target}?op=OPEN\" > #{options.tmpfile}'"
    @system.execute
      shy: true
      cmd: "openssl md5 #{options.tmpfile}"
    , (err, data) ->
      throw err if err
      file_checksum = parse_md5 data?.stdout
      @log message: "File hdfs checksum #{options.target} #{file_checksum}", level: 'INFO'
    @system.execute
      shy: true
      cmd: "rm -f #{options.tmpfile}"
  # Checksum the source where the upload will read it from.
  @call
    if: -> file_exists
  , ->
    @system.execute
      shy: true
      local: options.local
      cmd: "openssl md5 #{options.source} | tail -n1"
    , (err, data) ->
      throw err if err
      local_checksum = parse_md5 data?.stdout
      @log message: "File local checksum #{options.source} #{local_checksum}", level: 'INFO'
      # Skip the upload only when a digest was obtained and both match.
      should_upload = not (file_checksum? and file_checksum is local_checksum)
  @call ->
    @system.execute
      if: -> (not file_exists) or should_upload
      local: options.local
      cmd: mkcmd.hdfs options.krb5_user, "/bin/bash -c 'curl -H \"Content-Type: application/octet-stream\" --negotiate -X PUT -L -k -T  #{options.source} -u : \"#{active_url}/webhdfs/v1#{options.target}?op=CREATE#{if should_upload then '&overwrite=true' else ''}\"'"
  @hdfs.chown options if options.owner or options.group
  @next callback

Dependencies

mkcmd = require '../../mkcmd'
path = require 'path'