summaryrefslogtreecommitdiffstats
path: root/extensions/Push/lib/Push.pm
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/Push/lib/Push.pm')
-rw-r--r--extensions/Push/lib/Push.pm434
1 files changed, 219 insertions, 215 deletions
diff --git a/extensions/Push/lib/Push.pm b/extensions/Push/lib/Push.pm
index ab640da81..97bac942b 100644
--- a/extensions/Push/lib/Push.pm
+++ b/extensions/Push/lib/Push.pm
@@ -24,269 +24,273 @@ use Bugzilla::Extension::Push::Util;
use DateTime;
use Try::Tiny;
-has 'is_daemon' => (
- is => 'rw',
- default => 0,
-);
+has 'is_daemon' => (is => 'rw', default => 0,);
sub start {
- my ($self) = @_;
- my $connectors = $self->connectors;
- $self->{config_last_modified} = $self->get_config_last_modified();
- $self->{config_last_checked} = (time);
-
- foreach my $connector ($connectors->list) {
- $connector->backlog->reset_backoff();
- }
-
- my $pushd_loop = IO::Async::Loop->new;
- my $main_timer = IO::Async::Timer::Periodic->new(
- first_interval => 0,
- interval => POLL_INTERVAL_SECONDS,
- reschedule => 'drift',
- on_tick => sub {
- if ( $self->_dbh_check() ) {
- $self->_reload();
- try {
- $self->push();
- }
- catch {
- FATAL($_);
- };
- }
- },
+ my ($self) = @_;
+ my $connectors = $self->connectors;
+ $self->{config_last_modified} = $self->get_config_last_modified();
+ $self->{config_last_checked} = (time);
+
+ foreach my $connector ($connectors->list) {
+ $connector->backlog->reset_backoff();
+ }
+
+ my $pushd_loop = IO::Async::Loop->new;
+ my $main_timer = IO::Async::Timer::Periodic->new(
+ first_interval => 0,
+ interval => POLL_INTERVAL_SECONDS,
+ reschedule => 'drift',
+ on_tick => sub {
+ if ($self->_dbh_check()) {
+ $self->_reload();
+ try {
+ $self->push();
+ }
+ catch {
+ FATAL($_);
+ };
+ }
+ },
+ );
+ if (Bugzilla->datadog) {
+ my $dog_timer = IO::Async::Timer::Periodic->new(
+ interval => 120,
+ reschedule => 'drift',
+ on_tick => sub { $self->heartbeat },
);
- if ( Bugzilla->datadog ) {
- my $dog_timer = IO::Async::Timer::Periodic->new(
- interval => 120,
- reschedule => 'drift',
- on_tick => sub { $self->heartbeat },
- );
- $pushd_loop->add($dog_timer);
- $dog_timer->start;
- }
+ $pushd_loop->add($dog_timer);
+ $dog_timer->start;
+ }
- $pushd_loop->add($main_timer);
- $main_timer->start;
- $pushd_loop->run;
+ $pushd_loop->add($main_timer);
+ $main_timer->start;
+ $pushd_loop->run;
}
sub heartbeat {
- my ($self) = @_;
- my $dd = Bugzilla->datadog('bugzilla.pushd');
-
- $dd->gauge('scheduled_jobs', Bugzilla->dbh->selectrow_array('SELECT COUNT(*) FROM push'));
-
- foreach my $connector ($self->connectors->list) {
- if ($connector->enabled) {
- my $lcname = lc $connector->name;
- $dd->gauge("${lcname}.backlog", Bugzilla->dbh->selectrow_array('SELECT COUNT(*) FROM push_backlog WHERE connector = ?', undef, $connector->name));
- }
+ my ($self) = @_;
+ my $dd = Bugzilla->datadog('bugzilla.pushd');
+
+ $dd->gauge('scheduled_jobs',
+ Bugzilla->dbh->selectrow_array('SELECT COUNT(*) FROM push'));
+
+ foreach my $connector ($self->connectors->list) {
+ if ($connector->enabled) {
+ my $lcname = lc $connector->name;
+ $dd->gauge(
+ "${lcname}.backlog",
+ Bugzilla->dbh->selectrow_array(
+ 'SELECT COUNT(*) FROM push_backlog WHERE connector = ?', undef,
+ $connector->name
+ )
+ );
}
+ }
}
sub push {
- my ($self) = @_;
- my $logger = $self->logger;
- my $connectors = $self->connectors;
+ my ($self) = @_;
+ my $logger = $self->logger;
+ my $connectors = $self->connectors;
+
+ my $enabled = 0;
+ foreach my $connector ($connectors->list) {
+ if ($connector->enabled) {
+ $enabled = 1;
+ last;
+ }
+ }
+ return unless $enabled;
- my $enabled = 0;
+ $logger->debug("polling");
+
+ # process each message
+ while (my $message = $self->queue->oldest) {
foreach my $connector ($connectors->list) {
- if ($connector->enabled) {
- $enabled = 1;
- last;
+ next unless $connector->enabled;
+ next unless $connector->should_send($message);
+ $logger->debug("pushing to " . $connector->name);
+
+ my $is_backlogged = $connector->backlog->count;
+
+ if (!$is_backlogged) {
+
+ # connector isn't backlogged, immediate send
+ $logger->debug("immediate send");
+ my ($result, $data);
+ eval { ($result, $data) = $connector->send($message); };
+ if ($@) {
+ $result = PUSH_RESULT_TRANSIENT;
+ $data = clean_error($@);
}
- }
- return unless $enabled;
-
- $logger->debug("polling");
-
- # process each message
- while(my $message = $self->queue->oldest) {
- foreach my $connector ($connectors->list) {
- next unless $connector->enabled;
- next unless $connector->should_send($message);
- $logger->debug("pushing to " . $connector->name);
-
- my $is_backlogged = $connector->backlog->count;
-
- if (!$is_backlogged) {
- # connector isn't backlogged, immediate send
- $logger->debug("immediate send");
- my ($result, $data);
- eval {
- ($result, $data) = $connector->send($message);
- };
- if ($@) {
- $result = PUSH_RESULT_TRANSIENT;
- $data = clean_error($@);
- }
- if (!$result) {
- $logger->error($connector->name . " failed to return a result code");
- $result = PUSH_RESULT_UNKNOWN;
- }
- $logger->result($connector, $message, $result, $data);
-
- if ($result == PUSH_RESULT_TRANSIENT) {
- $is_backlogged = 1;
- }
- }
-
- # if the connector is backlogged, push to the backlog queue
- if ($is_backlogged) {
- INFO('connector is backlogged');
- my $backlog = Bugzilla::Extension::Push::BacklogMessage->create_from_message($message, $connector);
- }
+ if (!$result) {
+ $logger->error($connector->name . " failed to return a result code");
+ $result = PUSH_RESULT_UNKNOWN;
}
+ $logger->result($connector, $message, $result, $data);
- # message processed
- $message->remove_from_db();
+ if ($result == PUSH_RESULT_TRANSIENT) {
+ $is_backlogged = 1;
+ }
+ }
+
+ # if the connector is backlogged, push to the backlog queue
+ if ($is_backlogged) {
+ INFO('connector is backlogged');
+ my $backlog
+ = Bugzilla::Extension::Push::BacklogMessage->create_from_message($message,
+ $connector);
+ }
}
- # process backlog
- foreach my $connector ($connectors->list) {
- next unless $connector->enabled;
- my $message = $connector->backlog->oldest();
- next unless $message;
-
- $logger->debug("processing backlog for " . $connector->name);
- while ($message) {
- my ($result, $data);
- eval {
- ($result, $data) = $connector->send($message);
- };
- if ($@) {
- $result = PUSH_RESULT_TRANSIENT;
- $data = $@;
- }
- $message->inc_attempts($result == PUSH_RESULT_OK ? '' : $data);
- if (!$result) {
- $logger->error($connector->name . " failed to return a result code");
- $result = PUSH_RESULT_UNKNOWN;
- }
- $logger->result($connector, $message, $result, $data);
-
- if ($result == PUSH_RESULT_TRANSIENT) {
- # connector is still down, stop trying
- $connector->backlog->inc_backoff();
- last;
- }
-
- # message was processed
- $message->remove_from_db();
-
- $message = $connector->backlog->oldest();
- }
+ # message processed
+ $message->remove_from_db();
+ }
+
+ # process backlog
+ foreach my $connector ($connectors->list) {
+ next unless $connector->enabled;
+ my $message = $connector->backlog->oldest();
+ next unless $message;
+
+ $logger->debug("processing backlog for " . $connector->name);
+ while ($message) {
+ my ($result, $data);
+ eval { ($result, $data) = $connector->send($message); };
+ if ($@) {
+ $result = PUSH_RESULT_TRANSIENT;
+ $data = $@;
+ }
+ $message->inc_attempts($result == PUSH_RESULT_OK ? '' : $data);
+ if (!$result) {
+ $logger->error($connector->name . " failed to return a result code");
+ $result = PUSH_RESULT_UNKNOWN;
+ }
+ $logger->result($connector, $message, $result, $data);
+
+ if ($result == PUSH_RESULT_TRANSIENT) {
+
+ # connector is still down, stop trying
+ $connector->backlog->inc_backoff();
+ last;
+ }
+
+ # message was processed
+ $message->remove_from_db();
+
+ $message = $connector->backlog->oldest();
}
+ }
}
sub _reload {
- my ($self) = @_;
-
- # check for updated config every 60 seconds
- my $now = (time);
- if ($now - $self->{config_last_checked} < 60) {
- return;
- }
- $self->{config_last_checked} = $now;
-
- $self->logger->debug('Checking for updated configuration');
- if ($self->get_config_last_modified eq $self->{config_last_modified}) {
- return;
- }
- $self->{config_last_modified} = $self->get_config_last_modified();
-
- $self->logger->debug('Configuration has been updated');
- $self->connectors->reload();
+ my ($self) = @_;
+
+ # check for updated config every 60 seconds
+ my $now = (time);
+ if ($now - $self->{config_last_checked} < 60) {
+ return;
+ }
+ $self->{config_last_checked} = $now;
+
+ $self->logger->debug('Checking for updated configuration');
+ if ($self->get_config_last_modified eq $self->{config_last_modified}) {
+ return;
+ }
+ $self->{config_last_modified} = $self->get_config_last_modified();
+
+ $self->logger->debug('Configuration has been updated');
+ $self->connectors->reload();
}
sub get_config_last_modified {
- my ($self) = @_;
- my $options_list = Bugzilla::Extension::Push::Option->match({
- connector => '*',
- option_name => 'last-modified',
+ my ($self) = @_;
+ my $options_list
+ = Bugzilla::Extension::Push::Option->match({
+ connector => '*', option_name => 'last-modified',
});
- if (@$options_list) {
- return $options_list->[0]->value;
- } else {
- return $self->set_config_last_modified();
- }
+ if (@$options_list) {
+ return $options_list->[0]->value;
+ }
+ else {
+ return $self->set_config_last_modified();
+ }
}
sub set_config_last_modified {
- my ($self) = @_;
- my $options_list = Bugzilla::Extension::Push::Option->match({
- connector => '*',
- option_name => 'last-modified',
+ my ($self) = @_;
+ my $options_list
+ = Bugzilla::Extension::Push::Option->match({
+ connector => '*', option_name => 'last-modified',
});
- my $now = DateTime->now->datetime();
- if (@$options_list) {
- $options_list->[0]->set_value($now);
- $options_list->[0]->update();
- } else {
- Bugzilla::Extension::Push::Option->create({
- connector => '*',
- option_name => 'last-modified',
- option_value => $now,
- });
- }
- return $now;
+ my $now = DateTime->now->datetime();
+ if (@$options_list) {
+ $options_list->[0]->set_value($now);
+ $options_list->[0]->update();
+ }
+ else {
+ Bugzilla::Extension::Push::Option->create({
+ connector => '*', option_name => 'last-modified', option_value => $now,
+ });
+ }
+ return $now;
}
sub config {
- my ($self) = @_;
- if (!$self->{config}) {
- $self->{config} = Bugzilla::Extension::Push::Config->new(
- 'global',
- {
- name => 'log_purge',
- label => 'Purge logs older than (days)',
- type => 'string',
- default => '7',
- required => '1',
- validate => sub { $_[0] =~ /\D/ && die "Invalid purge duration (must be numeric)\n"; },
- },
- );
- $self->{config}->load();
- }
- return $self->{config};
+ my ($self) = @_;
+ if (!$self->{config}) {
+ $self->{config} = Bugzilla::Extension::Push::Config->new(
+ 'global',
+ {
+ name => 'log_purge',
+ label => 'Purge logs older than (days)',
+ type => 'string',
+ default => '7',
+ required => '1',
+ validate =>
+ sub { $_[0] =~ /\D/ && die "Invalid purge duration (must be numeric)\n"; },
+ },
+ );
+ $self->{config}->load();
+ }
+ return $self->{config};
}
sub logger {
- my ($self, $value) = @_;
- $self->{logger} = $value if $value;
- return $self->{logger};
+ my ($self, $value) = @_;
+ $self->{logger} = $value if $value;
+ return $self->{logger};
}
sub connectors {
- my ($self, $value) = @_;
- $self->{connectors} = $value if $value;
- return $self->{connectors};
+ my ($self, $value) = @_;
+ $self->{connectors} = $value if $value;
+ return $self->{connectors};
}
sub queue {
- my ($self) = @_;
- $self->{queue} ||= Bugzilla::Extension::Push::Queue->new();
- return $self->{queue};
+ my ($self) = @_;
+ $self->{queue} ||= Bugzilla::Extension::Push::Queue->new();
+ return $self->{queue};
}
sub log {
- my ($self) = @_;
- $self->{log} ||= Bugzilla::Extension::Push::Log->new();
- return $self->{log};
+ my ($self) = @_;
+ $self->{log} ||= Bugzilla::Extension::Push::Log->new();
+ return $self->{log};
}
sub _dbh_check {
- my ($self) = @_;
- eval {
- Bugzilla->dbh->selectrow_array("SELECT 1 FROM push");
- };
- if ($@) {
- $self->logger->error(clean_error($@));
- return 0;
- } else {
- return 1;
- }
+ my ($self) = @_;
+ eval { Bugzilla->dbh->selectrow_array("SELECT 1 FROM push"); };
+ if ($@) {
+ $self->logger->error(clean_error($@));
+ return 0;
+ }
+ else {
+ return 1;
+ }
}
1;