一年前搞的一套小脚本,今天翻博客发现没发过,现在发上来好了。主要背景是这样:考虑到有 DNS 和 HTTP 劫持需要监控,但是很多 DNS 服务器对非本区域本运营商的来源请求是拒绝做出响应的,所以得把监控点分散到各地去。其实做这个事情用 nagios 的分布式就足够了,不过如果想做即时触发的紧急任务,就算在 nagios 页面上点击立刻执行,到返回全部结果也得有一阵子。所以选择了自己写一套分布式的异步系统。

中控端脚本如下:

#!/usr/bin/perl
use Modern::Perl;
use AnyEvent;
use AnyEvent::Redis::RipeRedis;
use Storable qw/freeze thaw/;
use YAML::Syck;
use utf8;
my $area = $ARGV[0];
my $domain = 'fmn.xnimg.cn';
my $master = '10.4.1.21';
my $cv     = AnyEvent->condvar;
my $redis  = AnyEvent::Redis::RipeRedis->new(
    host     => $master,
    port     => 6379,
    encoding => 'utf8',
);
my $dnslist = LoadFile("DNS.yml");
for my $isp ( sort keys %$dnslist ) {
    if ( defined $area ) {
        next unless defined $dnslist->{$isp}->{$area};
        say $area, $isp, join ", ", @{ $dnslist->{$isp}->{$area} };
        my $data = freeze({ domain => $domain, dnslist => $dnslist->{$isp}->{$area} });
        $redis->publish( 'task', $data );
    } else {
        for my $list ( sort keys %{ $dnslist->{$isp} } ) {
            my $data = freeze({ domain => $domain, dnslist => $dnslist->{$isp}->{$list} });
            $cv->begin;
            $redis->publish( 'task', $data );
        }
    }
}
$redis->subscribe(
    qw( report ),
    {
        on_done => sub {
            my $ch_name  = shift;
            my $subs_num = shift;
            print "Subscribed: $ch_name. Active: $subs_num\n";
        },
        on_message => sub {
            my $ch_name = shift;
            my $msg     = thaw( shift );
            printf "%s A %s @%s in %s got %s length %s\n", $domain, $msg->{ip}, $msg->{dns}, $msg->{local}, $msg->{status}, $msg->{len};
            $cv->end;
        },
        on_error => sub {
            print @_;
        },
    }
);
$cv->recv;

分布在各地的客户端脚本如下:

#!/usr/bin/perl
use Modern::Perl;
use AnyEvent;
use AnyEvent::Socket;
use AnyEvent::DNS;
use AnyEvent::Redis::RipeRedis;
use AnyEvent::HTTP;
use Storable qw/freeze thaw/;
use Digest::MD5 qw/md5_hex/;
use utf8;
my $master = '10.4.1.21';
my $local  = '192.168.0.2';
my $cv     = AnyEvent->condvar;
my $redisr = AnyEvent::Redis::RipeRedis->new(
    host          => $master,
    port          => 6379,
    encoding      => 'utf8',
);
my $redisp = AnyEvent::Redis::RipeRedis->new(
    host          => $master,
    port          => 6379,
    encoding      => 'utf8',
);
$redisr->subscribe(
    'task',
    {
        on_done => sub {
            my $ch_name  = shift;
            my $subs_num = shift;
            print "Subscribed: $ch_name. Active: $subs_num\n";
        },
        on_message => sub {
            my $ch_name = shift;
            my $msg     = thaw(shift);
            for my $dns ( @{ $msg->{dnslist} } ) {
                resolv( $dns, $msg->{domain} );
            }
        },
        on_error => sub {
            my $err_msg  = shift;
            my $err_code = shift;
            print "Error: ($err_code) $err_msg\n";
        },
    }
);
$cv->recv;
sub resolv {
    my ( $dns, $domain ) = @_;
    return unless $dns =~ m/^\d+/;
    my $resolver =
      AnyEvent::DNS->new( server => [ AnyEvent::Socket::parse_address $dns ], );
    $resolver->resolve(
        "$domain" => 'a',
        sub {
            httptest($dns, $domain, $_->[-1]) for @_;
        }
    );
}
sub httptest {
    my ($dns, $domain, $ip) = @_;
    my $url = "http://$domain/10k.html";
    my $begin = time;
    http_get $url, proxy => [$ip, 80], want_body_handle => 1, sub {
        my ($hdl, $hdr) = @_;
        my ($port, $peer) = AnyEvent::Socket::unpack_sockaddr getpeername $hdl->{'fh'};
        my $data = freeze( { dns => $dns, status => $hdr->{Status}, local => $local, ip => $peer, len => $hdr->{'content-length'} } );
        $redisp->publish('report', $data);
    };
}

这里需要单独建立两个 $redisr$redisp ,因为前一个已经用来 subscribe 之后就不能同时用于 publish 了,会报错。从理解上这是个很扯淡的事情,不过实际运行结果就是如此。。。