class Fluent::Plugin::HttpInput::Handler

Attributes

content_type[R]

Public Class Methods

new(io, km, callback, body_size_limit, format_name, log, cors_allow_origins) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_http.rb, line 270
# Builds the per-connection handler state and registers the handler with +km+.
#
# io                 - connection socket; forwarded to the superclass.
# km                 - registry object; receives +add(self)+ here.
# callback           - callable invoked with (path_info, params) per request.
# body_size_limit    - maximum accepted request body size, in bytes.
# format_name        - parser format name; 'default' enables content-type
#                      based body handling.
# log                - logger used for warnings.
# cors_allow_origins - list of allowed origins, or nil when CORS is disabled.
def initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins)
  super(io)

  # Collaborators and configuration.
  @km = km
  @callback = callback
  @log = log
  @body_size_limit = body_size_limit
  @format_name = format_name
  @cors_allow_origins = cors_allow_origins

  # Connection state.
  @next_close = false
  @idle = 0

  @km.add(self)

  # Resolving the peer address is best effort: on any failure both values
  # are simply left as nil.
  begin
    @remote_port, @remote_addr = *Socket.unpack_sockaddr_in(io.getpeername)
  rescue
    @remote_port = @remote_addr = nil
  end
end

Public Instance Methods

closing?() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 488
# Reports whether the connection has been flagged to close; the flag is set
# by send_response_and_close and acted on in on_write_complete.
def closing?
  @next_close
end
handle_options_request() click to toggle source

Web browsers can send an OPTIONS request before performing POST to check if cross-origin requests are supported.

# File lib/fluent/plugin/in_http.rb, line 376
# Answers a CORS preflight (OPTIONS) request and flags the connection to close.
#
# Responds "403 Forbidden" when CORS is disabled, when the requested method
# is anything other than POST, or when the origin is not allowed. Otherwise
# responds "200 OK" with the Access-Control-* headers.
#
# Returns whatever send_response_and_close returns.
def handle_options_request
  # CORS must be enabled, and in_http only supports POST.
  if @cors_allow_origins.nil? || @access_control_request_method != 'POST'
    return send_response_and_close("403 Forbidden", {}, "")
  end

  # A wildcard entry allows every origin; otherwise the origin must be listed.
  wildcard = @cors_allow_origins.include?('*')
  unless wildcard || @cors_allow_origins.include?(@origin)
    return send_response_and_close("403 Forbidden", {}, "")
  end

  preflight_headers = {
    "Access-Control-Allow-Methods" => "POST",
    "Access-Control-Allow-Headers" => @access_control_request_headers || "",
    "Access-Control-Allow-Origin" => wildcard ? "*" : @origin,
  }
  send_response_and_close("200 OK", preflight_headers, "")
end
on_body(chunk) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 364
# Parser callback: appends a body chunk to the buffered request body.
#
# When the chunk would push the body past @body_size_limit, the chunk is
# dropped and a "413 Request Entity Too Large" response is sent (once; no
# further response is sent if the connection is already closing).
#
# Returns the body buffer when the chunk was appended, nil otherwise.
def on_body(chunk)
  projected_size = @body.bytesize + chunk.bytesize

  if projected_size <= @body_size_limit
    @body << chunk
  else
    send_response_and_close("413 Request Entity Too Large", {}, "Too large") unless closing?
    nil
  end
end
on_close() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 289
# Connection-closed callback: removes this handler from @km, the registry it
# was added to in initialize.
def on_close
  @km.delete(self)
end
on_connect() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 293
# Connection-established callback: creates the HTTP parser for this
# connection, passing this handler to it (presumably so the parser invokes
# the on_* callbacks defined here -- confirm against Http::Parser).
def on_connect
  @parser = Http::Parser.new(self)
