From b6dedf486972c23b05f16a00c2be1c32e139eb1f Mon Sep 17 00:00:00 2001 From: Byron Jones Date: Fri, 15 Feb 2013 13:57:04 +0800 Subject: Bug 832893: changes jobqueue.pl to spawn worker processes to deliver bugmail to avoid memory leaks r=dkl, a=LpSolit --- Bugzilla/JobQueue.pm | 60 ++++++++++++++++++++++++++++++++++++++++++++- Bugzilla/JobQueue/Runner.pm | 28 +++++++++++++-------- jobqueue.pl | 1 + t/011pod.t | 2 +- 4 files changed, 79 insertions(+), 12 deletions(-) diff --git a/Bugzilla/JobQueue.pm b/Bugzilla/JobQueue.pm index 098ee7935..9365a7d56 100644 --- a/Bugzilla/JobQueue.pm +++ b/Bugzilla/JobQueue.pm @@ -13,7 +13,10 @@ use strict; use Bugzilla::Constants; use Bugzilla::Error; use Bugzilla::Install::Util qw(install_string); -use parent qw(TheSchwartz); +use File::Basename; +use File::Slurp; +use base qw(TheSchwartz); +use fields qw(_worker_pidfile); # This maps job names for Bugzilla::JobQueue to the appropriate modules. # If you add new types of jobs, you should add a mapping here. @@ -93,6 +96,57 @@ sub insert { return $retval; } +# To avoid memory leaks/fragmentation which tends to happen for long running +# perl processes; check for jobs, and spawn a new process to empty the queue. +sub subprocess_worker { + my $self = shift; + + my $command = "$0 -d -p '" . $self->{_worker_pidfile} . "' onepass"; + + while (1) { + my $time = (time); + my @jobs = $self->list_jobs({ + funcname => $self->{all_abilities}, + run_after => $time, + grabbed_until => $time, + limit => 1, + }); + if (@jobs) { + $self->debug("Spawning queue worker process"); + # Run the worker as a daemon + system $command; + # And poll the PID to detect when the working has finished. + # We do this instead of system() to allow for the INT signal to + # interrup us and trigger kill_worker(). + my $pid = read_file($self->{_worker_pidfile}, err_mode => 'quiet'); + if ($pid) { + sleep(3) while(kill(0, $pid)); + } + $self->debug("Queue worker process completed"); + } else { + $self->debug("No jobs found"); + } + sleep(5); + } +} + +sub kill_worker { + my $self = Bugzilla->job_queue(); + if ($self->{_worker_pidfile} && -e $self->{_worker_pidfile}) { + my $worker_pid = read_file($self->{_worker_pidfile}); + if ($worker_pid && kill(0, $worker_pid)) { + $self->debug("Stopping worker process"); + system "$0 -f -p '" . $self->{_worker_pidfile} . "' stop"; + } + } +} + +sub set_pidfile { + my ($self, $pidfile) = @_; + $self->{_worker_pidfile} = bz_locations->{'datadir'} . + '/worker-' . basename($pidfile); +} + # Clear the request cache at the start of each run. sub work_once { my $self = shift; @@ -136,4 +190,8 @@ be sent away to be done later. =item job_map +=item set_pidfile + +=item kill_worker + =back diff --git a/Bugzilla/JobQueue/Runner.pm b/Bugzilla/JobQueue/Runner.pm index 7ab4f7f1a..a0d6a77cb 100644 --- a/Bugzilla/JobQueue/Runner.pm +++ b/Bugzilla/JobQueue/Runner.pm @@ -38,6 +38,7 @@ our $initscript = "bugzilla-queue"; sub gd_preconfig { my $self = shift; + $self->{_run_command} = 'subprocess_worker'; my $pidfile = $self->{gd_args}{pidfile}; if (!$pidfile) { $pidfile = bz_locations()->{datadir} . '/' . $self->{gd_progname} @@ -136,6 +137,7 @@ sub gd_can_install { print $config_fh <gd_quit_event(); } } -sub gd_other_cmd { - my ($self) = shift; - if ($ARGV[0] eq "once") { - $self->_do_work("work_once"); +sub gd_quit_event { + Bugzilla->job_queue->kill_worker(); + exit(1); +} - exit(0); +sub gd_other_cmd { + my ($self, $do, $locked) = @_; + if ($do eq "once") { + $self->{_run_command} = 'work_once'; + } elsif ($do eq "onepass") { + $self->{_run_command} = 'work_until_done'; + } else { + $self->SUPER::gd_other_cmd($do, $locked); } - - $self->SUPER::gd_other_cmd(); } sub gd_run { my $self = shift; - - $self->_do_work("work"); + $self->_do_work($self->{_run_command}); } sub _do_work { @@ -205,11 +211,11 @@ sub _do_work { my $jq = Bugzilla->job_queue(); $jq->set_verbose($self->{debug}); + $jq->set_pidfile($self->{gd_pidfile}); foreach my $module (values %{ Bugzilla::JobQueue->job_map() }) { eval "use $module"; $jq->can_do($module); } - $jq->$fn; } @@ -242,6 +248,8 @@ to run the Bugzilla job queue. =item gd_can_install +=item gd_quit_event + =item gd_other_cmd =item gd_more_opt diff --git a/jobqueue.pl b/jobqueue.pl index 6ba288d2e..c8afd74cc 100755 --- a/jobqueue.pl +++ b/jobqueue.pl @@ -46,6 +46,7 @@ jobqueue.pl - Runs jobs in the background for Bugzilla. starts a new one. once Checks the job queue once, executes the first item found (if any) and then exits + onepass Checks the job queue, executes all items found, and then exits check Report the current status of the daemon. install On some *nix systems, this automatically installs and configures jobqueue.pl as a system service so that it will diff --git a/t/011pod.t b/t/011pod.t index 7fdafd910..92474d553 100644 --- a/t/011pod.t +++ b/t/011pod.t @@ -30,7 +30,7 @@ use constant DEFAULT_WHITELIST => qr/^(?:new|new_from_list|check|run_create_vali use constant SUB_WHITELIST => ( 'Bugzilla::Flag' => qr/^(?:(force_)?retarget|force_cleanup)$/, 'Bugzilla::FlagType' => qr/^sqlify_criteria$/, - 'Bugzilla::JobQueue' => qr/^work_once$/, + 'Bugzilla::JobQueue' => qr/(?:^work_once|subprocess_worker)$/, ); # These modules do not need to be documented, generally because they -- cgit v1.2.3-24-g4f1b