module Fluent::ServerModule

Public Instance Methods

after_run() click to toggle source
# File lib/fluent/supervisor.rb, line 69
def after_run
  stop_rpc_server if @rpc_endpoint
  stop_counter_server if @counter
  Fluent::Supervisor.cleanup_resources
end
before_run() click to toggle source
# File lib/fluent/supervisor.rb, line 43
def before_run
  @start_time = Time.now
  @rpc_server = nil
  @counter = nil

  if config[:rpc_endpoint]
    @rpc_endpoint = config[:rpc_endpoint]
    @enable_get_dump = config[:enable_get_dump]
    run_rpc_server
  end
  install_supervisor_signal_handlers

  if config[:signame]
    @signame = config[:signame]
    install_windows_event_handler
  end

  if counter = config[:counter_server]
    run_counter_server(counter)
  end

  socket_manager_path = ServerEngine::SocketManager::Server.generate_path
  ServerEngine::SocketManager::Server.open(socket_manager_path)
  ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s
end
install_supervisor_signal_handlers() click to toggle source
# File lib/fluent/supervisor.rb, line 149
def install_supervisor_signal_handlers
  trap :HUP do
    $log.debug "fluentd supervisor process get SIGHUP"
    supervisor_sighup_handler
  end unless Fluent.windows?

  trap :USR1 do
    $log.debug "fluentd supervisor process get SIGUSR1"
    supervisor_sigusr1_handler
  end unless Fluent.windows?
end
install_windows_event_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 161
def install_windows_event_handler
  Thread.new do
    ev = Win32::Event.new(@signame)
    begin
      ev.reset
      until WaitForSingleObject(ev.handle, 0) == WAIT_OBJECT_0
        sleep 1
      end
      kill_worker
      stop(true)
    ensure
      ev.close
    end
  end
end
kill_worker() click to toggle source
# File lib/fluent/supervisor.rb, line 198
def kill_worker
  if config[:worker_pid]
    pids = config[:worker_pid].clone
    config[:worker_pid].clear
    pids.each_value do |pid|
      if Fluent.windows?
        Process.kill :KILL, pid
      else
        Process.kill :TERM, pid
      end
    end
  end
end
run_counter_server(counter_conf) click to toggle source
# File lib/fluent/supervisor.rb, line 137
def run_counter_server(counter_conf)
  @counter = Fluent::Counter::Server.new(
    counter_conf.scope,
    {host: counter_conf.bind, port: counter_conf.port, log: $log, path: counter_conf.backup_path}
  )
  @counter.start
end
run_rpc_server() click to toggle source
# File lib/fluent/supervisor.rb, line 75
def run_rpc_server
  @rpc_server = RPC::Server.new(@rpc_endpoint, $log)

  # built-in RPC for signals
  @rpc_server.mount_proc('/api/processes.interruptWorkers') { |req, res|
    $log.debug "fluentd RPC got /api/processes.interruptWorkers request"
    Process.kill :INT, $$
    nil
  }
  @rpc_server.mount_proc('/api/processes.killWorkers') { |req, res|
    $log.debug "fluentd RPC got /api/processes.killWorkers request"
    Process.kill :TERM, $$
    nil
  }
  @rpc_server.mount_proc('/api/processes.flushBuffersAndKillWorkers') { |req, res|
    $log.debug "fluentd RPC got /api/processes.flushBuffersAndKillWorkers request"
    if Fluent.windows?
      $log.warn "operation 'flushBuffersAndKillWorkers' is not supported on Windows now."
    else
      Process.kill :USR1, $$
      Process.kill :TERM, $$
    end
    nil
  }
  @rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res|
    $log.debug "fluentd RPC got /api/plugins.flushBuffers request"
    unless Fluent.windows?
      Process.kill :USR1, $$
    end
    nil
  }
  @rpc_server.mount_proc('/api/config.reload') { |req, res|
    $log.debug "fluentd RPC got /api/config.reload request"
    if Fluent.windows?
      # restart worker with auto restarting by killing
      kill_worker
    else
      Process.kill :HUP, $$
    end
    nil
  }
  @rpc_server.mount_proc('/api/config.dump') { |req, res|
    $log.debug "fluentd RPC got /api/config.dump request"
    $log.info "dump in-memory config"
    supervisor_dump_config_handler
    nil
  }

  @rpc_server.mount_proc('/api/config.getDump') { |req, res|
    $log.debug "fluentd RPC got /api/config.dump request"
    $log.info "get dump in-memory config via HTTP"
    res.body = supervisor_get_dump_config_handler
    [nil, nil, res]
  } if @enable_get_dump

  @rpc_server.start
end
stop_counter_server() click to toggle source
# File lib/fluent/supervisor.rb, line 145
def stop_counter_server
  @counter.stop
end
stop_rpc_server() click to toggle source
# File lib/fluent/supervisor.rb, line 133
def stop_rpc_server
  @rpc_server.shutdown
end
supervisor_dump_config_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 212
def supervisor_dump_config_handler
  $log.info config[:fluentd_conf].to_s
end
supervisor_get_dump_config_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 216
def supervisor_get_dump_config_handler
  {conf: config[:fluentd_conf].to_s}
end
supervisor_sighup_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 177
def supervisor_sighup_handler
  kill_worker
end
supervisor_sigusr1_handler() click to toggle source
# File lib/fluent/supervisor.rb, line 181
def supervisor_sigusr1_handler
  if log = config[:logger_initializer]
    # Creating new thread due to mutex can't lock
    # in main thread during trap context
    Thread.new {
      log.reopen!
    }.run
  end

  if config[:worker_pid]
    config[:worker_pid].each_value do |pid|
      Process.kill(:USR1, pid)
      # don't rescue Errno::ESRCH here (invalid status)
    end
  end
end