end
on_headers_complete(headers) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 310
# Parser callback: records the request headers and answers "Expect" requests.
#
# Every header is copied into @env under a CGI-style key (HTTP_ prefix,
# dashes turned into underscores, upper-cased). Headers the handler cares
# about are additionally stored in dedicated instance variables.
#
# Header names are matched case-insensitively but as WHOLE names, so that
# e.g. "X-Content-Type-Options" is not mistaken for "Content-Type" and
# "Expect-CT" is not mistaken for "Expect".
#
# headers - Hash of header name => value (a value may be an Array when a
#           header is repeated; handled explicitly for X-Forwarded-For).
#
# Returns the result of the response helper used for an "Expect" header,
# or nil when the request carries no "Expect" header.
def on_headers_complete(headers)
  expect = nil
  size = nil

  # HTTP/1.1 defaults to persistent connections; a "Connection" header
  # below may override this.
  @keep_alive = (@parser.http_version == [1, 1])

  # Reset per-request state so that values from an earlier request on the
  # same keep-alive connection cannot leak into this one (notably the CORS
  # origin check).
  @env = {}
  @content_type = ""
  @content_encoding = ""
  @origin = nil
  @access_control_request_method = nil
  @access_control_request_headers = nil

  headers.each_pair {|k,v|
    @env["HTTP_#{k.tr('-', '_').upcase}"] = v
    case k
    when /\AExpect\z/i
      expect = v
    when /\AContent-Length\z/i
      size = v.to_i
    when /\AContent-Type\z/i
      @content_type = v
    when /\AContent-Encoding\z/i
      @content_encoding = v
    when /\AConnection\z/i
      if v =~ /close/i
        @keep_alive = false
      elsif v =~ /Keep-alive/i
        @keep_alive = true
      end
    when /\AOrigin\z/i
      @origin = v
    when /\AX-Forwarded-For\z/i
      # For multiple X-Forwarded-For headers. Use first header value.
      v = v.first if v.is_a?(Array)
      @remote_addr = v.split(",").first
    when /\AAccess-Control-Request-Method\z/i
      @access_control_request_method = v
    when /\AAccess-Control-Request-Headers\z/i
      @access_control_request_headers = v
    end
  }

  if expect
    if expect == '100-continue'
      # Only invite the client to send the body when the announced size
      # (if any) is below the limit.
      if !size || size < @body_size_limit
        send_response_nobody("100 Continue", {})
      else
        send_response_and_close("413 Request Entity Too Large", {}, "Too large")
      end
    else
      send_response_and_close("417 Expectation Failed", {}, "")
    end
  end
end
on_message_begin() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 306
# Parser callback at the start of a request: resets the body buffer that
# on_body appends to.
def on_message_begin
  @body = ''
end
on_message_complete() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 404
# Parser callback once a full request has been received.
#
# Steps, in order:
#   1. Do nothing if the connection is already flagged to close.
#   2. Delegate OPTIONS requests to handle_options_request.
#   3. Reject the request with 403 when CORS is enabled and the origin is
#      not allowed.
#   4. Decode the body according to Content-Encoding (gzip / deflate).
#   5. Build the parameter hash from the query string, the body and the
#      request headers, and hand it to @callback with the request path.
#   6. Send the callback's response, keeping the connection open only when
#      keep-alive applies.
def on_message_complete
  return if closing?
  return handle_options_request() if @parser.http_method == 'OPTIONS'

  # CORS check
  # ==========
  # With CORS enabled, every request must come from a white listed origin
  # (or the list must contain the '*' wildcard).
  cors_enabled = !@cors_allow_origins.nil?
  if cors_enabled && !(@cors_allow_origins.include?('*') || @cors_allow_origins.include?(@origin))
    send_response_and_close("403 Forbidden", {'Connection' => 'close'}, "")
    return
  end

  # Content Encoding
  # =================
  # Only 'gzip' and 'deflate' are decoded; any other value leaves the
  # body untouched.
  begin
    case @content_encoding
    when 'gzip'
      @body = Zlib::GzipReader.new(StringIO.new(@body)).read
    when 'deflate'
      @body = Zlib::Inflate.inflate(@body)
    end
  rescue => e
    @log.warn 'fails to decode payload', error: e.to_s
    send_response_and_close("400 Bad Request", {}, "")
    return
  end

  @env['REMOTE_ADDR'] = @remote_addr if @remote_addr

  request_uri = URI.parse(@parser.request_url)
  path_info = request_uri.path
  params = WEBrick::HTTPUtils.parse_query(request_uri.query)

  # A non-default format receives the raw body; otherwise the content type
  # decides how the body is folded into the parameters.
  if @format_name != 'default'
    params[EVENT_RECORD_PARAMETER] = @body
  elsif @content_type =~ /^application\/x-www-form-urlencoded/
    params.update WEBrick::HTTPUtils.parse_query(@body)
  elsif @content_type =~ /^multipart\/form-data; boundary=(.+)/
    boundary = WEBrick::HTTPUtils.dequote($1)
    params.update WEBrick::HTTPUtils.parse_form_data(@body, boundary)
  elsif @content_type =~ /^application\/json/
    params['json'] = @body
  elsif @content_type =~ /^application\/msgpack/
    params['msgpack'] = @body
  end

  # Request headers are passed along with the parameters.
  params.merge!(@env)
  @env.clear

  status, response_header, payload = *@callback.call(path_info, params)
  payload = payload.to_s

  unless @cors_allow_origins.nil?
    if @cors_allow_origins.include?('*')
      response_header['Access-Control-Allow-Origin'] = '*'
    elsif @cors_allow_origins.include?(@origin)
      response_header['Access-Control-Allow-Origin'] = @origin
    end
  end

  return send_response_and_close(status, response_header, payload) unless @keep_alive

  response_header['Connection'] = 'Keep-Alive'
  send_response(status, response_header, payload)
