最后编写一段日志收集的agent:
perl
#!/usr/bin/perl
package NginxLogCollector;
use Moo;
use MooX::Options;
use Message::Passing::DSL;
use MooX::Types::MooseLike::Base qw/ Str /;
use namespace::clean -except => [qw( meta _options_data _options_config )];
with 'Message::Passing::Role::Script';
option filename => (
is => 'ro',
isa => Str,
default => sub { '/data/nginx/logs/access.log' },
);
option rabbitmq => (
is => 'ro',
isa => Str,
default => sub { '10.3.18.199' },
);
sub build_chain {
my $self = shift;
message_chain {
output rabbitmq => (
class => 'AMQP',
exchange_name => 'logcollect',
# 目前测试结果,发现Input::AMQP无法接收到非topic的exchange
# CPAN上有关RabbitMQ的模块都是这个哥们写的,POD简略到没有一样,表示无语下
# exchange_type => 'direct',
hostname => $self->rabbitmq,
username => 'guest',
password => 'guest',
);
output debug => (
class => 'STDOUT',
);
encoder("encoder",
class => 'JSON',
output_to => 'rabbitmq',
output_to => 'debug',
);
filter grok => (
class => 'GrokLike',
output_to => 'encoder',
);
filter logstash => (
class => 'ToLogstash',
output_to => 'grok',
);
decoder("decoder",
class => 'JSON',
output_to => 'logstash',
);
input nginxlog => (
class => 'FileTail',
output_to => 'decoder',
filename => $self->filename,
);
};
}
__PACKAGE__->start unless caller;
1;
目前就做到这步,之后从rabbitmq里往elasticsearch写的还没搞。从上面的chain可以很清除的看到和logstash一样的管道思想,input->decoder->filter->encoder->output。巧的是两种写法中,decode/encode那个写法跟puppet的DSL定义特别的像。哈哈~
继续阅读……