summaryrefslogtreecommitdiffstats
path: root/Bugzilla/JobQueue
diff options
context:
space:
mode:
authorDylan William Hardison <dylan@hardison.net>2018-04-01 16:52:36 +0200
committerDylan William Hardison <dylan@hardison.net>2018-04-01 16:52:36 +0200
commitab229b9a828b77f8a3b9ce215f0dfed4c84d4ae5 (patch)
tree483da9c8b66f4444bb8a410e3d599c7484ad721e /Bugzilla/JobQueue
parentdaa2d6b1c40354ecce0e48e6c5ee686efe642c4b (diff)
parent2f8b999750cc700faf03c6aee1c53d1fc4df767f (diff)
downloadbugzilla-ab229b9a828b77f8a3b9ce215f0dfed4c84d4ae5.tar.gz
bugzilla-ab229b9a828b77f8a3b9ce215f0dfed4c84d4ae5.tar.xz
Merge branch 'master' into unstable
Diffstat (limited to 'Bugzilla/JobQueue')
-rw-r--r--Bugzilla/JobQueue/Runner.pm175
-rw-r--r--Bugzilla/JobQueue/Worker.pm30
2 files changed, 138 insertions, 67 deletions
diff --git a/Bugzilla/JobQueue/Runner.pm b/Bugzilla/JobQueue/Runner.pm
index 5b3164ef9..0177de40a 100644
--- a/Bugzilla/JobQueue/Runner.pm
+++ b/Bugzilla/JobQueue/Runner.pm
@@ -14,23 +14,34 @@ package Bugzilla::JobQueue::Runner;
use 5.10.1;
use strict;
use warnings;
+use autodie qw(open close unlink system);
+use Bugzilla::Logging;
+use Bugzilla::Constants;
+use Bugzilla::DaemonControl qw(:utils);
+use Bugzilla::JobQueue::Worker;
+use Bugzilla::JobQueue;
+use Bugzilla::Util qw(get_text);
use Cwd qw(abs_path);
+use English qw(-no_match_vars $PROGRAM_NAME $EXECUTABLE_NAME);
use File::Basename;
use File::Copy;
+use File::Spec::Functions qw(catfile tmpdir);
+use Future;
+use Future::Utils qw(fmap_void);
+use IO::Async::Loop;
+use IO::Async::Process;
+use IO::Async::Signal;
use Pod::Usage;
-use Bugzilla::Constants;
-use Bugzilla::JobQueue;
-use Bugzilla::Util qw(get_text);
-BEGIN { eval "use base qw(Daemon::Generic)"; }
+use parent qw(Daemon::Generic);
-our $VERSION = BUGZILLA_VERSION;
+our $VERSION = 2;
# Info we need to install/uninstall the daemon.
-our $chkconfig = "/sbin/chkconfig";
-our $initd = "/etc/init.d";
-our $initscript = "bugzilla-queue";
+our $chkconfig = '/sbin/chkconfig';
+our $initd = '/etc/init.d';
+our $initscript = 'bugzilla-queue';
# The Daemon::Generic docs say that it uses all sorts of
# things from gd_preconfig, but in fact it does not. The
@@ -40,11 +51,10 @@ sub gd_preconfig {
my $self = shift;
my $pidfile = $self->{gd_args}{pidfile};
- if (!$pidfile) {
- $pidfile = bz_locations()->{datadir} . '/' . $self->{gd_progname}
- . ".pid";
+ if ( !$pidfile ) {
+ $pidfile = catfile(tmpdir(), $self->{gd_progname} . '.pid');
}
- return (pidfile => $pidfile);
+ return ( pidfile => $pidfile );
}
# All config other than the pidfile has to be done in gd_getopt
@@ -54,24 +64,30 @@ sub gd_getopt {
$self->SUPER::gd_getopt();
- if ($self->{gd_args}{progname}) {
+ if ( $self->{gd_args}{progname} ) {
$self->{gd_progname} = $self->{gd_args}{progname};
}
else {
- $self->{gd_progname} = basename($0);
+ $self->{gd_progname} = basename($PROGRAM_NAME);
}
- # There are places that Daemon Generic's new() uses $0 instead of
+ # There are places that Daemon Generic's new() uses $PROGRAM_NAME instead of
# gd_progname, which it really shouldn't, but this hack fixes it.
- $self->{_original_zero} = $0;
- $0 = $self->{gd_progname};
+ $self->{_original_program_name} = $PROGRAM_NAME;
+
+ ## no critic (Variables::RequireLocalizedPunctuationVars)
+ $PROGRAM_NAME = $self->{gd_progname};
+ ## use critic
}
sub gd_postconfig {
my $self = shift;
+
# See the hack above in gd_getopt. This just reverses it
# in case anything else needs the accurate $0.
- $0 = delete $self->{_original_zero};
+ ## no critic (Variables::RequireLocalizedPunctuationVars)
+ $PROGRAM_NAME = delete $self->{_original_program_name};
+ ## use critic
}
sub gd_more_opt {
@@ -79,12 +95,13 @@ sub gd_more_opt {
return (
'pidfile=s' => \$self->{gd_args}{pidfile},
'n=s' => \$self->{gd_args}{progname},
+ 'jobs|j=i' => \$self->{gd_args}{jobs},
);
}
sub gd_usage {
- pod2usage({ -verbose => 0, -exitval => 'NOEXIT' });
- return 0
+ pod2usage( { -verbose => 0, -exitval => 'NOEXIT' } );
+ return 0;
}
sub gd_can_install {
@@ -95,66 +112,63 @@ sub gd_can_install {
my $sysconfig = '/etc/sysconfig';
my $config_file = "$sysconfig/$initscript";
- if (!-x $chkconfig or !-d $initd) {
+ if ( !-x $chkconfig || !-d $initd ) {
return $self->SUPER::gd_can_install(@_);
}
return sub {
- if (!-w $initd) {
+ if ( !-w $initd ) {
print "You must run the 'install' command as root.\n";
return;
}
- if (-e $dest_file) {
+ if ( -e $dest_file ) {
print "$initscript already in $initd.\n";
}
else {
- copy($source_file, $dest_file)
+ copy( $source_file, $dest_file )
or die "Could not copy $source_file to $dest_file: $!";
- chmod(0755, $dest_file)
+ chmod 0755, $dest_file
or die "Could not change permissions on $dest_file: $!";
}
- system($chkconfig, '--add', $initscript);
- print "$initscript installed.",
- " To start the daemon, do \"$dest_file start\" as root.\n";
+ system $chkconfig, '--add', $initscript;
+ print "$initscript installed.", " To start the daemon, do \"$dest_file start\" as root.\n";
- if (-d $sysconfig and -w $sysconfig) {
- if (-e $config_file) {
+ if ( -d $sysconfig and -w $sysconfig ) {
+ if ( -e $config_file ) {
print "$config_file already exists.\n";
return;
}
- open(my $config_fh, ">", $config_file)
- or die "Could not write to $config_file: $!";
- my $directory = abs_path(dirname($self->{_original_zero}));
- my $owner_id = (stat $self->{_original_zero})[4];
- my $owner = getpwuid($owner_id);
- print $config_fh <<END;
+ open my $config_fh, '>', $config_file;
+ my $directory = abs_path( dirname( $self->{_original_program_name} ) );
+ my $owner_id = ( stat $self->{_original_program_name} )[4];
+ my $owner = getpwuid $owner_id;
+ print $config_fh <<"END";
#!/bin/sh
BUGZILLA="$directory"
USER=$owner
END
- close($config_fh);
+ close $config_fh;
}
else {
print "Please edit $dest_file to configure the daemon.\n";
}
- }
+ }
}
sub gd_can_uninstall {
my $self = shift;
- if (-x $chkconfig and -d $initd) {
+ if ( -x $chkconfig and -d $initd ) {
return sub {
- if (!-e "$initd/$initscript") {
+ if ( !-e "$initd/$initscript" ) {
print "$initscript not installed.\n";
return;
}
- system($chkconfig, '--del', $initscript);
- print "$initscript disabled.",
- " To stop it, run: $initd/$initscript stop\n";
- }
+ system $chkconfig, '--del', $initscript;
+ print "$initscript disabled.", " To stop it, run: $initd/$initscript stop\n";
+ }
}
return $self->SUPER::gd_can_install(@_);
@@ -164,49 +178,76 @@ sub gd_check {
my $self = shift;
# Get a count of all the jobs currently in the queue.
- my $jq = Bugzilla->job_queue();
- my @dbs = $jq->bz_databases();
+ my $jq = Bugzilla->job_queue();
+ my @dbs = $jq->bz_databases();
my $count = 0;
foreach my $driver (@dbs) {
- $count += $driver->select_one('SELECT COUNT(*) FROM ts_job', []);
+ $count += $driver->select_one( 'SELECT COUNT(*) FROM ts_job', [] );
}
- print get_text('job_queue_depth', { count => $count }) . "\n";
+ print get_text( 'job_queue_depth', { count => $count } ) . "\n";
}
+# override this to use IO::Async.
sub gd_setup_signals {
- my $self = shift;
- $self->SUPER::gd_setup_signals();
- $SIG{TERM} = sub { $self->gd_quit_event(); }
+ my $self = shift;
+ my @signals = qw( INT HUP TERM );
+ $self->{_signal_future} = Future->wait_any( map { catch_signal( $_, $_ ) } @signals );
}
sub gd_other_cmd {
my ($self) = shift;
- if ($ARGV[0] eq "once") {
- $self->_do_work("work_once");
-
- exit(0);
+ if ( $ARGV[0] eq 'once' ) {
+ Bugzilla::JobQueue::Worker->run('work_once');
+ exit;
}
-
+
$self->SUPER::gd_other_cmd();
}
-sub gd_run {
- my $self = shift;
+sub gd_quit_event { FATAL('gd_quit_event() should never be called') }
+sub gd_reconfig_event { FATAL('gd_reconfig_event() should never be called') }
- $self->_do_work("work");
+sub gd_run {
+ my $self = shift;
+ my $jobs = $self->{gd_args}{jobs} // 1;
+ my $signal_f = $self->{_signal_future};
+ my $workers_f = fmap_void { $self->run_worker() }
+ concurrent => $jobs,
+ generate => sub { !$signal_f->is_ready };
+
+ # This is so the process shows up in (h)top in a useful way.
+ local $PROGRAM_NAME = "$self->{gd_progname} [supervisor]";
+ Future->wait_any($signal_f, $workers_f)->get;
+ unlink $self->{gd_pidfile};
+ exit 0;
}
-sub _do_work {
- my ($self, $fn) = @_;
+# This executes the script "jobqueue-worker.pl"
+# $EXECUTABLE_NAME is the name of the perl interpreter.
+sub run_worker {
+ my ( $self ) = @_;
- my $jq = Bugzilla->job_queue();
- $jq->set_verbose($self->{debug});
- foreach my $module (values %{ Bugzilla::JobQueue->job_map() }) {
- eval "use $module";
- $jq->can_do($module);
+ my $script = catfile( bz_locations->{cgi_path}, 'jobqueue-worker.pl' );
+ my @command = ( $EXECUTABLE_NAME, $script);
+ if ( $self->{gd_args}{progname} ) {
+ push @command, '--name' => "$self->{gd_args}{progname} [worker]";
}
- $jq->$fn;
+ my $loop = IO::Async::Loop->new;
+ my $exit_f = $loop->new_future;
+ my $worker = IO::Async::Process->new(
+ command => \@command,
+ on_finish => on_finish($exit_f),
+ on_exception => on_exception( 'jobqueue worker', $exit_f )
+ );
+ $exit_f->on_cancel(
+ sub {
+ DEBUG('terminate worker');
+ $worker->kill('TERM');
+ }
+ );
+ $loop->add($worker);
+ return $exit_f;
}
1;
diff --git a/Bugzilla/JobQueue/Worker.pm b/Bugzilla/JobQueue/Worker.pm
new file mode 100644
index 000000000..db8ebe35e
--- /dev/null
+++ b/Bugzilla/JobQueue/Worker.pm
@@ -0,0 +1,30 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# This Source Code Form is "Incompatible With Secondary Licenses", as
+# defined by the Mozilla Public License, v. 2.0.
+
+package Bugzilla::JobQueue::Worker;
+use 5.10.1;
+use strict;
+use warnings;
+
+use Bugzilla::Logging;
+use Module::Runtime qw(require_module);
+
+sub run {
+ my ( $class, $fn ) = @_;
+ DEBUG("Starting up for $fn");
+ my $jq = Bugzilla->job_queue();
+
+ DEBUG('Loading jobqueue modules');
+ foreach my $module ( values %{ Bugzilla::JobQueue->job_map() } ) {
+ DEBUG("JobQueue can do $module");
+ require_module($module);
+ $jq->can_do($module);
+ }
+ $jq->$fn;
+}
+
+1;