class Fluent::StreamInput

obsolete

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Compat::Input.new
# File lib/fluent/plugin/in_unix.rb, line 31
def initialize
  require 'socket'
  require 'yajl'
  super
end

Public Instance Methods

run() click to toggle source

def listen end

# File lib/fluent/plugin/in_unix.rb, line 58
def run
  @loop.run(@blocking_timeout)
rescue
  log.error "unexpected error", error: $!.to_s
  log.error_backtrace
end
shutdown() click to toggle source
Calls superclass method Fluent::Compat::Input#shutdown
# File lib/fluent/plugin/in_unix.rb, line 46
def shutdown
  @loop.watchers.each {|w| w.detach }
  @loop.stop
  @lsock.close
  @thread.join

  super
end
start() click to toggle source
Calls superclass method Fluent::Compat::Input#start
# File lib/fluent/plugin/in_unix.rb, line 37
def start
  super

  @loop = Coolio::Loop.new
  @lsock = listen
  @loop.attach(@lsock)
  @thread = Thread.new(&method(:run))
end

Private Instance Methods

on_message(msg) click to toggle source

message Entry {

1: long time
2: object record

}

message Forward {

1: string tag
2: list<Entry> entries

}

message PackedForward {

1: string tag
2: raw entries  # msgpack stream of Entry

}

message Message {

1: string tag
2: long? time
3: object record

}

# File lib/fluent/plugin/in_unix.rb, line 87
def on_message(msg)
  # TODO format error
  tag = msg[0].to_s
  entries = msg[1]

  if entries.class == String
    # PackedForward
    es = MessagePackEventStream.new(entries)
    router.emit_stream(tag, es)

  elsif entries.class == Array
    # Forward
    es = MultiEventStream.new
    entries.each {|e|
      record = e[1]
      next if record.nil?
      time = e[0]
      time = (now ||= Engine.now) if time.to_i == 0
      es.add(time, record)
    }
    router.emit_stream(tag, es)

  else
    # Message
    record = msg[2]
    return if record.nil?

    time = msg[1]
    time = Engine.now if time.to_i == 0
    router.emit(tag, time, record)
  end
end