最后编写一段日志收集的agent:

    #!/usr/bin/perl
    package NginxLogCollector;
    use Moo;
    use MooX::Options;
    use Message::Passing::DSL;
    use MooX::Types::MooseLike::Base qw/ Str /;
    use namespace::clean -except => [qw( meta _options_data _options_config )];
    with 'Message::Passing::Role::Script';
    option filename => (
        is => 'ro',
        isa => Str,
        default => sub { '/data/nginx/logs/access.log' },
    );
    option rabbitmq => (
        is => 'ro',
        isa => Str,
        default => sub { '10.3.18.199' },
    );
    sub build_chain {
        my $self = shift;
        message_chain {
            output rabbitmq => (
                class => 'AMQP',
                exchange_name => 'logcollect',
    # 目前测试结果,发现Input::AMQP无法接收到非topic的exchange
    # CPAN上有关RabbitMQ的模块都是这个哥们写的,POD简略到没有一样,表示无语下
    #            exchange_type => 'direct',
                hostname => $self->rabbitmq,
                username => 'guest',
                password => 'guest',
            );
            output debug => (
                class => 'STDOUT',
            );
            encoder("encoder",
                class => 'JSON',
                output_to => 'rabbitmq',
                output_to => 'debug',
            );
            filter grok => (
                class => 'GrokLike',
                output_to => 'encoder',
            );
            filter logstash => (
                class => 'ToLogstash',
                output_to => 'grok',
            );
            decoder("decoder",
                class => 'JSON',
                output_to => 'logstash',
            );
            input nginxlog => (
                class => 'FileTail',
                output_to => 'decoder',
                filename => $self->filename,
           );
        };
    }
    __PACKAGE__->start unless caller;
    1;

目前就做到这步,之后从rabbitmq里往elasticsearch写的还没搞。从上面的chain可以很清除的看到和logstash一样的管道思想,input->decoder->filter->encoder->output。巧的是两种写法中,decode/encode那个写法跟puppet的DSL定义特别的像。哈哈~

继续贴汇总入库的agent代码:

#!/usr/bin/perl
use Moo;
use MooX::Options;
use Message::Passing::DSL;
use MooX::Types::MooseLike::Base qw/ Str /;
use namespace::clean -except => [qw( meta _options_data _options_config )];

with 'Message::Passing::Role::Script';

option elasticsearch_servers => (
    is => 'ro',
    isa => Str,
    default => sub { '10.3.18.199:9200' },
);

option rabbitmq => (
    is => 'ro',
    isa => Str,
    default => sub { '10.3.18.199' },
);

sub build_chain {
    my $self = shift;
    message_chain {
        output elasticsearch => (
            class => 'ElasticSearch',
            elasticsearch_servers => [$self->elasticsearch_servers],
        );
        decoder decoder => (
            class => 'JSON',
            output_to => 'elasticsearch',
        );
        input rabbitmq => (
            class => 'AMQP',
            exchange_name => 'logcollect',
            queue_name => 'logcollect',
            hostname => $self->rabbitmq,
            username => 'guest',
            password => 'guest',
            output_to => 'decoder',
        );
    };
}
__PACKAGE__->start unless caller;
1;

总的来说,原有模块相互之间都有些不太协调,用的时候还是要自己改改。比如这里,原版的Output::ElasticSearch是吧之前传递过来的整个message放进@fields里的,这跟Filter::ToLogstash的message结构是完全冲突的。所以需要修改Output::ElasticSearch里的bulk_index()的data=>$data,就可以了,不要多改动。

整个代码变动,参加个人github:https://github.com/chenryn/Message-Passing.git