class Fluent::Plugin::ElasticsearchInput
Constants
- DEFAULT_RELOAD_AFTER
- DEFAULT_STORAGE_TYPE
- METADATA
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elasticsearch.rb, line 58
# Constructs the plugin instance. All work is delegated to the superclass
# constructor; the bare `super` forwards the (empty) argument list unchanged.
def initialize
  super
end
Public Instance Methods
backend_options()
click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 100
# Builds the client-certificate options hash in the key naming expected by the
# configured HTTP backend (@http_backend).
#
# Returns a Hash for :excon and :typhoeus, and nil for any other backend.
# Raises Fluent::ConfigError when the typhoeus gem cannot be loaded.
def backend_options
  if @http_backend == :excon
    {
      client_key: @client_key,
      client_cert: @client_cert,
      client_key_pass: @client_key_pass
    }
  elsif @http_backend == :typhoeus
    # Loaded lazily so the gem is only required when this backend is selected.
    require 'typhoeus'
    {
      sslkey: @client_key,
      sslcert: @client_cert,
      keypasswd: @client_key_pass
    }
  end
rescue LoadError => load_error
  log.error_backtrace(load_error.backtrace)
  raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{load_error}"
end
client(host = nil)
click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 205
# Returns the Elasticsearch client for +host+ (or for the configured hosts
# when +host+ is nil). The client is memoized in @_es and rebuilt only when
# is_existing_connection reports that the requested hosts differ from the ones
# the cached client was created for.
def client(host = nil)
  connection_options = get_connection_options(host)

  # Discard the cached client if it was built for a different set of hosts.
  @_es = nil unless is_existing_connection(connection_options[:hosts])
  return @_es if @_es

  # Remember which hosts this client is built for so later calls can compare.
  @current_config = connection_options[:hosts].clone
  configure_adapter = ->(faraday) { faraday.adapter @http_backend, @backend_options }

  # When reloading is enabled and @reload_after exceeds the default, the
  # reload setting carries @reload_after instead of the plain flag.
  reload_setting =
    if @reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
      @reload_after
    else
      @reload_connections
    end

  request_headers = { 'Content-Type' => "application/json" }.merge(@custom_headers)

  transport_settings = {
    reload_connections: reload_setting,
    reload_on_failure: @reload_on_failure,
    resurrect_after: @resurrect_after,
    logger: @transport_logger,
    transport_options: {
      headers: request_headers,
      request: { timeout: @request_timeout },
      ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
    },
    http: {
      user: @user,
      password: @password
    },
    sniffer_class: @sniffer_class,
  }

  transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(
    connection_options.merge(options: transport_settings),
    &configure_adapter
  )

  @_es = Elasticsearch::Client.new transport: transport
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elasticsearch.rb, line 62
# Validates the plugin configuration and derives the state used by the other
# methods: the timestamp parser, backend options, escaped credentials, the
# optional transport logger, the sniffer class, and the base search options.
#
# conf - the configuration object passed on to the superclass; also read for
#        '@log_level' / 'log_level'.
#
# Raises Fluent::ConfigError when `user` is set without `password`, or when
# the configured sniffer class cannot be resolved.
def configure(conf)
  super

  @timestamp_parser = create_time_parser
  @backend_options = backend_options

  raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

  # Credentials written as %{...} are URL-encoded so they can carry reserved characters.
  if @user && m = @user.match(/%{(?<user>.*)}/)
    @user = URI.encode_www_form_component(m["user"])
  end
  if @password && m = @password.match(/%{(?<password>.*)}/)
    @password = URI.encode_www_form_component(m["password"])
  end

  @transport_logger = nil
  if @with_transporter_log
    @transport_logger = log
    log_level = conf['@log_level'] || conf['log_level']
    log.warn "Consider to specify log_level with @log_level." unless log_level
  end
  @current_config = nil

  # Specify @sniffer_class before calling #client.
  @sniffer_class = nil
  begin
    @sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
  rescue StandardError, ScriptError => ex
    # StandardError covers NameError for an unknown constant; ScriptError covers
    # LoadError/SyntaxError from an autoload. Signals and SystemExit are no
    # longer swallowed, unlike a blanket `rescue Exception`.
    raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
  end

  @options = {
    :index => @index_name,
    :scroll => @scroll,
    :size => @size
  }
  @base_query = @query
end
convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z")
click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 193
# Renders a numeric timestamp as a formatted time string.
#
# numeric_time         - the numeric value, parsed with a :float
#                        Fluent::NumericTimeParser.
# timestamp_key_format - strftime pattern used for the output.
#
# Returns the formatted String.
def convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z")
  float_parser = Fluent::NumericTimeParser.new(:float)
  parsed_time = float_parser.parse(numeric_time)
  # Convert through a Rational before handing the value to Time.at.
  Time.at(parsed_time.to_r).strftime(timestamp_key_format)
