Skip to content

Commit

Permalink
rewrite example script as module
Browse files Browse the repository at this point in the history
  • Loading branch information
sni committed Jul 26, 2023
1 parent 26667cd commit 39d2a3a
Showing 1 changed file with 82 additions and 37 deletions.
119 changes: 82 additions & 37 deletions examples/worker_command_tester
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@ eval 'exec perl -x $0 ${1+"$@"} ;'
#line 35

##############################################
package WorkerCommandTester;

use strict;
use warnings;

use POSIX ();
use Pod::Usage;
use Getopt::Long;
Expand All @@ -49,21 +52,27 @@ use Thruk::Utils::Filter ();
# must come after the other modules
use Thread::Queue ();

##############################################
my $max_session_per_worker = 10;
our $max_session_per_worker = 10;
our @sshpids;
my $sockets_queue = Thread::Queue->new();

exit(main());

sub END {
if(scalar @sshpids > 0) {
CORE::kill(2, @sshpids)
}
}

##############################################
sub main {
sub new {
my ($class, %arg) = @_;
my $self = {};
bless $self, $class;
return $self;
}

##############################################
sub run {
my($self) = @_;
my $opt ={
'help' => 0,
'hostfilter' => undef,
Expand All @@ -73,6 +82,8 @@ sub main {
'target' => [],
'verbose' => 0,
'timeout' => 30,
'continue' => undef,
'retry' => undef,
};
Getopt::Long::Configure('no_ignore_case');
Getopt::Long::Configure('bundling');
Expand All @@ -83,6 +94,8 @@ sub main {
"b|backend=s" => \$opt->{'backend'},
"w|worker=i" => \$opt->{'worker'},
"t|timeout=i" => \$opt->{'timeout'},
"c|continue" => \$opt->{'continue'},
"r|retry=s" => \$opt->{'retry'},
"v|verbose" => sub { $opt->{'verbose'}++ },
"<>" => sub { push @{$opt->{'target'}}, $_[0]; },
) or pod2usage( { -verbose => 2, -message => 'error in options', -exit => 3 } );
Expand All @@ -97,12 +110,15 @@ sub main {
my $c = $cli->get_c();
Thruk::Action::AddDefaults::add_defaults($c);

$self->{'c'} = $c;
$self->{'opt'} = $opt;

if($opt->{'worker'} eq 'auto') {
$opt->{'worker'} = 10;
}

for my $x (1..(POSIX::ceil($opt->{'worker'} / $max_session_per_worker ))) {
my $pid = _start_ssh_master($c, $opt, $x-1);
my $pid = $self->_start_ssh_master($x-1);
_debug("ssh control master %d started, pid:%d", $x-1, $pid);
push @sshpids, $pid;
my $ctrl_path = sprintf(".ssh-wrk-%s.%d", $opt->{'target'}->[0], $x-1);
Expand All @@ -111,7 +127,7 @@ sub main {
}
}

my $rc = _run_checks($c, $opt);
my $rc = $self->_run_checks();
$sockets_queue->end();
if($rc) {
return(0);
Expand All @@ -121,9 +137,9 @@ sub main {

##############################################
sub _run_checks {
my($c, $opt) = @_;
my($self) = @_;

our $jobs = _get_jobs($c, $opt);
our $jobs = $self->_get_jobs();
our $failed = 0;
our $summary = {};

Expand All @@ -140,21 +156,9 @@ sub _run_checks {
my $numsize = length("$num_jobs");
my $nr = 0;
Thruk::Utils::scale_out(
scale => $opt->{'worker'},
scale => $self->{'opt'}->{'worker'},
jobs => $jobs,
worker => sub {
my($obj) = @_;
local $SIG{'INT'} = "DEFAULT";
my $err;
eval {
($err) = _check_object($c, $opt, $obj);
};
$err = $@ if $@;
if($err) {
_error($err);
}
return($err, $obj);
},
worker => sub { return($self->_worker(@_)); },
collect => sub {
my($item) = @_;
my($err, $obj) = @{$item};
Expand All @@ -177,7 +181,7 @@ sub _run_checks {
$rem = ($elapsed / $nr) * $rem_jobs;
$end = time() + $rem;
$rem = Thruk::Utils::Filter::duration($rem, 6);
$end = Thruk::Utils::Filter::date_format($c, $end);
$end = Thruk::Utils::Filter::date_format($self->{'c'}, $end);
}
}
printf("\033[JStatus: %0".$numsize."d/%d (%0.1f%%) check rate: %.1f/s | remaining duration: %s | expected end: %s \033[G",
Expand All @@ -194,11 +198,29 @@ sub _run_checks {
return _print_summary();
}

##############################################
sub _worker {
my($self, $obj) = @_;

local $SIG{'INT'} = "DEFAULT";
my $err;
eval {
($err) = $self->_check_object($obj);
};
$err = $@ if $@;
if($err) {
_error($err);
}
return($err, $obj);
}

##############################################
sub _get_jobs {
my($c, $opt) = @_;
my($self) = @_;
my $jobs = [];
my $all_commands = {};
my $c = $self->{'c'};
my $opt = $self->{'opt'};
my $commands = $c->db->get_commands();
_info("fetched %d commands", scalar @{$commands});
for my $cmd (@{$commands}) {
Expand Down Expand Up @@ -265,8 +287,10 @@ sub _print_summary {

##############################################
sub _check_object {
my($c, $opt, $obj) = @_;
my($self, $obj) = @_;

my $c = $self->{'c'};
my $opt = $self->{'opt'};
my($chkobj, $command, $name);
# host check
if($obj->{'name'}) {
Expand All @@ -278,7 +302,7 @@ sub _check_object {
$chkobj = $chkobj->[0];
$command = $c->db->expand_command('host' => $chkobj, 'command' => $obj->{'command'}, 'obfuscate' => 0 );
} else {
$name = sprintf("%s -%s", $obj->{'host_name'}, $obj->{'description'});
$name = sprintf("%s - %s", $obj->{'host_name'}, $obj->{'description'});
$chkobj = $c->db->get_services(filter => [{ host_name => $obj->{'host_name'}, description => $obj->{'description'} }], backend => [$obj->{'peer_key'}]);
if(scalar @{$chkobj} != 1) {
return sprintf("found %d services for name %s", scalar @{$chkobj}, $name);
Expand All @@ -289,17 +313,19 @@ sub _check_object {
}

if($command->{'note'}) {
_info("-" x 105);
_info("check command: %s", $command->{'line_expanded'});
return sprintf("failed to expand check command (%s): %s", $name, $command->{'note'});
}
if(!$command->{'line_expanded'}) {
return sprintf("got no command line for %s", $name);
}
my($rc, $output, $err) = _check_command($c, $opt, $command);
my($rc, $output, $err) = $self->_check_command($command);
return $err if $err;
$output = "" unless $output;
$output =~ s/\n.*$//sgmx; # limit to first line
$output =~ s/^(.*?)\|.*$/$1/gmx; # strip perf data
if(_compare_result($c, $opt, $name, $chkobj, $rc, $output)) {
if($self->_compare_result($name, $chkobj, $rc, $output)) {
# everything fine
return;
}
Expand All @@ -315,7 +341,7 @@ sub _check_object {

##############################################
sub _compare_result {
my($c, $opt, $name, $chkobj, $rc, $output) = @_;
my($self, $name, $chkobj, $rc, $output) = @_;
# translate host status
if($chkobj->{'name'}) {
if($chkobj->{'state'} == 0 && $rc != 0) {
Expand Down Expand Up @@ -350,6 +376,11 @@ sub _make_plugin_output_comparable {
$output =~ s/\QNo address associated with hostname\E/<DNS LOOKUP ERROR>/gmx; # rhel 9
$output =~ s/\QName or service not known\E/<DNS LOOKUP ERROR>/gmx; # rhel 7

$output =~ s/^\s+//gmx;
$output =~ s/\s+$//gmx;

$output =~ s/;/:/gmx; # naemon replaces semicolon with colons

return($output);
}

Expand All @@ -364,14 +395,14 @@ sub _shorten {

##############################################
sub _check_command {
my($c, $opt, $command) = @_;
my($self, $command) = @_;

my $ctrl_path = $sockets_queue->dequeue_timed(3); # get next free ctrl path
die("got no ctrl path in time") unless $ctrl_path;
my $target = $opt->{'target'}->[0];
my $target = $self->{'opt'}->{'target'}->[0];
my $cmd = [
"timeout",
$opt->{'timeout'},
$self->{'opt'}->{'timeout'},
"ssh",
"-o", "PasswordAuthentication=no",
"-o", "PreferredAuthentications=publickey",
Expand All @@ -382,25 +413,26 @@ sub _check_command {
$target,
$command->{'line_expanded'},
];
my($rc, $output) = Thruk::Utils::IO::cmd($c, $cmd, undef, undef, undef, undef, undef, 1);
my($rc, $output) = Thruk::Utils::IO::cmd($self->{'c'}, $cmd, undef, undef, undef, undef, undef, 1);
if($rc == 124) {
$rc = 3;
$output = sprintf("check timed out after %d seconds", $opt->{'timeout'});
$output = sprintf("check timed out after %d seconds", $self->{'opt'}->{'timeout'});
}
$sockets_queue->enqueue($ctrl_path); # put it back
return($rc, $output, undef);
}

##############################################
sub _start_ssh_master {
my($c, $opt, $nr) = @_;
my($self, $nr) = @_;
my $target = $self->{'opt'}->{'target'}->[0];

my $pid = fork();
if($pid == -1) { die("fork failed"); }
if($pid) { return($pid); }

undef $Thruk::Globals::c;
my $target = $opt->{'target'}->[0];
undef $self;
my $ctrl_path = ".ssh-wrk-$target.".$nr;
unlink($ctrl_path);
my $cmd = [
Expand Down Expand Up @@ -443,6 +475,17 @@ sub _start_ssh_master {
CORE::kill(2, $pid);
}

1;

##############################################
use strict;
use warnings;

##############################################

my $wrk = WorkerCommandTester->new();
exit($wrk->run());

##############################################

=head1 NAME
Expand All @@ -466,6 +509,8 @@ exceptions and similar.
--servicefilter filter service names by regular expression
-w|--worker specify the number of parallel checks
-t|--timeout specify timeout (in seconds) for each check
-c|--continue try to continue previuous run
-r|--retry use retry file and verify failed runs
-v|--verbose print additional debug information

=head1 EXAMPLE
Expand Down

0 comments on commit 39d2a3a

Please sign in to comment.