summaryrefslogtreecommitdiffstats
path: root/Bugzilla/JobQueue.pm
blob: 50e6de03c5fe88816103f67c9a888b18707537fc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# -*- Mode: perl; indent-tabs-mode: nil -*-
#
# The contents of this file are subject to the Mozilla Public
# License Version 1.1 (the "License"); you may not use this file
# except in compliance with the License. You may obtain a copy of
# the License at http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS
# IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
# implied. See the License for the specific language governing
# rights and limitations under the License.
#
# The Original Code is the Bugzilla Bug Tracking System.
#
# The Initial Developer of the Original Code is Mozilla Corporation.
# Portions created by the Initial Developer are Copyright (C) 2008
# Mozilla Corporation. All Rights Reserved.
#
# Contributor(s): 
#   Mark Smith <mark@mozilla.com>
#   Max Kanat-Alexander <mkanat@bugzilla.org>

package Bugzilla::JobQueue;

use strict;

use Bugzilla::Constants;
use Bugzilla::Error;
use Bugzilla::Install::Util qw(install_string);
use File::Slurp;
use base qw(TheSchwartz);
use fields qw(_worker_pidfile);

# Compile-time constants used by the job queue.
use constant {
    # This maps job names for Bugzilla::JobQueue to the appropriate modules.
    # If you add new types of jobs, you should add a mapping here.
    JOB_MAP => {
        send_mail => 'Bugzilla::Job::Mailer',
        bug_mail  => 'Bugzilla::Job::BugMail',
    },

    # Without a driver cache TheSchwartz opens a new database connection
    # for each email it sends.  This cached connection doesn't persist
    # across requests.
    DRIVER_CACHE_TIME => 300,    # 5 minutes

    # To avoid memory leak/fragmentation, a worker process won't process
    # more than MAX_MESSAGES messages.
    MAX_MESSAGES => 100,
};

# Returns a hashref mapping job names to the names of the modules that
# handle them.
#
# The map is built once per request: it starts from JOB_MAP, is passed to
# the 'job_map' hook (which may modify it), and is then stored in the
# request cache under the 'job_map' key.  Later calls return the cached
# hashref.
sub job_map {
    if (!defined(Bugzilla->request_cache->{job_map})) {
        # Work on a shallow copy: JOB_MAP is a single shared hashref, so
        # handing it to the hook directly would let hook changes leak into
        # the constant and survive beyond the current request.
        my $job_map = { %{ JOB_MAP() } };
        Bugzilla::Hook::process('job_map', { job_map => $job_map });
        Bugzilla->request_cache->{job_map} = $job_map;
    }

    return Bugzilla->request_cache->{job_map};
}

# Constructor.  Throws the 'feature_disabled' code error when the
# 'jobqueue' feature is not available; otherwise builds the underlying
# TheSchwartz client with a single database entry and returns it.
sub new {
    my ($class) = @_;

    ThrowCodeError('feature_disabled', { feature => 'jobqueue' })
        unless Bugzilla->feature('jobqueue');

    my $localconfig = Bugzilla->localconfig;

    # We need to use the main DB as TheSchwartz module is going
    # to write to it.
    my %database = (
        dsn    => Bugzilla->dbh_main->{private_bz_dsn},
        user   => $localconfig->{db_user},
        pass   => $localconfig->{db_pass},
        prefix => 'ts_',
    );

    my $queue = $class->SUPER::new(
        databases               => [ \%database ],
        driver_cache_expiration => DRIVER_CACHE_TIME,
    );

    return $queue;
}

# A way to get access to the underlying databases directly: returns the
# result of driver_for() for every key of $self->{databases}.
sub bz_databases {
    my ($self) = @_;
    return map { $self->driver_for($_) } keys %{ $self->{databases} };
}

# Inserts a job into the queue to be processed and returns immediately.
#
# The first argument is a job name, which is translated to a module name
# through job_map(); the remaining arguments are handed to the parent
# class's insert() unchanged.  Throws 'jobqueue_no_job_mapping' for an
# unknown job name and 'jobqueue_insert_failed' when the parent insert()
# returns a false value.  Returns whatever the parent insert() returned.
sub insert {
    my $self = shift;
    my $job  = shift;

    my $handler = Bugzilla::JobQueue->job_map()->{$job};
    if (!$handler) {
        ThrowCodeError('jobqueue_no_job_mapping', { job => $job });
    }

    my $retval = $self->SUPER::insert($handler, @_);

    # XXX Need to get an error message here if insert fails, but
    # I don't see any way to do that in TheSchwartz.
    if (!$retval) {
        ThrowCodeError('jobqueue_insert_failed',
                       { job => $job, errmsg => $@ });
    }

    return $retval;
}