end
create_time_parser()
click to toggle source
Once fluent v0.14 is released we might be able to use Fluent::Parser::TimeParser, but it doesn't quite do what we want: it gives `[sec, nsec]`, whereas we want something we can call `strftime` on…
# File lib/fluent/plugin/in_elasticsearch.rb, line 166
# Builds the Proc used to turn a raw timestamp value into a Time.
#
# Every returned Proc first converts Numeric input to a String via
# convert_numeric_time_into_string, then parses it:
# * no @timestamp_key_format -> DateTime.parse
# * format set, Strptime usable -> Strptime#exec
# * format set, Strptime setup fails -> DateTime.strptime
def create_time_parser
  unless @timestamp_key_format
    return proc { |raw|
      raw = convert_numeric_time_into_string(raw) if raw.is_a?(Numeric)
      DateTime.parse(raw).to_time
    }
  end

  begin
    # Strptime doesn't support all formats, but for those it does it's
    # blazingly fast.
    fast_parser = Strptime.new(@timestamp_key_format)
    proc { |raw|
      raw = convert_numeric_time_into_string(raw, @timestamp_key_format) if raw.is_a?(Numeric)
      fast_parser.exec(raw).to_time
    }
  rescue
    # Can happen if Strptime doesn't recognize the format; or
    # if strptime couldn't be required (because it's not installed -- it's
    # ruby 2 only)
    proc { |raw|
      raw = convert_numeric_time_into_string(raw, @timestamp_key_format) if raw.is_a?(Numeric)
      DateTime.strptime(raw, @timestamp_key_format).to_time
    }
  end
end
get_connection_options(con_host=nil)
click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 125
# Builds the connection options for the transport.
#
# con_host - optional comma-separated host list; falls back to @hosts, and
#            then to the single @host/@port/@scheme triple.
#
# Returns { hosts: [...] } where each entry is a Hash with :host, :port and
# :scheme, plus :user/:password/:path when given in the URL or configured on
# the plugin.
def get_connection_options(con_host=nil)
  host_list = con_host || @hosts

  hosts =
    if host_list
      parsed_hosts = host_list.split(',').map do |entry|
        # Support legacy hosts format host:port,host:port,host:port...
        if entry.match(%r{^[^:]+(\:\d+)?$})
          name, explicit_port = entry.split(':')
          {
            host: name,
            port: (explicit_port || @port).to_i,
            scheme: @scheme.to_s
          }
        else
          # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
          uri = URI(get_escaped_userinfo(entry))
          parsed = { host: uri.host, port: uri.port, scheme: uri.scheme }
          # Copy the optional URL parts only when they are present and non-empty.
          %w(user password path).each do |part|
            value = uri.public_send(part)
            parsed[part.to_sym] = value unless value.nil? || value == ''
          end
          parsed
        end
      end
      parsed_hosts.compact
    else
      [{host: @host, port: @port, scheme: @scheme.to_s}]
    end

  # Fill in plugin-level credentials and path where a host did not supply its own.
  hosts.each do |host|
    host.merge!(user: @user, password: @password) if @user && !host[:user]
    host.merge!(path: @path) if @path && !host[:path]
  end

  { hosts: hosts }
end
get_escaped_userinfo(host_str)
click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 113
# URL-encodes credentials written as %{user}:%{password} inside a host URL.
#
# host_str - a host URL string.
#
# Returns the URL with both credentials passed through
# URI.encode_www_form_component, or host_str itself when the placeholder
# pattern is not present.
def get_escaped_userinfo(host_str)
  m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
  return host_str unless m

  escaped_user = URI.encode_www_form_component(m["user"])
  escaped_password = URI.encode_www_form_component(m["password"])
  "#{m["scheme"]}#{escaped_user}:#{escaped_password}#{m["path"]}"
end
is_existing_connection(host)
click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 243
# Checks whether the cached client (@_es) was built for exactly the given hosts.
#
# host - Array of host Hashes (each with :host and :port), compared
#        position by position against @current_config.
#
# Returns true only when a client exists, the lists have the same length, and
# every entry matches on both :host and :port; false otherwise.
def is_existing_connection(host)
  return false if @_es.nil?
  return false if @current_config.nil?
  return false if host.length != @current_config.length

  # The explicit parentheses matter: `a.eql? b || c` parses as `a.eql?(b || c)`,
  # which would skip the port comparison whenever the host is truthy.
  host.each_with_index.all? do |candidate, index|
    current = @current_config[index]
    candidate[:host].eql?(current[:host]) && candidate[:port] == current[:port]
  end
