From 6f6466013b64af7e906dc12b5a49a39c127f0ac5 Mon Sep 17 00:00:00 2001 From: Tobi Oetiker Date: Fri, 27 Jul 2007 10:12:41 +0000 Subject: more changes on the road to the master/slave setup --- bin/smokeping.dist | 2 +- etc/config.dist | 7 ++ etc/smokeping_secrets.dist | 2 + lib/Smokeping.pm | 307 ++++++++++++++++++++++++--------------------- lib/Smokeping/Master.pm | 81 +++++++++++- lib/Smokeping/Slave.pm | 32 ++--- 6 files changed, 271 insertions(+), 160 deletions(-) create mode 100644 etc/smokeping_secrets.dist diff --git a/bin/smokeping.dist b/bin/smokeping.dist index faf3db6..8314102 100755 --- a/bin/smokeping.dist +++ b/bin/smokeping.dist @@ -1,7 +1,7 @@ #!/usr/sepp/bin/perl-5.8.4 -w # -*-perl-*- -use lib qw(/usr/pack/rrdtool-1.0.49-to/lib/perl); +use lib qw(/usr/pack/rrdtool-1.2.23-mo/lib/perl); use lib qw(lib); use Smokeping 2.001001; diff --git a/etc/config.dist b/etc/config.dist index 0a0910b..bf10e17 100644 --- a/etc/config.dist +++ b/etc/config.dist @@ -49,6 +49,12 @@ type = rtt pattern = <10,<10,<10,<10,<10,<100,>100,>100,>100 comment = routing mesed up again ? ++median +type = matcher +# in milli seconds +pattern = Median(old=>5,new=>7,diff=>2) +comment = median crossed + *** Database *** step = 300 @@ -170,6 +176,7 @@ host = www.iu.ali menu = U. C. Berkeley title = U. C. Berkeley Webserver host = www.berkly.udi +alerts = median +++ UCSD diff --git a/etc/smokeping_secrets.dist b/etc/smokeping_secrets.dist new file mode 100644 index 0000000..d44aef1 --- /dev/null +++ b/etc/smokeping_secrets.dist @@ -0,0 +1,2 @@ +host1:mysercert +host2:yoursercert diff --git a/lib/Smokeping.pm b/lib/Smokeping.pm index f6bcdd2..cd0f559 100644 --- a/lib/Smokeping.pm +++ b/lib/Smokeping.pm @@ -1325,135 +1325,104 @@ sub save_sortercache($$$){ rename "$dir/new$ext","$dir/data$ext.storable" } - -sub update_rrds($$$$$$); -sub update_rrds($$$$$$) { +sub check_alerts { my $cfg = shift; - my $probes = shift; my $tree = shift; + my $pings = shift; my $name = shift; - my $justthisprobe = shift; # if defined, update only the targets probed by this probe - my $sortercache = shift; - - my $probe = $tree->{probe}; - foreach my $prop (keys %{$tree}) { - - if (ref $tree->{$prop} eq 'HASH'){ - update_rrds $cfg, $probes, $tree->{$prop}, $name."/$prop", $justthisprobe, $sortercache; - } - # if we are looking down a branche where no probe property is set there is no sense - # in further exploring it - next unless defined $probe; - next if defined $justthisprobe and $probe ne $justthisprobe; - my $probeobj = $probes->{$probe}; - if ($prop eq 'host' and check_filter($cfg,$name)) { - #print "update $name\n"; - my $updatestring = $probeobj->rrdupdate_string($tree); - my $pings = $probeobj->_pings($tree); - if ( $tree->{rawlog} ){ - my $file = POSIX::strftime $tree->{rawlog},localtime(time); - if (open LOG,">>$name.$file.csv"){ - print LOG time,"\t",join("\t",split /:/,$updatestring),"\n"; - close LOG; - } else { - do_log "Warning: failed to open $file for logging: $!\n"; - } - } - my @update = ( $name.".rrd", - '--template',(join ":", "uptime", "loss", "median", - map { "ping${_}" } 1..$pings), - "N:".$updatestring - ); - do_debuglog("Calling RRDs::update(@update)"); - RRDs::update ( @update ); - my $ERROR = RRDs::error(); - do_log "RRDs::update ERROR: $ERROR\n" if $ERROR; - # check alerts - # disabled - my $gotalert; - if ( $tree->{alerts} ) { - my $priority_done; - $tree->{stack} = {loss=>['S'],rtt=>['S']} unless defined $tree->{stack}; - my $x = $tree->{stack}; - my ($loss,$rtt) = - (split /:/, $updatestring)[1,2]; - $loss = undef if $loss eq 'U'; - my $lossprct = $loss * 100 / $pings; - $rtt = undef if $rtt eq 'U'; - push @{$x->{loss}}, $lossprct; - push @{$x->{rtt}}, $rtt; - if (scalar @{$x->{loss}} > $tree->{fetchlength}){ - shift @{$x->{loss}}; - shift @{$x->{rtt}}; - } - for (sort { ($cfg->{Alerts}{$a}{priority}||0) - <=> ($cfg->{Alerts}{$b}{priority}||0)} @{$tree->{alerts}}) { - my $alert = $cfg->{Alerts}{$_}; - if ( not $alert ) { - do_log "WARNING: Empty alert in ".(join ",", @{$tree->{alerts}})." ($name)\n"; - next; - }; - if ( ref $alert->{sub} ne 'CODE' ) { - do_log "WARNING: Alert '$_' did not resolve to a Sub Ref. Skipping\n"; - next; - }; - my $prevmatch = $tree->{prevmatch}{$_} || 0; - - # add the current state of an edge triggered alert to the - # data passed into a matcher, which allows for somewhat - # more intelligent alerting due to state awareness. - $x->{prevmatch} = $prevmatch; - my $priority = $alert->{priority}; - my $match = &{$alert->{sub}}($x) || 0; # Avgratio returns undef - $gotalert = $match unless $gotalert; - my $edgetrigger = $alert->{edgetrigger} eq 'yes'; - my $what; - if ($edgetrigger and $prevmatch != $match) { - $what = ($prevmatch == 0 ? "was raised" : "was cleared"); - } - if (not $edgetrigger and $match) { - $what = "is active"; - } - if ($what and (not defined $priority or not defined $priority_done )) { - $priority_done = $priority if $priority and not $priority_done; - # send something - my $from; - my $line = "$name/$prop"; - my $base = $cfg->{General}{datadir}; - $line =~ s|^$base/||; - $line =~ s|/host$||; - $line =~ s|/|.|g; - do_log("Alert $_ $what for $line"); - my $urlline = $line; - $urlline = $cfg->{General}{cgiurl}."?target=".$line; - my $loss = "loss: ".join ", ",map {defined $_ ? (/^\d/ ? sprintf "%.0f%%", $_ :$_):"U" } @{$x->{loss}}; - my $rtt = "rtt: ".join ", ",map {defined $_ ? (/^\d/ ? sprintf "%.0fms", $_*1000 :$_):"U" } @{$x->{rtt}}; - my $time = time; - my @stamp = localtime($time); - my $stamp = localtime($time); - my @to; - foreach my $addr (map {$_ ? (split /\s*,\s*/,$_) : ()} $cfg->{Alerts}{to},$tree->{alertee},$alert->{to}){ - next unless $addr; - if ( $addr =~ /^\|(.+)/) { - my $cmd = $1; - if ($edgetrigger) { + my $prop = shift; + my $loss = shift; + my $rtt = shift; + my $slave = shift; + my $gotalert; + my $s = ""; + if ($slave) { + $s = '~'.$slave + } + if ( $tree->{alerts} ) { + my $priority_done; + $tree->{'stack'.$s} = {loss=>['S'],rtt=>['S']} unless defined $tree->{'stack'.$s}; + my $x = $tree->{'stack'.$s}; + $loss = undef if $loss eq 'U'; + my $lossprct = $loss * 100 / $pings; + $rtt = undef if $rtt eq 'U'; + push @{$x->{loss}}, $lossprct; + push @{$x->{rtt}}, $rtt; + if (scalar @{$x->{loss}} > $tree->{fetchlength}){ + shift @{$x->{loss}}; + shift @{$x->{rtt}}; + } + for (sort { ($cfg->{Alerts}{$a}{priority}||0) + <=> ($cfg->{Alerts}{$b}{priority}||0)} @{$tree->{alerts}}) { + my $alert = $cfg->{Alerts}{$_}; + if ( not $alert ) { + do_log "WARNING: Empty alert in ".(join ",", @{$tree->{alerts}})." ($name)\n"; + next; + }; + if ( ref $alert->{sub} ne 'CODE' ) { + do_log "WARNING: Alert '$_' did not resolve to a Sub Ref. Skipping\n"; + next; + }; + my $prevmatch = $tree->{'prevmatch'.$s}{$_} || 0; + + # add the current state of an edge triggered alert to the + # data passed into a matcher, which allows for somewhat + # more intelligent alerting due to state awareness. + $x->{prevmatch} = $prevmatch; + my $priority = $alert->{priority}; + my $match = &{$alert->{sub}}($x) || 0; # Avgratio returns undef + $gotalert = $match unless $gotalert; + my $edgetrigger = $alert->{edgetrigger} eq 'yes'; + my $what; + if ($edgetrigger and $prevmatch != $match) { + $what = ($prevmatch == 0 ? "was raised" : "was cleared"); + } + if (not $edgetrigger and $match) { + $what = "is active"; + } + if ($what and (not defined $priority or not defined $priority_done )) { + $priority_done = $priority if $priority and not $priority_done; + # send something + my $from; + my $line = "$name/$prop"; + my $base = $cfg->{General}{datadir}; + $line =~ s|^$base/||; + $line =~ s|/host$||; + $line =~ s|/|.|g; + $line .= "[from $slave]" if $slave; + do_log("Alert $_ $what for $line"); + my $urlline = $line; + $urlline = $cfg->{General}{cgiurl}."?target=".$line; + my $loss = "loss: ".join ", ",map {defined $_ ? (/^\d/ ? sprintf "%.0f%%", $_ :$_):"U" } @{$x->{loss}}; + my $rtt = "rtt: ".join ", ",map {defined $_ ? (/^\d/ ? sprintf "%.0fms", $_*1000 :$_):"U" } @{$x->{rtt}}; + my $time = time; + my @stamp = localtime($time); + my $stamp = localtime($time); + my @to; + foreach my $addr (map {$_ ? (split /\s*,\s*/,$_) : ()} $cfg->{Alerts}{to},$tree->{alertee},$alert->{to}){ + next unless $addr; + if ( $addr =~ /^\|(.+)/) { + my $cmd = $1; + if ($edgetrigger) { system $cmd,$_,$line,$loss,$rtt,$tree->{host}, ($what =~/raise/); - } else { + } else { system $cmd,$_,$line,$loss,$rtt,$tree->{host}; - } - } elsif ( $addr =~ /^snpp:(.+)/ ) { - sendsnpp $1, <{comment} $_ $what on $line $loss $rtt SNPPALERT - } else { - push @to, $addr; - } - }; - if (@to){ - my $default_mail = < <##WHAT##> on <##LINE##> <##STAMP##> @@ -1476,34 +1445,90 @@ Comment DOC my $mail = fill_template($alert->{mailtemplate}, - { - ALERT => $_, - WHAT => $what, - LINE => $line, - URL => $urlline, - STAMP => $stamp, - PAT => $alert->{pattern}, - LOSS => $loss, - RTT => $rtt, - COMMENT => $alert->{comment} - },$default_mail) || "Subject: smokeping failed to open mailtemplate '$alert->{mailtemplate}'\n\nsee subject\n"; - my $rfc2822stamp = strftime("%a, %e %b %Y %H:%M:%S %z", @stamp); - my $to = join ",",@to; - sendmail $cfg->{Alerts}{from},$to, < $_, + WHAT => $what, + LINE => $line, + URL => $urlline, + STAMP => $stamp, + PAT => $alert->{pattern}, + LOSS => $loss, + RTT => $rtt, + COMMENT => $alert->{comment} + },$default_mail) || "Subject: smokeping failed to open mailtemplate '$alert->{mailtemplate}'\n\nsee subject\n"; + my $rfc2822stamp = strftime("%a, %e %b %Y %H:%M:%S %z", @stamp); + my $to = join ",",@to; + sendmail $cfg->{Alerts}{from},$to, <{Alerts}{from} Date: $rfc2822stamp $mail ALERT - } - } else { + } + } else { do_debuglog("Alert \"$_\": no match for target $name\n"); - } - $tree->{prevmatch}{$_} = $match; - } - } # end alerts - update_sortercache $cfg,$sortercache,$name,$updatestring,$gotalert; - } + } + $tree->{'prevmatch'.$s}{$_} = $match; + } + } # end alerts + return $gotalert; +} + + +sub update_rrds($$$$$$); +sub update_rrds($$$$$$) { + my $cfg = shift; + my $probes = shift; + my $tree = shift; + my $name = shift; + my $justthisprobe = shift; # if defined, update only the targets probed by this probe + my $sortercache = shift; + + my $probe = $tree->{probe}; + foreach my $prop (keys %{$tree}) { + if (ref $tree->{$prop} eq 'HASH'){ + update_rrds $cfg, $probes, $tree->{$prop}, $name."/$prop", $justthisprobe, $sortercache; + } + # if we are looking down a branche where no probe property is set there is no sense + # in further exploring it + next unless defined $probe; + next if defined $justthisprobe and $probe ne $justthisprobe; + my $probeobj = $probes->{$probe}; + if ($prop eq 'host' and check_filter($cfg,$name)) { + my @slaves = (""); # we start with the nameles slave which is the master + if ($tree->{slaves}){ + push @slaves, split(/\s+/, $tree->{slaves}); + } + for my $slave (@slaves){ + my $updatestring = $probeobj->rrdupdate_string($tree); + my $pings = $probeobj->_pings($tree); + if ( $tree->{rawlog} ){ + my $file = POSIX::strftime $tree->{rawlog},localtime(time); + if (open LOG,">>$name.$file.csv"){ + print LOG time,"\t",join("\t",split /:/,$updatestring),"\n"; + close LOG; + } else { + do_log "Warning: failed to open $file for logging: $!\n"; + } + } + my @update = ( + $name.".rrd", + '--template', ( + join ":", "uptime", "loss", "median", + map { "ping${_}" } 1..$pings + ), + "N:".$updatestring + ); + do_debuglog("Calling RRDs::update(@update)"); + RRDs::update ( @update ); + my $ERROR = RRDs::error(); + do_log "RRDs::update ERROR: $ERROR\n" if $ERROR; + # check alerts + my ($loss,$rtt) = (split /:/, $updatestring)[1,2]; + my $gotalert = check_alerts $cfg,$tree,$pings,$name,$prop,$loss,$rtt; + update_sortercache $cfg,$sortercache,$name,$updatestring,$gotalert; + } + } } } diff --git a/lib/Smokeping/Master.pm b/lib/Smokeping/Master.pm index 9847841..11f61e9 100644 --- a/lib/Smokeping/Master.pm +++ b/lib/Smokeping/Master.pm @@ -2,9 +2,10 @@ package Smokeping::Master; use HTTP::Request; use Data::Dumper; -use Storable qw(dclone nfreeze); +use Storable qw(lock_nstore dclone lock_retrieve); use strict; use warnings; +use Fcntl qw(:flock); =head1 NAME @@ -48,8 +49,6 @@ sub get_targets { return ($ok ? \%return : undef); } - - sub extract_config { my $cfg = shift; my $slave = shift; @@ -73,9 +72,83 @@ sub extract_config { $node->{$last_key} = $cfg->{Slaves}{$slave}{override}{$override}; } } - return nfreeze \%slave_config; + return Dumper \%slave_config; } +=head3 save_updates (updates) + +When the cgi gets updates from a client, these updates are saved away, for +each 'target' so that the updates can be integrated into the relevant rrd +database by the rrd daemon as the next round of updates is processed. This +two stage process is chosen so that all results flow through the same code +path in the daemon. + +=cut + +sub save_updates { + my $cfg = shift; + my $slave = shift; + my $updates = shift; + # [ [ name, time, updatestring ], + # [ name, time, updatestring ] ] + for my $update (split /\n/, $updates){ + my ($name, $time, $updatestring) = split /\t/, $update; + my $file = $cfg->{General}{datadir}."/${name}.slaves"; + if ( ! -f $cfg->{General}{datadir}."/${name}.rrd" ){ + warn "Skipping update for $name since it does not exist in the local data structure ($cfg->{General}{datadir})\n"; + } elsif ( open (my $hand, '+>>', $file) ) { + if ( flock $hand, LOCK_EX ){ + my $existing; + if ( tell $hand > 0 ){ + eval { $existing = fd_retreive $hand }; + if ($@) { #error + warn "Loading $file: $@"; + $existing = []; + } + }; + push @{$existing}, [ $slave, $time, $updatestring]; + nstore_fd ($existing, $hand); + flock $hand, LOCK_UN; + } else { + warn "Could not lock $file. Can't store data.\n"; + } + close $hand; + } else { + warn "Could not write to $file: $!"; + } + } +}; + +=head3 answer_slave + +Answer the requests from the slave by accepting the data, verifying the secrets +and providing updated config information if necessary. + +=cut + +sub anwer_slave { + my $cfg = shift; + my $q = shift; + my $slave = $q->param('slave'); + my $secret = get_secret($slave); + my $key = $q->param('key'); + my $data = $q->param('data'); + my $config_time = $q->param('config_time'); + + # lets make sure the she share a secret + if (md5_base64($secret.$data) eq $key){ + save_updates $cfg, $slave, $data; + } else { + warn "Data from $slave was signed with $key which does not match our expectation\n"; + } + # does the client need new config ? + if ($config_time < $cfg->{__last}){ + print extract_config $cfg, $slave; + } else { + print "\n" + }; +} + 1; __END__ diff --git a/lib/Smokeping/Slave.pm b/lib/Smokeping/Slave.pm index 38dab1c..c5aa3d3 100644 --- a/lib/Smokeping/Slave.pm +++ b/lib/Smokeping/Slave.pm @@ -3,8 +3,8 @@ package Smokeping::Slave; use warnings; use strict; use Data::Dumper; -use Storable qw(nstore retreive); -use Digest::MD5 qw(md5_ base64); +use Storable qw(nstore retrieve); +use Digest::MD5 qw(md5_base64); use LWP::UserAgent; use Smokeping; @@ -49,8 +49,7 @@ sub get_results { if ($prop eq 'host') { #print "update $name\n"; my $updatestring = $probeobj->rrdupdate_string($tree); - my $pings = $probeobj->_pings($tree); - push @$results, [ $name, time, $updatestring]; + push @$results, "$name\t".time()."\t$updatestring"; } } return $results; @@ -60,25 +59,26 @@ sub submit_results { my $slave_cfg = shift; my $cfg = shift; my $myprobe = shift; + my $probes = shift; my $store = $slave_cfg->{cache_dir}."/data"; $store .= "_$myprobe" if $myprobe; $store .= ".cache"; my $restore = retrieve $store if -f $store; - my $data = get_results($slave_cfg, $cfg, $probes, $cfg->{Targets}, $cfg->{General}{datadir}, $myprobe); + my $data = get_results($slave_cfg, $cfg, $probes, $cfg->{Targets}, '', $myprobe); push @$data, @$restore; - my $data_dump = Dumper $data; + my $data_dump = join "\n",@{$data}; my $ua = LWP::UserAgent->new( agent => 'smokeping-slave/1.0', - from => $slave_cfg->{slave_name}, timeout => 10, env_proxy => 1 ); my $response = $ua->post( $slave_cfg->{master_url}, Content_Type => 'form-data', Content => [ - key => md5_base_64($slave_cfg->{shared_secret}.$data_dump) + slave => $slave_cfg->{slave_name}, + key => md5_base_64($slave_cfg->{shared_secret}.$data_dump), data => $data_dump, - config_time => $cfg->{__last} || 0; + config_time => $cfg->{__last} || 0, ], ); if ($response->is_success){ @@ -90,11 +90,14 @@ sub submit_results { } my $VAR1; eval $data; - if (ref $VAR1 eq 'HASH'){ - update_config $cfg,$VAR1; + if ($@){ + warn "evaluating new config from server failed: $@"; + } elsif (definded $VAR1 and ref $VAR1 eq 'HASH'){ + update_config($cfg,$VAR1); } } else { - # ok we have to store the result so that we can try again later + # ok did not manage to get our data to the server. + # we store the result so that we can try again later. nstore $store; warn $response->status_line(); } @@ -115,9 +118,10 @@ sub update_config { $cfg->{Database} = $data->{Database}; $cfg->{Targets} = $data->{Targets}; $cfg->{__last} = $data->{__last}; - $Smokeping::probes = Smokeping::load_probes $cfg; + my $probes = Smokeping::load_probes $cfg; $cfg->{__probes} = $probes; - add_targets $cfg, $probes, $cfg->{Targets}, $cfg->{General}{datadir}; + add_targets($cfg, $probes, $cfg->{Targets}, $cfg->{General}{datadir}); + return $probes; } 1; -- cgit v1.2.3-24-g4f1b