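When you need simple mutual-exclusion management for a service running across a cluster, I strongly recommend the family of modules from Veripool: IPC::Locker and Schedule::Load.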

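Today I'll start with IPC::Locker. Deployment is simple: run cpanm IPC::Locker on every node in the cluster. The modules it depends on are all Perl core modules, such as IO::Socket::INET, IO::Poll and POSIX, so in theory you could also just bundle the code and distribute it yourself.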
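If you do go the "bundle and distribute" route, a quick check that the dependencies load on each node may be useful. The following is a minimal sketch; the helper name missing_modules is made up for illustration, and the module list is only the three dependencies named above, not necessarily the complete set.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Return the names of the modules in @_ that fail to load on this node.
# (Hypothetical helper, not part of IPC::Locker.)
sub missing_modules {
    my @missing;
    for my $module (@_) {
        # String eval so that "require" maps Foo::Bar to Foo/Bar.pm.
        push @missing, $module unless eval "require $module; 1";
    }
    return @missing;
}

# The dependencies mentioned in the text; all of them ship with Perl.
my @deps    = qw(IO::Socket::INET IO::Poll POSIX);
my @missing = missing_modules(@deps);
print @missing ? "Missing: @missing\n" : "All dependencies present\n";
```

Running this on every node before copying the module files over should catch a stripped-down Perl installation early.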

The distribution also ships several ready-made scripts: lockerd, lockersh, pidstat, pidstatd and pidwatch.

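The last three deal with questions such as whether a pid exists on a remote machine. I doubt most of us would manage a cluster by pid ourselves, so for everyday use it is enough to know that lockerd and lockersh themselves rely on pidstatd to handle the pid side of things.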

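The code is simple enough to understand just by reading it. lockerd uses IPC::Locker::Server, which starts an IO::Socket::INET TCP server and mainly maintains three things: the @{$self->{lock}} list, the @{$self->{host}} list, and the boolean $self->{locked}.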

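lockersh, in turn, uses IPC::Locker to connect to lockerd's port and check the $self->{locked} state. If the lock is not held, it sends a LOCK request, then forks a process that execs the shell command you supplied. Once the command finishes, unlock sends an UNLOCK request to lockerd.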
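The lock / run / unlock flow just described can be sketched in a few lines. This is only an illustration, not the lockersh source: run_locked is a made-up helper, it uses system() rather than the fork-and-exec arrangement lockersh uses, and it assumes only the documented IPC::Locker methods new, lock, locked and unlock.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Run @cmd while holding $lock. $lock can be any object that offers
# lock(), locked() and unlock(), as IPC::Locker objects do.
# (Hypothetical helper; returns the system() status, or nothing if the
# lock could not be obtained.)
sub run_locked {
    my ($lock, @cmd) = @_;
    $lock->lock;                   # ask the server for the lock
    return unless $lock->locked;   # not granted (e.g. timed out)
    my $status = system(@cmd);     # fork, exec and wait for the command
    $lock->unlock;                 # hand the lock back
    return $status;
}

# Usage: script.pl <lockerd-host> <lock-name> <command> [args...]
if (@ARGV >= 3) {
    my ($host, $name, @cmd) = @ARGV;
    require IPC::Locker;           # loaded lazily; needs the CPAN module
    my $lock   = IPC::Locker->new(host => $host, lock => $name);
    my $status = run_locked($lock, @cmd);
    defined $status or die "Could not obtain lock $name\n";
    exit($status >> 8);
}
```

Keeping the lock object as a parameter means the helper can be exercised with a stand-in object when no lockerd is available.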

A simple experiment:

  1. On serverA, run lockerd &
  2. On serverB, run lockersh --dhost serverA --lock test_task 'while true;do echo "OK";done'
  3. On serverC, run lockersh --dhost serverA --lock test_task 'while true;do echo "OK";done'
  4. On serverD, run lockersh --dhost serverA --lock other_task 'while true;do echo "OK";done'

The result: serverB and serverD both run echo "OK" at the same time, while serverC is blocked waiting for the lock. Next:

  1. In the serverB session, press Ctrl+C to stop the program, then run the same command again
  2. In the serverC session, press Ctrl+C to stop the program

The result: as soon as B is stopped, C starts; once C is stopped, B resumes. None of this affects what is running on serverD.

  1. Stop the program on serverD and run lockersh --dhost serverA --lock test_task 'while true;do echo "OK";done' there instead

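The result: B, C and D obtain the lock in the order in which lockersh was started on them. That is because the host list is an ordinary list, and the server walks it with a for loop.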
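The ordering seen in the experiment is what you would expect from a first-in, first-out waiter list per lock name. The toy model below reproduces it in memory. It is a sketch only, not the IPC::Locker::Server code: the %table layout and the request and release helpers are invented for illustration.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Toy model: one entry per lock name, holding the current owner and a
# FIFO array of waiters. (Hypothetical structure, not the server's.)
my %table;

# Ask for a lock. Returns 1 if granted at once, 0 if queued.
sub request {
    my ($name, $who) = @_;
    my $entry = $table{$name} ||= { owner => undef, waiters => [] };
    if (!defined $entry->{owner}) {
        $entry->{owner} = $who;
        return 1;
    }
    push @{ $entry->{waiters} }, $who;   # wait behind earlier requesters
    return 0;
}

# Give a lock back. The longest-waiting requester, if any, becomes the
# new owner. Returns the new owner, or undef if nobody was waiting.
sub release {
    my ($name) = @_;
    my $entry = $table{$name} or return;
    $entry->{owner} = shift @{ $entry->{waiters} };
    return $entry->{owner};
}

# Replay the experiment from the text.
request('test_task',  'serverB');   # granted
request('test_task',  'serverC');   # queued behind B
request('other_task', 'serverD');   # different name, granted at once
print "B stops, lock goes to: ", release('test_task'), "\n";
request('test_task',  'serverB');   # B starts again, now queued
release('other_task');              # D gives up other_task...
request('test_task',  'serverD');   # ...and queues for test_task behind B
print "C stops, lock goes to: ", release('test_task'), "\n";
print "B stops, lock goes to: ", release('test_task'), "\n";
```

Locks with different names never interact in this model, which matches serverD running undisturbed while it held other_task.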

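Note: lockerd must be started first, and it must not exit partway through. In my tests, if lockerd exits and is started again, the lock list is lost because it is kept only in memory. A lockersh started earlier keeps running (it already holds the lock and will not talk to the server again until it unlocks), while a lockersh started afterwards obtains the same lock a second time from the new lockerd.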

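The other module, Schedule::Load, can decide which machine a job should run on based on each node's loadavg, top output and so on. I have not tested it yet and will write it up later.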

Addendum: here is a script, adapted from lockersh, that controls reloads and alerting for a squid cluster:

#!/usr/bin/perl -w
use FindBin;
use lib "$FindBin::Bin/../lib";

use strict;
use warnings;
use autodie;
use vars qw ($Debug);
use Furl;
use IO::File;
use Getopt::Long;

use IPC::Locker;
use IPC::PidStat;
use POSIX ();  # setsid() is called below; load POSIX explicitly

#======================================================================

# Allow only one instance of this script per host.
my $pscount = `ps aux|grep -v grep|grep $0|wc -l`;
print "Already run, waiting for lock now\n" and exit unless $pscount == 1;

#======================================================================

my %server_params = (lock=>[]);
my $cluserv;

$Debug = 0;
Getopt::Long::config ("require_order");
if (! GetOptions (
                  "dhost=s"     => sub {shift; $server_params{host} = shift;},
                  "cluster=s"   => sub {shift; push @{$server_params{lock}}, split(':',shift);},
                  "port=i"      => sub {shift; $server_params{port} = shift;},
                  "timeout=i"   => sub {shift; $server_params{timeout} = shift;},
                  "verbose!"    => sub {shift; $server_params{verbose} = shift;},
                  "debug"       => \&debug,
                  "service=s"   => \$cluserv,
                  )) {
    die "%Error: Bad usage, see lockersh --help\n";
}

$#{$server_params{lock}}>=0 or die "%Error: --cluster not specified; see lockersh --help\n";

# Fork once to start parent process
my $foreground_pid = $$;  # Unlike most forks, the job goes in the parent

# Do this while we still have STDERR.
my $lock  = IPC::Locker->new(verbose=>0,
                             timeout=>0,
                             autounlock=>1,
                             destroy_unlock=>0,
                             %server_params,
                             );
$lock or die "%Error: Did not connect to lockerd,";
$lock->lock;

if (my $pid = fork()) {  # Parent process, foreground job
    print "\tForeground: $cluserv\n" if $Debug;
    # The child forks again quickly.  Sometimes, SIG_CHLD leaks to us and
    # wrecks the exec'd command, so wait for it now.
    my $rv = waitpid($pid, 0);
    if ($rv != $pid) {
        die "%Error: waitpid() returned $rv: $!";
    } elsif ($?) {
        die "%Error: Child process died with status $?,";
    }

    print "Exec in $$\n" if $Debug;
    &service($cluserv);
}
#else, rest is for child process.

# Disassociate from controlling terminal
POSIX::setsid() or die "%Error: Can't start a new session: $!";

# Change working directory
chdir "/";
open(STDIN,  "+>/dev/null") or die "%Error: Can't re-open STDIN: $!";
if (!$Debug) {
    open(STDOUT, "+>&STDIN");
    open(STDERR, "+>&STDIN");
}
# Prevent possibility of acquiring a controlling terminal
exit(0) if fork();

# Wait for child to complete.  We can't waitpid, as we're not the parent
while (IPC::PidStat::local_pid_exists($foreground_pid)) { sleep 1; }
print "Parent $foreground_pid completed\n" if $Debug;

# Unlock
$lock->unlock; $lock=undef;
print "Child exiting\n" if $Debug;

sub debug {
    $Debug = 1;
    $IPC::Locker::Debug = 1;
}

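sub service {
    my $cluserv = shift;
    die "Only support squid now!" unless $cluserv eq "squid";
    # Look the handlers up by name rather than string-eval'ing barewords.
    my $reload = __PACKAGE__->can("${cluserv}_reload");
    my $check  = __PACKAGE__->can("${cluserv}_check");
    # The reload handler returns the exit status, so non-zero means failure.
    die "Reload failed. Check squid.conf!" if $reload->();
    while (1) {
        my $hit_rate = $check->();
        $hit_rate = 0 unless defined $hit_rate;  # no ratio line was found
        # Parentheses are required: notify() is declared further down.
        notify("HIT Ratio: ${hit_rate}% now.\n");
        exit if $hit_rate > 50;
        sleep 300;
    }
}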

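sub squid_check {
    my $hit_rate;
    print "Run squid_check\n" if $Debug;
    # \$2 is escaped so that awk sees it instead of Perl's capture variable;
    # "exit" keeps only the first http_port line.
    my $squid_port = `awk '/^http_port/{print \$2; exit}' /etc/squid/squid.conf`;
    chomp $squid_port;  # drop the trailing newline from awk
    # List-form pipe open: no shell is involved.
    open my $fh, '-|', 'squidclient', '-p', $squid_port, 'mgr:info';
    while (<$fh>) {
        next unless /^\s+Request Hit Ratios:\s+5min:\s*(-?\d+\.\d)%,/;
        print "regex $1\n" if $Debug;
        $hit_rate = $1;
        last;
    }
    close $fh;
    return $hit_rate;
}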

sub squid_reload {
    print "Reload squid daemon. Do not reload within 10 mins of squid start" if $Debug;
    system("squid", "-k", "reconfigure");
    return $?;
}

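sub notify {
    my ($message) = @_;  # the original read $_, which is not the argument
    my $furl = Furl->new(agent => "Clustrol/0.1");
    # Furl's post() takes (url, headers, content); send the text as form content.
    $furl->post("http://monitor.domain.com/eml/",
        [],
        [ data => $message ],
    );
}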

__END__