end
on_read(data) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 297
# Data-received callback: resets the idle counter and feeds the bytes to the
# HTTP parser. Any error raised while doing so is logged as a warning (with
# backtrace) and the connection is closed.
def on_read(data)
  begin
    @idle = 0
    @parser << data
  rescue => e
    @log.warn "unexpected error", error: e.to_s
    @log.warn_backtrace
    close
  end
end
on_write_complete() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 479
# Write-finished callback: closes the connection if it was flagged to close
# after the response; otherwise does nothing.
def on_write_complete
  return unless @next_close

  close
end
send_response(code, header, body) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 492
# Writes an HTTP/1.1 response (status line, headers, blank line, body).
#
# "Content-Length" (the body's byte size) and "Content-Type" ("text/plain")
# are filled in when the caller did not supply them. The defaults are
# applied to a copy, so the caller's +header+ hash is never modified; this
# keeps shared hashes clean and allows frozen hashes to be passed.
#
# code   - status line text, e.g. "200 OK".
# header - Hash of response header name => value.
# body   - response body String.
#
# Returns the result of writing the body.
def send_response(code, header, body)
  fields = header.dup
  fields['Content-Length'] ||= body.bytesize
  fields['Content-Type'] ||= 'text/plain'

  data = %[HTTP/1.1 #{code}\r\n]
  fields.each_pair {|k,v|
    data << "#{k}: #{v}\r\n"
  }
  data << "\r\n"
  write data

  write body
end
send_response_and_close(code, header, body) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 483
# Sends a response via send_response, then flags the connection to close.
# The close itself happens in on_write_complete once the flag is set.
def send_response_and_close(code, header, body)
  send_response(code, header, body)
  @next_close = true
end
send_response_nobody(code, header) click to toggle source
# File lib/fluent/plugin/in_http.rb, line 506
# Writes a body-less HTTP/1.1 response: the status line, the given headers
# exactly as passed (no defaults are added), and the terminating blank line.
#
# code   - status line text, e.g. "100 Continue".
# header - Hash of response header name => value.
#
# Returns the result of the write.
def send_response_nobody(code, header)
  lines = ["HTTP/1.1 #{code}\r\n"]
  header.each_pair { |name, value| lines << "#{name}: #{value}\r\n" }
  lines << "\r\n"
  write lines.join
end
step_idle() click to toggle source
# File lib/fluent/plugin/in_http.rb, line 285
# Increments the idle counter by one and returns the new value. The counter
# is reset to 0 in on_read whenever data arrives.
def step_idle
  @idle += 1
end