diff options
Diffstat (limited to 'Bugzilla/JobQueue.pm')
-rw-r--r-- | Bugzilla/JobQueue.pm | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/Bugzilla/JobQueue.pm b/Bugzilla/JobQueue.pm index 7ea678345..669076dd5 100644 --- a/Bugzilla/JobQueue.pm +++ b/Bugzilla/JobQueue.pm @@ -27,12 +27,15 @@ use strict; use Bugzilla::Constants; use Bugzilla::Error; use Bugzilla::Install::Util qw(install_string); +use File::Slurp; use base qw(TheSchwartz); +use fields qw(_worker_pidfile); # This maps job names for Bugzilla::JobQueue to the appropriate modules. # If you add new types of jobs, you should add a mapping here. use constant JOB_MAP => { send_mail => 'Bugzilla::Job::Mailer', + bug_mail => 'Bugzilla::Job::BugMail', }; # Without a driver cache TheSchwartz opens a new database connection @@ -40,6 +43,10 @@ use constant JOB_MAP => { # across requests. use constant DRIVER_CACHE_TIME => 300; # 5 minutes +# To avoid memory leak/fragmentation, a worker process won't process more than +# MAX_MESSAGES messages. +use constant MAX_MESSAGES => 1000; + sub job_map { if (!defined(Bugzilla->request_cache->{job_map})) { my $job_map = JOB_MAP; @@ -99,6 +106,74 @@ sub insert { return $retval; } +# To avoid memory leaks/fragmentation which tends to happen for long running +# perl processes; check for jobs, and spawn a new process to empty the queue. +sub subprocess_worker { + my $self = shift; + + my $command = "$0 -p '" . $self->{_worker_pidfile} . "' onepass"; + + while (1) { + my $time = (time); + my @jobs = $self->list_jobs({ + funcname => $self->{all_abilities}, + run_after => $time, + grabbed_until => $time, + limit => 1, + }); + if (@jobs) { + $self->debug("Spawning queue worker process"); + # Run the worker as a daemon + system $command; + # And poll the PID to detect when the working has finished. + # We do this instead of system() to allow for the INT signal to + # interrup us and trigger kill_worker(). + my $pid = read_file($self->{_worker_pidfile}, err_mode => 'quiet'); + if ($pid) { + sleep(3) while(kill(0, $pid)); + } + $self->debug("Queue worker process completed"); + } else { + $self->debug("No jobs found"); + } + sleep(5); + } +} + +sub kill_worker { + my $self = Bugzilla->job_queue(); + if ($self->{_worker_pidfile} && -e $self->{_worker_pidfile}) { + my $worker_pid = read_file($self->{_worker_pidfile}); + if ($worker_pid && kill(0, $worker_pid)) { + $self->debug("Stopping worker process"); + system "$0 -f -p '" . $self->{_worker_pidfile} . "' stop"; + } + } +} + +sub set_pidfile { + my ($self, $pidfile) = @_; + $pidfile =~ s/^(.+)(\..+)$/$1.worker$2/; + $self->{_worker_pidfile} = $pidfile; +} + +# Clear the request cache at the start of each run. +sub work_once { + my $self = shift; + Bugzilla->clear_request_cache(); + return $self->SUPER::work_once(@_); +} + +# Never process more than MAX_MESSAGES in one batch, to avoid memory +# leak/fragmentation issues. +sub work_until_done { + my $self = shift; + my $count = 0; + while ($count++ < MAX_MESSAGES) { + $self->work_once or last; + } +} + 1; __END__ |