class Fluent::Plugin::SecondaryFileOutput

Constants

DIR_PERMISSION
FILE_PERMISSION
PLACEHOLDER_REGEX

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method Fluent::Compat::Output#configure
# File lib/fluent/plugin/out_secondary_file.rb, line 38
# Validates the plugin configuration and derives the values used at write time.
#
# After delegating to the superclass, this checks that the plugin is used as a
# secondary output, that +@basename+ has no directory separator, that the
# joined path passes validate_compatible_with_primary_buffer!, and that
# Fluent::FileUtil.writable_p? accepts the target path. It then stores the
# filename suffix for the chosen compression and the directory/file
# permissions (system configuration first, class constants as fallback).
#
# Raises Fluent::ConfigError when any of the checks fails.
def configure(conf)
  super

  raise Fluent::ConfigError, "This plugin can only be used in the <secondary> section" unless @as_secondary
  raise Fluent::ConfigError, "basename should not include `/`" if @basename.include?("/")

  @path_without_suffix = File.join(@directory, @basename)
  validate_compatible_with_primary_buffer!(@path_without_suffix)

  # Any other @compress value yields nil, exactly like a case without else.
  @suffix = { text: "", gzip: ".gz" }[@compress]

  unless Fluent::FileUtil.writable_p?(@path_without_suffix)
    raise Fluent::ConfigError, "out_secondary_file: `#{@directory}` should be writable"
  end

  @dir_perm = system_config.dir_permission || DIR_PERMISSION
  @file_perm = system_config.file_permission || FILE_PERMISSION
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 68
# Always answers true.
#
# TODO: add hack to synchronize for multi workers
def multi_workers_ready?
  true
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 73
# Appends the contents of +chunk+ to a file and returns the path written to.
#
# The destination is built by passing +@path_without_suffix+ through
# extract_placeholders and then generate_path; missing parent directories are
# created with +@dir_perm+. The file is opened in binary append mode with
# +@file_perm+ and exclusively locked (flock) while the chunk is written.
#
# chunk:: object responding to +write_to(io)+
#
# Returns the path of the file as a String. When +@compress+ is neither
# +:text+ nor +:gzip+ nothing is written, but the path is still returned.
def write(chunk)
  path_without_suffix = extract_placeholders(@path_without_suffix, chunk)
  path = generate_path(path_without_suffix)
  FileUtils.mkdir_p File.dirname(path), mode: @dir_perm

  case @compress
  when :text
    File.open(path, "ab", @file_perm) {|f|
      f.flock(File::LOCK_EX)
      chunk.write_to(f)
    }
  when :gzip
    File.open(path, "ab", @file_perm) {|f|
      f.flock(File::LOCK_EX)
      # The block form closes the gzip writer even when write_to raises, so
      # the stream is always finished and the writer is never left open.
      Zlib::GzipWriter.wrap(f) {|gz| chunk.write_to(gz) }
    }
  end

  path
end

Private Instance Methods

generate_path(path_without_suffix) click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 120
# Builds the output file path for +path_without_suffix+.
#
# In append mode the path is just the prefix plus +@suffix+. Otherwise a
# numeric component is inserted before the suffix, counting up from 0, and the
# first candidate that does not exist on disk is returned.
def generate_path(path_without_suffix)
  return "#{path_without_suffix}#{@suffix}" if @append

  index = 0
  index += 1 while File.exist?("#{path_without_suffix}.#{index}#{@suffix}")
  "#{path_without_suffix}.#{index}#{@suffix}"
end
has_time_format?(str) click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 116
# Tells whether +str+ changes when it is run through Time#strftime, i.e.
# whether it contains anything strftime expands.
def has_time_format?(str)
  expanded = Time.now.strftime(str)
  expanded != str
end
validate_compatible_with_primary_buffer!(path_without_suffix) click to toggle source
# File lib/fluent/plugin/out_secondary_file.rb, line 98
# Ensures that every placeholder used in +path_without_suffix+ is backed by
# the chunk key settings held in +@chunk_key_time+, +@chunk_key_tag+ and
# +@chunk_keys+.
#
# Checks, in order:
# * strftime directives need +@chunk_key_time+;
# * +tag+ / +tag[N]+ placeholders need +@chunk_key_tag+;
# * every other placeholder except +chunk_id+ must be listed in +@chunk_keys+.
#
# Raises Fluent::ConfigError on the first violation found.
def validate_compatible_with_primary_buffer!(path_without_suffix)
  # Anchored on purpose: only `tag` and `tag[N]` are tag placeholders. An
  # unanchored match would also catch variables that merely contain "tag"
  # (e.g. `hashtag`, `stage`) and exempt them from the chunk key check.
  tag_pattern = /\Atag(\[\d+\])?\z/

  # scan returns the capture groups of each match; the first group is taken as
  # the placeholder name (assumes PLACEHOLDER_REGEX captures the name first).
  placeholders = path_without_suffix.scan(PLACEHOLDER_REGEX).flat_map(&:first)

  if !@chunk_key_time && has_time_format?(path_without_suffix)
    raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder, remove time formats, like `%Y%m%d`, from basename or directory"
  end

  if !@chunk_key_tag && (ph = placeholders.find { |placeholder| tag_pattern.match?(placeholder) })
    raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove tag placeholder, like `${tag}`, from basename or directory"
  end

  vars = placeholders.reject { |placeholder| tag_pattern.match?(placeholder) || placeholder == 'chunk_id' }

  if (ph = vars.find { |v| !@chunk_keys.include?(v) })
    raise Fluent::ConfigError, "out_secondary_file: basename or directory has an incompatible placeholder #{ph}, remove variable placeholder, like `${varname}`, from basename or directory"
  end
end