class Fluent::Counter::CleanupThread

Constants

CLEANUP_INTERVAL

Public Class Methods

new(store, mutex_hash, mutex) click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 104
# Sets up the cleanup worker's state without starting any thread.
#
# store      - object queried with get(key, raw: true) during run_once
# mutex_hash - per-key mutex collection that run_once iterates and prunes
# mutex      - lock held around each whole cleanup pass
def initialize(store, mutex_hash, mutex)
  # Idle until #start is called.
  @running = false
  @thread = nil
  @store, @mutex_hash, @mutex = store, mutex_hash, mutex
end

Public Instance Methods

start() click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 112
# Marks the worker as running and spawns the background thread.
#
# The thread repeats "sleep CLEANUP_INTERVAL, then run_once" for as long as
# @running stays truthy. Returns the newly created Thread.
def start
  @running = true

  # One iteration = wait a full interval, then do a single cleanup pass.
  periodic_cleanup = lambda do
    while @running
      sleep(CLEANUP_INTERVAL)
      run_once
    end
  end

  @thread = Thread.new(&periodic_cleanup)
end
stop() click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 122
# Stops the cleanup loop and waits briefly for the worker thread to exit.
#
# Does nothing (returns nil) when the loop is not running. Otherwise clears
# @running so the worker's `while @running` check fails on its next pass,
# waits up to one second for the thread to finish, and kills it if it is
# still alive after that. Returns the worker Thread in both of those cases.
def stop
  return unless @running

  @running = false
  # Avoid waiting CLEANUP_INTERVAL: the worker may be in the middle of its
  # sleep. Thread#join(limit) returns nil when the limit elapses, so a bounded
  # join needs no Timeout block (and no asynchronous Timeout::Error).
  @thread.join(1) || @thread.kill
end

Private Instance Methods

run_once() click to toggle source
# File lib/fluent/counter/mutex_hash.rb, line 137
# Runs one cleanup pass over @mutex_hash while holding @mutex.
#
# An entry is a removal candidate only if the store returns a value for its
# key, that value's 'last_modified_at' seconds are at or before
# (now - CLEANUP_INTERVAL), and its per-key mutex can be taken without
# blocking. Candidates that pass a second try_lock are deleted; the others
# are put back.
def run_once
  @mutex.synchronize do
    # Cutoff in whole seconds; entries modified after this are kept.
    last_cleanup_at = (Time.now - CLEANUP_INTERVAL).to_i
    @mutex_hash.each do |(key, mutex)|
      v = @store.get(key, raw: true)
      # Nothing returned by the store for this key: leave its entry alone.
      next unless v
      # v['last_modified_at'] = [sec, nsec]; only the seconds part is compared.
      next if last_cleanup_at < v['last_modified_at'][0]
      # Mutex is held somewhere else right now: the key is in use, skip it.
      next unless mutex.try_lock

      # Blank the slot first, then release the lock. Presumably this lets a
      # thread already queued on this mutex acquire it before the re-check
      # below (NOTE(review): confirm against the callers that lock it).
      @mutex_hash[key] = nil
      mutex.unlock

      # Check that a waiting thread is in a lock queue.
      # Can't get a lock here means this key is used in other places.
      # So restore a mutex value to a corresponding key.
      if mutex.try_lock
        # Still uncontended: drop the entry for good.
        @mutex_hash.delete(key)
        mutex.unlock
      else
        @mutex_hash[key] = mutex
      end
    end
  end
end