class Fluent::Compat::BufferedOutput
Constants
- BUFFER_PARAMS
Private Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Plugin::Output.new
# File lib/fluent/compat/output.rb, line 384
# Runs the superclass initializer, then prepends
# Fluent::Compat::CallSuperMixin to the plugin's class unless that module is
# already among the class's ancestors.
def initialize
  super
  plugin_class = self.class
  return if plugin_class.ancestors.include?(Fluent::Compat::CallSuperMixin)

  plugin_class.prepend Fluent::Compat::CallSuperMixin
end
propagate_default_params()
click to toggle source
# File lib/fluent/compat/output.rb, line 248
# Class-level hook: returns the BUFFER_PARAMS constant as-is (no copy).
def self.propagate_default_params
  BUFFER_PARAMS
end
Private Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::Output#configure
# File lib/fluent/compat/output.rb, line 253 def configure(conf) bufconf = CompatOutputUtils.buffer_section(conf) config_style = (bufconf ? :v1 : :v0) if config_style == :v0 buf_params = { "flush_mode" => "interval", "retry_type" => "exponential_backoff", } BUFFER_PARAMS.each do |older, newer| next unless newer if conf.has_key?(older) if older == 'buffer_queue_full_action' && conf[older] == 'exception' buf_params[newer] = 'throw_exception' else buf_params[newer] = conf[older] end end end conf.elements << Fluent::Config::Element.new('buffer', '', buf_params, []) end @includes_record_filter = self.class.ancestors.include?(Fluent::Compat::RecordFilterMixin) methods_of_plugin = self.class.instance_methods(false) @overrides_emit = methods_of_plugin.include?(:emit) # RecordFilter mixin uses its own #format_stream method implementation @overrides_format_stream = methods_of_plugin.include?(:format_stream) || @includes_record_filter ParserUtils.convert_parser_conf(conf) FormatterUtils.convert_formatter_conf(conf) super if config_style == :v1 unless @buffer_config.chunk_keys.empty? raise Fluent::ConfigError, "this plugin '#{self.class}' cannot handle arguments for <buffer ...> section" end end self.extend BufferedChunkMixin if @overrides_emit self.singleton_class.module_eval do attr_accessor :last_emit_via_buffer end output_plugin = self m = Module.new do define_method(:emit) do |key, data, chain| # receivers of this method are buffer instances output_plugin.last_emit_via_buffer = [key, data] end end @buffer.extend m end end
detach_multi_process(&block)
click to toggle source
# File lib/fluent/compat/output.rb, line 409
# Compatibility shim: logs a warning that process detaching is unsupported
# and then runs the given block in the current process.
# Returns whatever the block returns.
def detach_multi_process(&block)
  log.warn("detach_process is not supported in this version. ignored.")
  block.call
end
detach_process(&block)
click to toggle source
# File lib/fluent/compat/output.rb, line 404
# Compatibility shim: logs a warning that detach_process is unsupported and
# then runs the given block in the current process.
# Returns whatever the block returns.
def detach_process(&block)
  log.warn("detach_process is not supported in this version. ignored.")
  block.call
end
emit(tag, es, chain, key="")
click to toggle source
original implementation of v0.12 BufferedOutput
# File lib/fluent/compat/output.rb, line 311
# v0.12-style #emit. Only reached when a plugin's own #emit calls super.
#
# Bumps @emit_count, formats the event stream with #format_stream and hands
# the result to @buffer.emit under +key+; #submit_flush is invoked when the
# buffer's #emit returns a truthy value.
def emit(tag, es, chain, key="")
  @emit_count += 1
  formatted = format_stream(tag, es)
  submit_flush if @buffer.emit(key, formatted, chain)
end
extract_placeholders(str, metadata)
click to toggle source
# File lib/fluent/compat/output.rb, line 380
# Always raises RuntimeError; both arguments are ignored.
def extract_placeholders(str, metadata)
  raise RuntimeError, "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end
format_stream(tag, es)
click to toggle source
# File lib/fluent/compat/output.rb, line 324
# Concatenates the result of #format for every event in +es+ into one string.
# Only reached when a plugin's own #format_stream calls super.
#
# tag - passed through to #format for each event
# es  - object whose #each yields time and record for each event
#
# Returns a new, unfrozen String (empty when +es+ yields nothing).
def format_stream(tag, es)
  # Start from a dup of the literal: appending to the literal itself raises
  # FrozenError when frozen string literals are enabled. Unlike String.new,
  # ''.dup keeps the literal's encoding.
  out = ''.dup
  es.each do |time, record|
    out << format(tag, time, record)
  end
  out
end
handle_stream_simple(tag, es, enqueue: false)
click to toggle source
This method overrides Fluent::Plugin::Output#handle_stream_simple because a v0.12 BufferedOutput plugin may override format_stream, and the original handle_stream_simple method does not take that into account.
# File lib/fluent/compat/output.rb, line 338 def handle_stream_simple(tag, es, enqueue: false) if @overrides_emit current_emit_count = @emit_count size = es.size key = data = nil begin emit(tag, es, NULL_OUTPUT_CHAIN) key, data = self.last_emit_via_buffer ensure @emit_count = current_emit_count self.last_emit_via_buffer = nil end # on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically meta = @buffer.metadata(variables: (key && !key.empty? ? {key: key} : nil)) write_guard do @buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue) end @counters_monitor.synchronize{ @emit_records += size } return [meta] end if @overrides_format_stream meta = metadata(nil, nil, nil) size = es.size bulk = format_stream(tag, es) write_guard do @buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue) end @counters_monitor.synchronize{ @emit_records += size } return [meta] end meta = metadata(nil, nil, nil) size = es.size data = es.map{|time,record| format(tag, time, record) } write_guard do @buffer.write({meta => data}, enqueue: enqueue) end @counters_monitor.synchronize{ @emit_records += size } [meta] end
start()
click to toggle source
Calls superclass method
Fluent::Plugin::Output#start
# File lib/fluent/compat/output.rb, line 391
# Starts the plugin via the superclass, then wires up the formatter if needed.
#
# When @formatter is defined and @inject_config is set, a formatter that is
# not a Fluent::Compat::HandleTagAndTimeMixin, responds to #owner and has no
# owner yet gets this plugin as its owner and has FormatterUtils::InjectMixin
# prepended to its singleton class.
def start
  super

  return unless instance_variable_defined?(:@formatter) && @inject_config
  return if @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
  return unless @formatter.respond_to?(:owner) && !@formatter.owner

  @formatter.owner = self
  @formatter.singleton_class.prepend FormatterUtils::InjectMixin
end
submit_flush()
click to toggle source
# File lib/fluent/compat/output.rb, line 320
# Deliberate no-op. It exists so that the #emit of 3rd party plugins can
# still call it. Returns nil.
def submit_flush
  nil
end
support_in_v12_style?(feature)
click to toggle source
# File lib/fluent/compat/output.rb, line 210
# Reports whether this compat class supports +feature+ in v0.12 style.
#
# Returns true for :buffered and :custom_format, false for :synchronous and
# :delayed_commit, and nil for any other value.
def support_in_v12_style?(feature)
  supported = {
    synchronous: false,
    buffered: true,
    delayed_commit: false,
    custom_format: true,
  }
  supported[feature]
end