class Fluent::Plugin::TailInput::TailWatcher::IOHandler

Public Class Methods

new(watcher, &receive_lines) click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 703
# Builds the handler state for one watched file.
#
# @param watcher [Object] owner of this handler; it is asked for
#   from_encoding, encoding, log and path here
# @param receive_lines [Proc] block stored for later; handle_notify calls it
#   with the batch of buffered lines
def initialize(watcher, &receive_lines)
  @watcher = watcher
  @receive_lines = receive_lines

  # Either encoding falls back to binary when the watcher does not set it.
  binary = Encoding::ASCII_8BIT
  from_encoding = @watcher.from_encoding || binary
  encoding = @watcher.encoding || binary
  @fifo = FIFO.new(from_encoding, encoding)

  # Reusable binary scratch buffer handed to readpartial on every read.
  @iobuf = ''.force_encoding('ASCII-8BIT')
  @lines = []
  @io = nil
  @notify_mutex = Mutex.new

  @watcher.log.info "following tail of #{@watcher.path}"
end

Public Instance Methods

close() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 752
# Closes the held file handle, if any, and always forgets it.
#
# The reference is dropped even when the handle has already been closed
# elsewhere; otherwise a dead handle would stay in @io, opened? would keep
# reporting true, and with_io's `@io ||= open` would never reopen the file.
#
# @return [nil]
def close
  io = @io
  @io = nil
  io.close if io && !io.closed?
  nil
end
handle_notify() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 718
# Reads newly appended data, hands complete lines to the receiver block and
# records the new read position once the receiver accepts them.
#
# Lines left over from a previous rejected delivery are retried before any
# new data is read.
def handle_notify
  with_io do |io|
    keep_reading = true

    while keep_reading
      keep_reading = false

      if @lines.empty? && !io.nil?
        begin
          # Pull 2048-byte chunks until EOF or until the batch is big enough.
          until keep_reading
            @fifo << io.readpartial(2048, @iobuf)
            while (line = @fifo.next_line)
              @lines << line
            end
            # not to use too much memory in case the file is very large
            keep_reading = @lines.size >= @watcher.read_lines_limit
          end
        rescue EOFError
          # Nothing more to read for now; deliver whatever was collected.
        end
      end

      next if @lines.empty?

      if @receive_lines.call(@lines)
        # Bytes still sitting in @fifo (an incomplete line) are not consumed yet.
        # NOTE(review): io is dereferenced here without the nil check used
        # above; a nil io with pending lines would raise NoMethodError.
        @watcher.pe.update_pos(io.pos - @fifo.bytesize)
        @lines.clear
      else
        # Receiver refused the batch: keep the lines and stop for this round.
        keep_reading = false
      end
    end
  end
end
on_notify() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 714
# Entry point for change notifications; runs handle_notify while holding
# @notify_mutex so only one notification is processed at a time.
def on_notify
  @notify_mutex.synchronize do
    handle_notify
  end
end
open() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 763
# Opens the watched file and seeks to the resume offset.
#
# The offset is @watcher.pe.read_pos plus @fifo.bytesize.
#
# @return [Object, nil] the opened handle, or nil when the file does not exist
# @raise [WatcherSetupError] when the seek raises RangeError
def open
  io = Fluent::FileWrapper.open(@watcher.path)
  io.seek(@watcher.pe.read_pos + @fifo.bytesize)
  io
rescue RangeError
  io.close if io
  raise WatcherSetupError, "seek error with #{@watcher.path}: file position = #{@watcher.pe.read_pos.to_s(16)}, reading bytesize = #{@fifo.bytesize.to_s(16)}"
rescue Errno::ENOENT
  nil
rescue StandardError
  # Any other failure is propagated unchanged, but the handle opened above
  # must not be leaked on the way out.
  io.close if io
  raise
end
opened?() click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 759
# Tells whether this handler currently holds a file handle in @io.
#
# @return [Boolean] true when @io is set to a truthy value
def opened?
  @io ? true : false
end
with_io() { |io| ... } click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 774
# Yields a file handle (possibly nil) to the given block.
#
# When @watcher.open_on_every_update is set, a fresh handle is opened for
# this call and closed again afterwards. Otherwise one handle is opened
# lazily, kept in @io and reused.
#
# WatcherSetupError is re-raised after calling close; any other
# StandardError is logged through @watcher.log and then close is called.
def with_io
  if @watcher.open_on_every_update
    handle = open
    begin
      yield handle
    ensure
      # open may have returned nil, in which case there is nothing to close.
      handle.close unless handle.nil?
    end
  else
    @io ||= open
    yield @io
  end
rescue WatcherSetupError => setup_error
  close
  raise setup_error
rescue => error
  @watcher.log.error error.to_s
  @watcher.log.error_backtrace
  close
end