class Fluent::Plugin::ForwardOutput::Node

Constants

RequestInfo

Attributes

available[R]
failure[R]
host[R]
name[R]
port[R]
sockaddr[R]
standby[R]
state[R]
usock[RW]
weight[R]

Public Class Methods

new(sender, server, failure:) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 541
# Builds a node from the owning output plugin (+sender+), one +server+
# configuration section and a failure detector (+failure+).
#
# Copies logging/compression settings from the sender, connection and
# credential settings from the server, generates a per-node shared-key salt,
# and starts with the node marked available and an empty DNS cache.
def initialize(sender, server, failure:)
  @sender = sender
  @log = sender.log
  @compress = sender.compress

  @name = server.name
  @host = server.host
  @port = server.port
  @weight = server.weight
  @standby = server.standby
  @failure = failure
  @available = true

  # @hostname is used for certificate verification & TLS SNI.
  # When @host does not parse as an IP address it is a hostname and is used
  # directly; otherwise fall back to the configured name (or nil).
  host_is_hostname = begin
    IPAddr.new(@host)
    false
  rescue StandardError
    true
  end
  @hostname = host_is_hostname ? @host : (@name || nil)

  @usock = nil

  @username = server.username
  @password = server.password
  # Per-server key wins over the sender-wide security section; default is "".
  @shared_key = server.shared_key || (sender.security && sender.security.shared_key) || ""
  @shared_key_salt = generate_salt

  @unpacker = Fluent::Engine.msgpack_unpacker

  # DNS cache state consumed by resolved_host / send_heartbeat.
  @resolved_host = nil
  @resolved_time = 0
  @resolved_once = false
end

Public Instance Methods

available?() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 588
# Returns the current value of the @available flag. The flag starts as true
# in the constructor, is cleared by disable! and tick, and is set again by
# heartbeat on recovery.
def available?
  @available
end
check_helo(ri, message) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 801
# Validates a HELO handshake message of the form ['HELO', options(hash)].
#
# On success copies the 'nonce' and 'auth' options into +ri+ and returns true;
# returns false (leaving +ri+ untouched) when the message is not a
# two-element HELO.
def check_helo(ri, message)
  @log.debug "checking helo"
  return false if message.size != 2 || message[0] != 'HELO'

  options = message[1] || {}
  # Missing options default to '' so that a protocol-version mismatch shows up
  # later as a failed shared-key check rather than as an error here.
  ri.shared_key_nonce = options['nonce'] || ''
  ri.auth = options['auth'] || ''
  true
end
check_pong(ri, message) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 833
# Validates a PONG handshake message.
#
# Expected shape:
#   ['PONG', bool(authentication result), 'reason if authentication failed',
#    self_hostname, sha512_hex(salt + self_hostname + nonce + sharedkey)]
#
# Returns a two-element array [succeeded, reason]: [true, nil] when the peer
# authenticated us and its shared-key digest matches ours, otherwise
# [false, <human-readable reason>].
def check_pong(ri, message)
  @log.debug "checking pong"
  unless message.size == 5 && message[0] == 'PONG'
    return false, 'invalid format for PONG message'
  end
  _pong, auth_result, reason, hostname, shared_key_hexdigest = message

  unless auth_result
    # Interpolate rather than concatenate: the reason comes from the remote
    # peer and may be nil or a non-String, which would make String#+ raise.
    return false, "authentication failed: #{reason}"
  end

  if hostname == @sender.security.self_hostname
    return false, 'same hostname between input and output: invalid configuration'
  end

  # Recompute the digest the peer should have produced from our salt, its
  # hostname, the HELO nonce and the shared key.
  clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(ri.shared_key_nonce).update(@shared_key).hexdigest
  unless shared_key_hexdigest == clientside
    return false, 'shared key mismatch'
  end

  return true, nil
end
disable!() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 592
# Marks this node unavailable by clearing @available. Within this class it is
# called when the handshake is rejected (on_read) or the peer disconnects
# (establish_connection).
def disable!
  @available = false
