上一篇说到在 JRuby 上利用 netty 库实现事件驱动。事实上,为了让 Ruby 程序员更习惯,foxbat 模块是把 netty 库封装成 eventmachine 的接口来提供给用户使用的。所以,我们可以把程序写得更通用一些:
if defined?(JRUBY_VERSION)
require 'foxbat'
end
require 'eventmachine'
require 'socket'
module SyslogRecv
def initialize(options)
@output_queue = options[:queue]
@codec = options[:codec]
@grok_filter = options[:grok_filter]
@date_filter = options[:date_filter]
end
def syslog_relay(event)
@grok_filter.filter(event)
if event["tags"].nil? || !event["tags"].include?("_grokparsefailure")
event["timestamp"] = event["timestamp8601"] if event.include?("timestamp8601")
@date_filter.filter(event)
else
@logger.info? && @logger.info("NOT SYSLOG", :message => event["message"])
end
end
def post_init
(@@connections ||= []) << self
end
def receive_data(data)
@@connections.each do |client|
if defined?(JRUBY_VERSION)
ip = client.get_peername.getAddress.getHostAddress
port = client.get_peername.getPort
else
port, ip = Socket.unpack_sockaddr_in(client.get_peername)
end
::LogStash::Util::set_thread_name("input|syslog|tcp|#{ip}:#{port}}")
@codec.decode(data) do |event|
event["host"] = ip
syslog_relay(event)
@output_queue << event
end
end
end
end
end
def run(output_queue)
@logger.info("Starting syslog tcp listener", :address => "#{@host}:#{@port}")
EventMachine::run do
EventMachine::start_server @host, @port, SyslogRecv, {
:queue => output_queue,
:codec => @codec,
:grok_filter => @grok_filter,
:date_filter => @date_filter
}
end
end
初次用 EventMachine,发现写法还蛮奇怪的。start_server
传递参数必须是 module 或者 class,然后变量只能随后通过额外的哈希传递进去。
木有看 CPP 的 EM 实现,看这里 foxbat 的实现,发现在 JRuby 里使用 Java 还真是简单啊:
#!/usr/bin/env ruby
require "java"
require File.join(File.dirname(__FILE__), "netty-3.2.4.Final.jar")
require File.join(File.dirname(__FILE__), "syslogdecoder.jar")
java_import "com.loggly.syslog.SyslogDecoder"
java_import "org.jboss.netty.channel.SimpleChannelHandler"
java_import "org.jboss.netty.channel.ChannelPipelineFactory"
java_import "org.jboss.netty.channel.Channels"
java_import "org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory"
java_import "org.jboss.netty.bootstrap.ServerBootstrap"
class SyslogServerHandler < SimpleChannelHandler
class << self
include ChannelPipelineFactory
def getPipeline
return Channels.pipeline(SyslogDecoder.new, self.new)
end # def getPipeline
end # class << self
def initialize
super
end # def initialize
def messageReceived(context, event)
e = event.getMessage.toString
print('.')
end # def messageReceived
def exceptionCaught(context, exception)
exception.getCause.printStackTrace
exception.getChannel.close
end # def exceptionCaught
end # class SyslogServerHandler
class RubySyslogServer
def initialize(host, port)
@factory = NioServerSocketChannelFactory.new(
java.util.concurrent.Executors.newCachedThreadPool(),
java.util.concurrent.Executors.newCachedThreadPool()
)
@bootstrap = ServerBootstrap.new(@factory)
@bootstrap.setPipelineFactory(SyslogServerHandler)
@bootstrap.setOption("child.tcpNoDelay", true);
@bootstrap.setOption("child.keepAlive", true);
@host = host
@port = port
end # def initialize
def start
address = java.net.InetSocketAddress.new(@host, @port)
return @bootstrap.bind(address)
end # def start
end # class SyslogServer
if __FILE__ == $0
host = ARGV[0]
port = ARGV[1].to_i
RubySyslogServer.new(host, port).start
end
直接加载 jar 包,导入各种类。然后就能照样用了。