# To avoid memory leaks/fragmentation which tends to happen for long running
# perl processes; check for jobs, and spawn a new process to empty the queue.
#
# Loops forever.  On each pass it asks list_jobs() for at most one job and,
# if one is returned, re-runs the current script ($0) with the worker
# pidfile and the "onepass" argument, then waits for the process named in
# that pidfile to go away.  It sleeps five seconds between passes.
sub subprocess_worker {
    my $self = shift;

    # Keep the command as an argument list instead of one shell string, so
    # that neither $0 nor the pidfile path is ever interpreted by a shell
    # (spaces, quotes or metacharacters in either are passed through as-is).
    my @command = ($0, '-p', $self->{_worker_pidfile}, 'onepass');

    while (1) {
        my $time = time;
        my @jobs = $self->list_jobs({
            funcname      => $self->{all_abilities},
            run_after     => $time,
            grabbed_until => $time,
            limit         => 1,
        });
        if (@jobs) {
            $self->debug("Spawning queue worker process");
            # Run the worker as a daemon
            system(@command);
            # And poll the PID to detect when the worker has finished.
            # We do this instead of blocking in system() to allow for the
            # INT signal to interrupt us and trigger kill_worker().
            my $pid = read_file($self->{_worker_pidfile}, err_mode => 'quiet');
            if ($pid) {
                # kill(0, ...) sends no signal; it only reports whether
                # the process can still be signalled.
                sleep(3) while (kill(0, $pid));
            }
            $self->debug("Queue worker process completed");
        } else {
            $self->debug("No jobs found");
        }
        sleep(5);
    }
}

# Stops the spawned worker process, if there is one.
#
# Takes no arguments; it looks up the queue via Bugzilla->job_queue().
# When the worker pidfile exists and the pid it contains can still be
# signalled, the current script ($0) is re-run with "-f -p <pidfile> stop".
sub kill_worker {
    my $self = Bugzilla->job_queue();
    my $pidfile = $self->{_worker_pidfile};
    if ($pidfile && -e $pidfile) {
        # The worker may remove its pidfile between the -e test and the
        # read; read quietly (as subprocess_worker does) so that this is
        # treated as "no worker" rather than raising an exception.
        my $worker_pid = read_file($pidfile, err_mode => 'quiet');
        if ($worker_pid && kill(0, $worker_pid)) {
            $self->debug("Stopping worker process");
            # List form: $0 and the pidfile path are passed as-is and are
            # never interpreted by a shell.
            system($0, '-f', '-p', $pidfile, 'stop');
        }
    }
}

# Derives the worker pidfile path from $pidfile and stores it in
# $self->{_worker_pidfile}.
#
# ".worker" is inserted in front of the file extension, e.g.
# "data/jobqueue.pid" becomes "data/jobqueue.worker.pid".  An undefined or
# empty $pidfile is stored unchanged.  Returns the stored value.
sub set_pidfile {
    my ($self, $pidfile) = @_;
    if (defined $pidfile && length $pidfile) {
        # The extension must belong to the final path component, so a dot
        # in a directory name ("/var/run.d/jobqueue") is not mistaken for
        # one.  When there is no usable extension, append ".worker" so the
        # worker pidfile never ends up identical to the one passed in.
        $pidfile =~ s{^(.+)(\.[^/\\]+)$}{$1.worker$2}
            or $pidfile .= '.worker';
    }
    $self->{_worker_pidfile} = $pidfile;
}

# Wraps the parent class's work_once(): the Bugzilla request cache is
# cleared first, then the call is forwarded with the same arguments and
# its result returned.
sub work_once {
    my ($self, @args) = @_;
    Bugzilla->clear_request_cache();
    return $self->SUPER::work_once(@args);
}

# Calls work_once() repeatedly, stopping as soon as it returns a false
# value or after MAX_MESSAGES calls, whichever comes first.  The cap is
# there to avoid memory leak/fragmentation issues.
sub work_until_done {
    my ($self) = @_;
    for my $attempt (1 .. MAX_MESSAGES) {
        last unless $self->work_once;
    }
}

1;

__END__

=head1 NAME

Bugzilla::JobQueue - Interface between Bugzilla and TheSchwartz.

=head1 SYNOPSIS

 use Bugzilla;

 my $obj = Bugzilla->job_queue();
 $obj->insert('send_mail', { msg => $message });

=head1 DESCRIPTION

Certain tasks should be done asynchronously.  The job queue system allows
Bugzilla to use some sort of service to schedule jobs to happen asynchronously.

=head2 Inserting a Job

See the synopsis above for an easy to follow example on how to insert a
job into the queue.  Give it a name and some arguments and the job will
be sent away to be done later.