summaryrefslogtreecommitdiffstats
path: root/Bugzilla/JobQueue.pm
diff options
context:
space:
mode:
Diffstat (limited to 'Bugzilla/JobQueue.pm')
-rw-r--r--Bugzilla/JobQueue.pm75
1 files changed, 75 insertions, 0 deletions
diff --git a/Bugzilla/JobQueue.pm b/Bugzilla/JobQueue.pm
index 7ea678345..669076dd5 100644
--- a/Bugzilla/JobQueue.pm
+++ b/Bugzilla/JobQueue.pm
@@ -27,12 +27,15 @@ use strict;
use Bugzilla::Constants;
use Bugzilla::Error;
use Bugzilla::Install::Util qw(install_string);
+use File::Slurp;
use base qw(TheSchwartz);
+use fields qw(_worker_pidfile);
# This maps job names for Bugzilla::JobQueue to the appropriate modules.
# If you add new types of jobs, you should add a mapping here.
use constant JOB_MAP => {
send_mail => 'Bugzilla::Job::Mailer',
+ bug_mail => 'Bugzilla::Job::BugMail',
};
# Without a driver cache TheSchwartz opens a new database connection
@@ -40,6 +43,10 @@ use constant JOB_MAP => {
# across requests.
use constant DRIVER_CACHE_TIME => 300; # 5 minutes
+# To avoid memory leak/fragmentation, a worker process won't process more than
+# MAX_MESSAGES messages.
+use constant MAX_MESSAGES => 1000;
+
sub job_map {
if (!defined(Bugzilla->request_cache->{job_map})) {
my $job_map = JOB_MAP;
@@ -99,6 +106,74 @@ sub insert {
return $retval;
}
+# To avoid memory leaks/fragmentation which tends to happen for long running
+# perl processes; check for jobs, and spawn a new process to empty the queue.
+sub subprocess_worker {
+ my $self = shift;
+
+ my $command = "$0 -p '" . $self->{_worker_pidfile} . "' onepass";
+
+ while (1) {
+ my $time = (time);
+ my @jobs = $self->list_jobs({
+ funcname => $self->{all_abilities},
+ run_after => $time,
+ grabbed_until => $time,
+ limit => 1,
+ });
+ if (@jobs) {
+ $self->debug("Spawning queue worker process");
+ # Run the worker as a daemon
+ system $command;
+ # And poll the PID to detect when the working has finished.
+ # We do this instead of system() to allow for the INT signal to
+ # interrup us and trigger kill_worker().
+ my $pid = read_file($self->{_worker_pidfile}, err_mode => 'quiet');
+ if ($pid) {
+ sleep(3) while(kill(0, $pid));
+ }
+ $self->debug("Queue worker process completed");
+ } else {
+ $self->debug("No jobs found");
+ }
+ sleep(5);
+ }
+}
+
+sub kill_worker {
+ my $self = Bugzilla->job_queue();
+ if ($self->{_worker_pidfile} && -e $self->{_worker_pidfile}) {
+ my $worker_pid = read_file($self->{_worker_pidfile});
+ if ($worker_pid && kill(0, $worker_pid)) {
+ $self->debug("Stopping worker process");
+ system "$0 -f -p '" . $self->{_worker_pidfile} . "' stop";
+ }
+ }
+}
+
+sub set_pidfile {
+ my ($self, $pidfile) = @_;
+ $pidfile =~ s/^(.+)(\..+)$/$1.worker$2/;
+ $self->{_worker_pidfile} = $pidfile;
+}
+
+# Clear the request cache at the start of each run.
+sub work_once {
+ my $self = shift;
+ Bugzilla->clear_request_cache();
+ return $self->SUPER::work_once(@_);
+}
+
+# Never process more than MAX_MESSAGES in one batch, to avoid memory
+# leak/fragmentation issues.
+sub work_until_done {
+ my $self = shift;
+ my $count = 0;
+ while ($count++ < MAX_MESSAGES) {
+ $self->work_once or last;
+ }
+}
+
1;
__END__