class Fluent::Plugin::TailInput::PositionFile

Constants

UNWATCHED_POSITION

Public Class Methods

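compact(file)

Removes entries for unwatched files (and unparsable lines) from the position file, then rewrites the remaining entries in place using fixed-width hexadecimal fields.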

# File lib/fluent/plugin/in_tail.rb, line 917
# Rewrites +file+ in place so that it only contains live position entries.
#
# Each line is expected to look like "<path>\t<hex pos>\t<hex inode>".
# Lines that do not match are reported through $log.warn and dropped.
# Lines whose position equals UNWATCHED_POSITION are dropped silently.
# Every surviving entry is re-emitted with both numbers as zero-padded
# 16-digit hex, so shorter (e.g. 32bit) inode fields are widened here.
#
# @param file [#pos=, #each_line, #truncate, #write] the opened position file
# @return whatever +file.write+ returns for the rewritten content
def self.compact(file)
  file.pos = 0
  kept = []
  file.each_line do |raw|
    fields = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(raw)
    if fields.nil?
      $log.warn "Unparsable line in pos_file: #{raw}"
      next
    end

    offset = fields[2].to_i(16)
    next if offset == UNWATCHED_POSITION

    inode = fields[3].to_i(16)
    kept << format("%s\t%016x\t%016x\n", fields[1], offset, inode)
  end

  # Replace the old content wholesale: rewind, empty the file, write survivors.
  file.pos = 0
  file.truncate(0)
  file.write(kept.join)
end
new(file, file_mutex, map, last_pos)
# File lib/fluent/plugin/in_tail.rb, line 874
# Stores the state shared by every lookup on this position file.
#
# @param file       the opened position file that new entries are appended to
# @param file_mutex lock taken while appending to +file+
# @param map        table of already known entries, keyed by path
# @param last_pos   byte offset in +file+ at which the next entry is appended
def initialize(file, file_mutex, map, last_pos)
  # Lookup table and append offset.
  @map = map
  @last_pos = last_pos

  # Backing file and the lock guarding writes to it.
  @file = file
  @file_mutex = file_mutex
end
parse(file)
# File lib/fluent/plugin/in_tail.rb, line 895
# Builds a PositionFile from an opened position file.
#
# The file is compacted first, then read line by line. For every line of
# the form "<path>\t<hex pos>\t<hex inode>" a FilePositionEntry is created
# and registered under its path; other lines are reported via $log.warn
# and skipped.
#
# @param file the opened position file
# @return a new instance holding the file, a fresh Mutex, the path table
#   and the file offset reached after the last line was read
def self.parse(file)
  compact(file)

  entries = {}
  lock = Mutex.new
  file.pos = 0
  file.each_line do |raw|
    matched = /^([^\t]+)\t([0-9a-fA-F]+)\t([0-9a-fA-F]+)/.match(raw)
    if matched
      path, pos_hex, ino_hex = matched.captures
      # file.pos already points past this line, so step back to its start;
      # the entry's seek offset is the byte right after "<path>\t".
      line_start = file.pos - raw.bytesize
      seek = line_start + path.bytesize + 1
      entries[path] = FilePositionEntry.new(file, lock, seek, pos_hex.to_i(16), ino_hex.to_i(16))
    else
      $log.warn "Unparsable line in pos_file: #{raw}"
    end
  end
  new(file, lock, entries, file.pos)
end

Public Instance Methods

[](path)
# File lib/fluent/plugin/in_tail.rb, line 881
# Returns the entry registered for +path+, creating it on first access.
#
# A missing path is appended to the position file as a line with zeroed
# position and inode fields, and a matching FilePositionEntry is stored in
# the table. The append happens while holding @file_mutex.
#
# @param path [String] path to look up
# @return the existing or newly created entry
def [](path)
  known = @map[path]
  return known if known

  @file_mutex.synchronize do
    entry_start = @last_pos
    @file.pos = entry_start
    @file.write "#{path}\t0000000000000000\t0000000000000000\n"
    # The next append starts where this write ended.
    @last_pos = @file.pos
    # The entry's seek offset is the byte right after "<path>\t".
    @map[path] = FilePositionEntry.new(@file, @file_mutex, entry_start + path.bytesize + 1, 0, 0)
  end
end