class Fluent::Plugin::TailInput::TailWatcher::IOHandler
Public Class Methods
new(watcher, &receive_lines)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 703 def initialize(watcher, &receive_lines) @watcher = watcher @receive_lines = receive_lines @fifo = FIFO.new(@watcher.from_encoding || Encoding::ASCII_8BIT, @watcher.encoding || Encoding::ASCII_8BIT) @iobuf = ''.force_encoding('ASCII-8BIT') @lines = [] @io = nil @notify_mutex = Mutex.new @watcher.log.info "following tail of #{@watcher.path}" end
Public Instance Methods
close()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 752 def close if @io && !@io.closed? @io.close @io = nil end end
handle_notify()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 718 def handle_notify with_io do |io| begin read_more = false if !io.nil? && @lines.empty? begin while true @fifo << io.readpartial(2048, @iobuf) while (line = @fifo.next_line) @lines << line end if @lines.size >= @watcher.read_lines_limit # not to use too much memory in case the file is very large read_more = true break end end rescue EOFError end end unless @lines.empty? if @receive_lines.call(@lines) @watcher.pe.update_pos(io.pos - @fifo.bytesize) @lines.clear else read_more = false end end end while read_more end end
on_notify()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 714 def on_notify @notify_mutex.synchronize { handle_notify } end
open()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 763 def open io = Fluent::FileWrapper.open(@watcher.path) io.seek(@watcher.pe.read_pos + @fifo.bytesize) io rescue RangeError io.close if io raise WatcherSetupError, "seek error with #{@watcher.path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}" rescue Errno::ENOENT nil end
opened?()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 759 def opened? !!@io end
with_io() { |io| ... }
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 774 def with_io begin if @watcher.open_on_every_update io = open begin yield io ensure io.close unless io.nil? end else @io ||= open yield @io end rescue WatcherSetupError => e close raise e rescue @watcher.log.error $!.to_s @watcher.log.error_backtrace close end end