end
establish_connection(sock, ri) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 613
# Drives the handshake on +sock+ until +ri+ reaches the :established state or
# this node becomes unavailable.
#
# Reads are non-blocking; each unpacked message is handed to on_read, which
# advances +ri.state+. A system-call error or EOF logs a warning, disables the
# node and stops the loop.
def establish_connection(sock, ri)
  until !available? || ri.state == :established
    begin
      # TODO: On Ruby 2.2 or earlier, read_nonblock doesn't work expectedly.
      # We need rewrite around here using new socket/server plugin helper.
      received = sock.read_nonblock(@sender.read_length)
      if received.empty?
        # Nothing arrived yet: wait a little before polling again.
        sleep @sender.read_interval
      else
        @unpacker.feed_each(received) { |message| on_read(sock, ri, message) }
      end
    rescue IO::WaitReadable
      # Errno::EWOULDBLOCK / Errno::EAGAIN raised by read_nonblock are extended
      # by IO::WaitReadable, so rescuing it here (before SystemCallError)
      # catches the "try again" cases.
      # https://docs.ruby-lang.org/en/2.3.0/IO.html#method-i-read_nonblock
      sleep @sender.read_interval unless ri.state == :established
    rescue SystemCallError => error
      @log.warn "disconnected by error", host: @host, port: @port, error: error
      disable!
      break
    rescue EOFError
      @log.warn "disconnected", host: @host, port: @port
      disable!
      break
    end
  end
end
generate_ping(ri) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 814
# Builds the PING handshake message answering a HELO.
#
# Layout:
#   ['PING', self_hostname, sharedkey_salt,
#    sha512_hex(sharedkey_salt + self_hostname + nonce + shared_key),
#    username || '', sha512_hex(auth_salt + username + password) || '']
#
# The last two fields are empty strings when the HELO carried no auth salt
# (+ri.auth+ is empty).
def generate_ping(ri)
  @log.debug "generating ping"
  self_hostname = @sender.security.self_hostname

  key_digest = Digest::SHA512.new
  [@shared_key_salt, self_hostname, ri.shared_key_nonce, @shared_key].each do |part|
    key_digest.update(part)
  end
  ping = ['PING', self_hostname, @shared_key_salt, key_digest.hexdigest]

  if ri.auth.empty?
    ping.push('', '')
  else
    # User authentication requested: hash the credentials with the peer's salt.
    credential_digest = Digest::SHA512.new.update(ri.auth).update(@username).update(@password).hexdigest
    ping.push(@username, credential_digest)
  end
  ping
end
generate_salt() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 797
# Returns a fresh random salt: 16 random bytes from SecureRandom rendered as a
# 32-character hexadecimal string. The constructor stores one per node as
# @shared_key_salt.
def generate_salt
  SecureRandom.hex(16)
end
heartbeat(detect=true) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 785
# Records a successful heartbeat in the failure detector and, when +detect+ is
# true, checks whether a detached node has recovered.
#
# Returns true when the node was unavailable and has now collected more than
# @sender.recover_sample_size samples (it is marked available again);
# returns nil in every other case.
def heartbeat(detect=true)
  @failure.add(Time.now.to_f)

  return nil unless detect
  return nil if @available
  return nil unless @failure.sample_size > @sender.recover_sample_size

  @available = true
  @log.warn "recovered forwarding server '#{@name}'", host: @host, port: @port
  true
end
on_read(sock, ri, data) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 858
# Handles one unpacked handshake message (+data+) read from +sock+ and
# advances the session state held in +ri+:
#
#   :helo     -> validates HELO, writes the PING reply, moves to :pingpong
#   :pingpong -> validates PONG, moves to :established
#
# An invalid HELO or a rejected PONG logs a warning and disables the node.
# Raises RuntimeError for any other state.
def on_read(sock, ri, data)
  @log.trace __callee__

  case ri.state
  when :helo
    unless check_helo(ri, data)
      # @name is optional, so fall back to the host (as the PONG branch does)
      # to keep the warning identifiable.
      @log.warn "received invalid helo message from #{@name || @host}"
      disable! # shutdown
      return
    end
    sock.write(generate_ping(ri).to_msgpack)
    ri.state = :pingpong
  when :pingpong
    succeeded, reason = check_pong(ri, data)
    unless succeeded
      @log.warn "connection refused to #{@name || @host}: #{reason}"
      disable! # shutdown
      return
    end
    ri.state = :established
    @log.debug "connection established", host: @host, port: @port
  else
    raise "BUG: unknown session state: #{ri.state}"
  end
