class Fluent::MessagePackEventStream
Public Class Methods
new(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
click to toggle source
The cached_unpacker argument is no longer used; it is kept in the signature so that existing plugins which still pass it keep working.
# File lib/fluent/event.rb, line 203
#
# Wraps an already-packed payload, optionally together with a pre-unpacked
# view of the same events.
#
# data             - the packed payload, stored untouched in @data.
# cached_unpacker  - accepted but never read; it only remains in the
#                    signature so callers that still pass it do not break.
# size             - event count supplied by the caller (0 when omitted).
# unpacked_times   - optional pre-unpacked times, stored as given.
# unpacked_records - optional pre-unpacked records, stored as given.
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil)
  @data, @size = data, size
  @unpacked_times, @unpacked_records = unpacked_times, unpacked_records
end
Public Instance Methods
dup()
click to toggle source
# File lib/fluent/event.rb, line 214
#
# Returns a new stream of the same class built from a copy of the packed
# payload and the current @size.
#
# When an unpacked view is cached it is handed to the copy as well: every
# record is duplicated (one level, via #dup), while the times array is passed
# along as-is.
def dup
  packed = @data.dup

  # Use the same "both arrays present" test as ensure_unpacked!. Checking only
  # @unpacked_times would call #map on nil when the stream was built with
  # unpacked_times: but without unpacked_records:.
  if @unpacked_times && @unpacked_records
    self.class.new(packed, nil, @size,
                   unpacked_times: @unpacked_times,
                   unpacked_records: @unpacked_records.map(&:dup))
  else
    self.class.new(packed, nil, @size)
  end
end
each(&block)
click to toggle source
# File lib/fluent/event.rb, line 253
#
# Calls the block once per event with (time, record) and always returns nil.
#
# If a complete unpacked view is already cached, the events are replayed from
# it. Otherwise @data is fed through msgpack_unpacker, each event is handed to
# the block as it is decoded, and the decoded events are cached afterwards
# (which also refreshes @size).
def each(&block)
  # Same "both arrays present" test as ensure_unpacked!, so a stream that only
  # has one of the two arrays is unpacked again instead of indexing into nil.
  if @unpacked_times && @unpacked_records
    @unpacked_times.each_with_index do |time, i|
      block.call(time, @unpacked_records[i])
    end
  else
    # Collect into locals and publish only after the whole payload has been
    # consumed. If the block breaks or raises part-way through, no truncated
    # cache is left behind for later each/size/slice calls to mistake for the
    # full stream.
    times = []
    records = []
    msgpack_unpacker.feed_each(@data) do |time, record|
      times << time
      records << record
      block.call(time, record)
    end
    @unpacked_times = times
    @unpacked_records = records
    # The number of events actually unpacked wins over any size given earlier.
    @size = times.size
  end
  nil
end
empty?()
click to toggle source
# File lib/fluent/event.rb, line 210
#
# True when the packed payload itself is empty. Only @data is consulted;
# neither @size nor any cached unpacked view is looked at.
def empty?
  @data.empty?
end
ensure_unpacked!()
click to toggle source
# File lib/fluent/event.rb, line 233
#
# Makes sure the unpacked view (@unpacked_times / @unpacked_records) exists.
# Does nothing when both arrays are already present; otherwise feeds @data
# through msgpack_unpacker and caches every (time, record) pair.
def ensure_unpacked!
  return if @unpacked_times && @unpacked_records

  # Build the arrays locally and publish them only once feed_each has run to
  # completion. If unpacking raises part-way through, the instance variables
  # stay as they were, so the guard above cannot mistake a truncated result
  # for a finished one on the next call.
  times = []
  records = []
  msgpack_unpacker.feed_each(@data) do |time, record|
    times << time
    records << record
  end
  @unpacked_times = times
  @unpacked_records = records
  # @size is always refreshed right after unpacking: the number of events
  # actually unpacked is authoritative, not the size given at construction.
  @size = times.size
end
repeatable?()
click to toggle source
# File lib/fluent/event.rb, line 229
#
# Always answers true for this class.
def repeatable?
  true
end
size()
click to toggle source
# File lib/fluent/event.rb, line 222
#
# Returns the number of events in the stream.
def size
  # Any non-zero @size is taken at face value.
  return @size unless @size == 0

  # A @size of 0 is never trusted, so unpack to find the real count. If the
  # stream really holds no events, that unpacking takes very little time.
  ensure_unpacked!
  @size
end
slice(index, num)
click to toggle source
This method returns a MultiEventStream, because there is no reason to serve the sliced events as msgpack-serialized binary.
# File lib/fluent/event.rb, line 248
#
# Returns a MultiEventStream holding `num` events starting at `index`, taken
# from the unpacked view (which is built first if necessary).
def slice(index, num)
  ensure_unpacked!

  # NOTE(review): Array#slice returns nil when index is past the end, so both
  # values below can be nil — confirm MultiEventStream copes with that.
  times = @unpacked_times.slice(index, num)
  records = @unpacked_records.slice(index, num)
  MultiEventStream.new(times, records)
end
to_msgpack_stream(time_int: false)
click to toggle source
# File lib/fluent/event.rb, line 271
#
# Returns the packed payload (@data) itself, not a copy.
#
# time_int - accepted but has no effect here, because @data is always an
#            already-packed binary in this class.
def to_msgpack_stream(time_int: false)
  @data
end