module Fluent::PluginHelper::Storage
Constants
- StorageState
Attributes
_storages[R]
Public Class Methods
included(mod)
click to toggle source
# File lib/fluent/plugin_helper/storage.rb, line 88 def self.included(mod) mod.include StorageParams end
new()
click to toggle source
Calls superclass method
Fluent::PluginHelper::Timer.new
# File lib/fluent/plugin_helper/storage.rb, line 94 def initialize super @_storages_started = false @_storages = {} # usage => storage_state end
Public Instance Methods
after_shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/storage.rb, line 166 def after_shutdown storage_operate(:after_shutdown) super end
before_shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/storage.rb, line 154 def before_shutdown storage_operate(:before_shutdown) super end
close()
click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/storage.rb, line 171 def close storage_operate(:close){|s| s.running = false } super end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/storage.rb, line 100 def configure(conf) super @storage_configs.each do |section| if !section.usage.empty? && section.usage !~ /^[a-zA-Z][-_.a-zA-Z0-9]*$/ raise Fluent::ConfigError, "Argument in <storage ARG> uses invalid characters: '#{section.usage}'" end if @_storages[section.usage] raise Fluent::ConfigError, "duplicated storages configured: #{section.usage}" end storage = Plugin.new_storage(section[:@type], parent: self) storage.configure(section.corresponding_config_element) @_storages[section.usage] = StorageState.new(wrap_instance(storage), false) end end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin_helper/storage.rb, line 159 def shutdown storage_operate(:shutdown) do |s| s.storage.save if s.storage.save_at_shutdown end super end
start()
click to toggle source
Calls superclass method
Fluent::PluginHelper::Timer#start
# File lib/fluent/plugin_helper/storage.rb, line 116 def start super @_storages_started = true @_storages.each_pair do |usage, s| s.storage.start s.storage.load if s.storage.autosave && !s.storage.persistent timer_execute(:storage_autosave, s.storage.autosave_interval, repeat: true) do begin s.storage.save rescue => e log.error "plugin storage failed to save its data", usage: usage, type: type, error: e end end end s.running = true end end
stop()
click to toggle source
Calls superclass method
Fluent::PluginHelper::Timer#stop
# File lib/fluent/plugin_helper/storage.rb, line 148 def stop super # timer stops automatically in super storage_operate(:stop) end
storage_create(usage: '', type: nil, conf: nil, default_type: nil)
click to toggle source
# File lib/fluent/plugin_helper/storage.rb, line 32 def storage_create(usage: '', type: nil, conf: nil, default_type: nil) if conf && conf.respond_to?(:arg) && !conf.arg.empty? usage = conf.arg end if !usage.empty? && usage !~ /^[a-zA-Z][-_.a-zA-Z0-9]*$/ raise Fluent::ConfigError, "Argument in <storage ARG> uses invalid characters: '#{usage}'" end s = @_storages[usage] if s && s.running return s.storage elsif s # storage is already created, but not loaded / started else # !s type = if type type elsif conf && conf.respond_to?(:[]) raise Fluent::ConfigError, "@type is required in <storage>" unless conf['@type'] conf['@type'] elsif default_type default_type else raise ArgumentError, "BUG: both type and conf are not specified" end storage = Plugin.new_storage(type, parent: self) config = case conf when Fluent::Config::Element conf when Hash # in code, programmer may use symbols as keys, but Element needs strings conf = Hash[conf.map{|k,v| [k.to_s, v]}] Fluent::Config::Element.new('storage', usage, conf, []) when nil Fluent::Config::Element.new('storage', usage, {'@type' => type}, []) else raise ArgumentError, "BUG: conf must be a Element, Hash (or unspecified), but '#{conf.class}'" end storage.configure(config) if @_storages_started storage.start end s = @_storages[usage] = StorageState.new(wrap_instance(storage), false) end s.storage end
storage_operate(method_name, &block)
click to toggle source
# File lib/fluent/plugin_helper/storage.rb, line 137 def storage_operate(method_name, &block) @_storages.each_pair do |usage, s| begin block.call(s) if block_given? s.storage.send(method_name) rescue => e log.error "unexpected error while #{method_name}", usage: usage, storage: s.storage, error: e end end end
terminate()
click to toggle source
Calls superclass method
Fluent::PluginHelper::Timer#terminate
# File lib/fluent/plugin_helper/storage.rb, line 176 def terminate storage_operate(:terminate) @_storages = {} super end
wrap_instance(storage)
click to toggle source
# File lib/fluent/plugin_helper/storage.rb, line 182 def wrap_instance(storage) if storage.persistent && storage.persistent_always? storage elsif storage.persistent PersistentWrapper.new(storage) elsif !storage.synchronized? SynchronizeWrapper.new(storage) else storage end end