end
resolved_host() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 726
# Returns the address to connect to, honouring @sender.expire_dns_cache:
#
#   0       - caching disabled: resolve on every call
#   nil     - persistent cache: resolve once and reuse forever
#   other   - resolve again once that much time (per Fluent::Engine.now) has
#             passed since the last lookup, or when nothing is cached
def resolved_host
  ttl = @sender.expire_dns_cache
  case ttl
  when 0
    resolve_dns!
  when nil
    @resolved_host ||= resolve_dns!
  else
    current = Fluent::Engine.now
    cached = @resolved_host
    needs_refresh = !cached || current - @resolved_time >= ttl
    return cached unless needs_refresh

    @resolved_host = resolve_dns!
    @resolved_time = current
    @resolved_host
  end
end
send_data(tag, chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 674
# Opens a transfer socket to this node and sends +chunk+ under +tag+.
#
# Returns the still-open socket when @sender.require_ack_response is set (the
# caller reads the ACK from it). Otherwise closes the socket, records a
# heartbeat without recovery detection, and returns nil. If sending fails the
# socket is closed and the error is re-raised.
def send_data(tag, chunk)
  connection = @sender.create_transfer_socket(resolved_host, port, @hostname)
  begin
    send_data_actual(connection, tag, chunk)
  rescue
    # Best-effort close; the original failure is what the caller must see.
    begin
      connection.close
    rescue
      nil
    end
    raise
  end

  return connection if @sender.require_ack_response # to read ACK from socket

  begin
    connection.close_write
  rescue
    nil
  end
  begin
    connection.close
  rescue
    nil
  end
  heartbeat(false)
  nil
end
send_data_actual(sock, tag, chunk) click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 643
# Performs the handshake on +sock+ (when the sender has a security section)
# and then writes one PackedForward message: header, tag, entries, option.
#
# Raises ConnectionClosedError when the node is not available after the
# handshake attempt.
def send_data_actual(sock, tag, chunk)
  ri = RequestInfo.new(@sender.security ? :helo : :established)
  if ri.state != :established
    establish_connection(sock, ri)
  end

  unless available?
    # @name is optional; fall back to the host so the message names the node.
    raise ConnectionClosedError, "failed to establish connection with node #{@name || @host}"
  end

  option = { 'size' => chunk.size, 'compressed' => @compress }
  # The chunk id is only sent when an ACK is expected for it.
  option['chunk'] = Base64.encode64(chunk.unique_id) if @sender.require_ack_response

  # https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#packedforward-mode
  # out_forward always uses str32 type for entries.
  # str16 can store only 64kbytes, and it should be much smaller than buffer chunk size.

  tag = tag.dup.force_encoding(Encoding::UTF_8)

  sock.write @sender.forward_header                    # array, size=3
  sock.write tag.to_msgpack                            # 1. tag: String (str)
  chunk.open(compressed: @compress) do |chunk_io|
    # 0xdb is the msgpack str32 marker, followed by the 32-bit big-endian length.
    entries = [0xdb, chunk_io.size].pack('CN')
    sock.write entries.force_encoding(Encoding::UTF_8) # 2. entries: String (str32)
    IO.copy_stream(chunk_io, sock)                     #    writeRawBody(packed_es)
  end
  sock.write option.to_msgpack                         # 3. option: Hash(map)

  # TODO: use bin32 for non-utf8 content(entries) when old msgpack-ruby (0.5.x or earlier) not supported
end
send_heartbeat() click to toggle source

# Heartbeat payload: FORWARD_HEADER followed by a msgpack-encoded empty string
# and a msgpack-encoded empty array. In the code visible here it is only
# referenced from a commented-out write in send_heartbeat.
# NOTE(review): FORWARD_HEADER is defined outside this excerpt — presumably the
# forward-protocol array header; confirm against its definition.
FORWARD_TCP_HEARTBEAT_DATA = FORWARD_HEADER + ''.to_msgpack + [].to_msgpack

# File lib/fluent/plugin/out_forward.rb, line 694
# Sends one heartbeat to this node using @sender.heartbeat_type.
#
#   :transport - opens (and lets the sender close) a transfer socket; a
#                successful connection counts as a heartbeat. Returns whatever
#                create_transfer_socket returns.
#   :udp       - sends a single "\0" datagram through @usock; returns nil.
#
# A name-resolution failure is logged and swallowed (returning nil) only when
# the host has never resolved yet and
# @sender.ignore_network_errors_at_startup is set; otherwise SocketError
# propagates. Raises RuntimeError for :none or an unknown heartbeat type.
def send_heartbeat
  begin
    dest_addr = resolved_host
    @resolved_once = true
  rescue ::SocketError => e
    if !@resolved_once && @sender.ignore_network_errors_at_startup
      @log.warn "failed to resolve node name in heartbeating", server: @name || @host, error: e
      return
    end
    raise
  end

  case @sender.heartbeat_type
  when :transport
    @sender.create_transfer_socket(dest_addr, port, @hostname) do |sock|
      ## don't send any data to not cause a compatibility problem
      # sock.write FORWARD_TCP_HEARTBEAT_DATA

      # successful tcp connection establishment is considered as valid heartbeat.
      # When heartbeat is succeeded after detached, return true. It rebuilds weight array.
      heartbeat(true)
    end
  when :udp
    # Reuse the address resolved above: resolving again would be a second
    # lookup outside the SocketError handling and could pick a different
    # address when caching is off or round-robin is on.
    @usock.send "\0", 0, Socket.pack_sockaddr_in(@port, dest_addr)
    nil
  when :none # :none doesn't use this class
    raise "BUG: heartbeat_type none must not use Node"
  else
    raise "BUG: unknown heartbeat_type '#{@sender.heartbeat_type}'"
  end
end
standby?() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 596
# Returns the standby flag that the constructor copied from the server
# configuration (server.standby).
def standby?
  @standby
end
tick() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 755
# Periodic failure-detection step.
#
# Returns:
#   nil   - the node is already unavailable (the detector is cleared if its
#           hard timeout has elapsed)
#   true  - the node was just detached, either by hard timeout or because phi
#           exceeded @sender.phi_threshold
#   false - the node is still considered healthy
#
# Detaching also drops the cached resolved host and clears the detector.
def tick
  current = Time.now.to_f
  timed_out = @failure.hard_timeout?(current)

  unless @available
    @failure.clear if timed_out
    return nil
  end

  if timed_out
    @log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, hard_timeout: true
    @available = false
    @resolved_host = nil  # expire cached host
    @failure.clear
    return true
  end

  return false unless @sender.phi_failure_detector

  score = @failure.phi(current)
  return false unless score > @sender.phi_threshold

  @log.warn "detached forwarding server '#{@name}'", host: @host, port: @port, phi: score, phi_threshold: @sender.phi_threshold
  @available = false
  @resolved_host = nil  # expire cached host
  @failure.clear
  true
end
validate_host_resolution!() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 584
# Resolves this node's host through resolved_host and returns the result.
# Nothing is rescued here, so any error raised during resolution reaches the
# caller.
def validate_host_resolution!
  resolved_host
end
verify_connection() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 600
# Opens a transfer socket to this node and, when the sender has a security
# section, runs the handshake to check that a session can be established.
# The socket is always closed before returning.
#
# Raises RuntimeError when the handshake ends without reaching :established.
def verify_connection
  sock = @sender.create_transfer_socket(resolved_host, port, @hostname)
  begin
    ri = RequestInfo.new(@sender.security ? :helo : :established)
    if ri.state != :established
      establish_connection(sock, ri)
      unless ri.state == :established
        # A bare `raise` here would surface as "unhandled exception";
        # keep RuntimeError but say which node failed.
        raise "failed to establish connection with node #{@name || @host}"
      end
    end
  ensure
    sock.close
  end
end

Private Instance Methods

resolve_dns!() click to toggle source
# File lib/fluent/plugin/out_forward.rb, line 747
# Looks up @host/@port with Socket.getaddrinfo, picks one result (a random one
# when @sender.dns_round_robin is set, otherwise the first), stores the packed
# socket address in @sockaddr and returns the chosen address string.
def resolve_dns!
  candidates = Socket.getaddrinfo(@host, @port, nil, Socket::SOCK_STREAM)
  chosen = if @sender.dns_round_robin
             candidates.sample
           else
             candidates.first
           end
  # getaddrinfo entries are arrays; index 1 is the port and index 3 the address.
  _family, resolved_port, _hostname, address = chosen
  @sockaddr = Socket.pack_sockaddr_in(resolved_port, address) # used by on_heartbeat
  address
end