end
parse_time(value, event_time, tag)
click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 198
# Parses +value+ with the configured timestamp parser.
#
# value      - raw timestamp taken from the document.
# event_time - fallback time used when parsing fails.
# tag        - tag reported in the error record.
#
# Returns the parser's result. On any StandardError an error event is emitted
# through the router and Time.at(event_time) is returned instead.
def parse_time(value, event_time, tag)
  begin
    @timestamp_parser.call(value)
  rescue => parse_error
    error_record = {
      'tag' => tag,
      'time' => event_time,
      'format' => @timestamp_key_format,
      'value' => value
    }
    router.emit_error_event(@timestamp_parse_error_tag, Fluent::Engine.now, error_record, parse_error)
    Time.at(event_time).to_time
  end
end
process_events(hit, es)
click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 300
# Converts one search hit into an event and appends it to the event stream.
#
# hit - Hash for a single hit; its "_source" is used as the event record and
#       the fields named in @docinfo_fields are read from the hit itself.
# es  - event stream receiving the record via #add(time, record).
#
# The event time is Fluent::Engine.now unless @parse_timestamp is set and the
# record has TIMESTAMP_FIELD, in which case parse_time supplies it.
# When @docinfo is set, the hit's @docinfo_fields are copied into the record
# under @docinfo_target.
#
# Raises UnrecoverableError when the record already holds a non-Hash value
# under @docinfo_target.
def process_events(hit, es)
  event = hit["_source"]
  time = Fluent::Engine.now

  if @parse_timestamp && event.key?(TIMESTAMP_FIELD)
    time = parse_time(event[TIMESTAMP_FIELD], time, @tag)
  end

  if @docinfo
    docinfo_target = event[@docinfo_target] || {}

    unless docinfo_target.is_a?(Hash)
      # The details go into the message: extra positional arguments to `raise`
      # are interpreted as a backtrace and would trigger a TypeError instead.
      raise UnrecoverableError,
            "incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, " \
            "expected a hash got: #{docinfo_target.class} (event: #{event})"
    end

    @docinfo_fields.each do |field|
      docinfo_target[field] = hit[field]
    end

    event[@docinfo_target] = docinfo_target
  end

  es.add(time, event)
end
process_next_scroll_request(es, scroll_id)
click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 294
# Fetches the next scroll page and feeds each hit into the event stream.
#
# es        - event stream passed through to process_events.
# scroll_id - scroll identifier for the page to fetch.
#
# Returns a Hash with 'has_hits' (whether the page contained any hits) and
# '_scroll_id' (the scroll id reported by the response).
def process_next_scroll_request(es, scroll_id)
  response = process_scroll_request(scroll_id)
  page_hits = response['hits']['hits']
  page_hits.each do |hit|
    process_events(hit, es)
  end

  {
    'has_hits' => page_hits.any?,
    '_scroll_id' => response['_scroll_id']
  }
end
process_scroll_request(scroll_id)
click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 290
# Issues a scroll request for +scroll_id+ through the client, passing the
# configured @scroll value, and returns the client's response.
def process_scroll_request(scroll_id)
  request_body = { :scroll_id => scroll_id }
  client.scroll(:body => request_body, :scroll => @scroll)
end
run()
click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 258
# Runs one fetch cycle.
#
# With @num_slices <= 1 the query runs inline via run_slice. Otherwise one
# thread is created per slice id (0...@num_slices), each running its slice;
# a warning is logged first when more than 8 slices are requested.
def run
  return run_slice if @num_slices <= 1

  if @num_slices > 8
    log.warn("Large slice number is specified:(#{@num_slices}). Consider reducing num_slices")
  end

  (0...@num_slices).map do |slice_id|
    thread_create(:"in_elasticsearch_thread_#{slice_id}") { run_slice(slice_id) }
  end
end
run_slice(slice_id=nil)
click to toggle source
# File lib/fluent/plugin/in_elasticsearch.rb, line 270
# Executes the search (optionally restricted to one slice), follows the scroll
# until a page has no hits or no scroll id is returned, emits all collected
# events under @tag, and finally clears the scroll if an id is still held.
#
# slice_id - slice number to add to the query as 'slice' => { 'id', 'max' },
#            or nil to run the base query unsliced.
def run_slice(slice_id=nil)
  query = @base_query
  unless slice_id.nil?
    query = query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices})
  end

  response = client.search(@options.merge(:body => Yajl.dump(query) ))
  stream = Fluent::MultiEventStream.new

  first_page = response["hits"]["hits"]
  first_page.each { |hit| process_events(hit, stream) }
  more_hits = first_page.any?
  scroll_id = response['_scroll_id']

  while more_hits && scroll_id
    page = process_next_scroll_request(stream, scroll_id)
    more_hits = page['has_hits']
    scroll_id = page['_scroll_id']
  end

  router.emit_stream(@tag, stream)
  client.clear_scroll(scroll_id: scroll_id) if scroll_id
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_elasticsearch.rb, line 157 def start super timer_execute(:in_elasticsearch_timer, @interval, repeat: @repeat, &method(:run)) end