summaryrefslogtreecommitdiffstats
path: root/extensions/Push/lib/Connector.disabled/AMQP.pm
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/Push/lib/Connector.disabled/AMQP.pm')
-rw-r--r--extensions/Push/lib/Connector.disabled/AMQP.pm344
1 files changed, 165 insertions, 179 deletions
diff --git a/extensions/Push/lib/Connector.disabled/AMQP.pm b/extensions/Push/lib/Connector.disabled/AMQP.pm
index 1ba365e21..dda73dade 100644
--- a/extensions/Push/lib/Connector.disabled/AMQP.pm
+++ b/extensions/Push/lib/Connector.disabled/AMQP.pm
@@ -20,211 +20,197 @@ use Bugzilla::Util qw(generate_random_password);
use DateTime;
sub init {
- my ($self) = @_;
- $self->{mq} = 0;
- $self->{channel} = 1;
-
- if ($self->config->{queue}) {
- $self->{queue_name} = $self->config->{queue};
- } else {
- my $queue_name = Bugzilla->localconfig->{'urlbase'};
- $queue_name =~ s#^https?://##;
- $queue_name =~ s#/$#|#;
- $queue_name .= generate_random_password(16);
- $self->{queue_name} = $queue_name;
- }
+ my ($self) = @_;
+ $self->{mq} = 0;
+ $self->{channel} = 1;
+
+ if ($self->config->{queue}) {
+ $self->{queue_name} = $self->config->{queue};
+ }
+ else {
+ my $queue_name = Bugzilla->localconfig->{'urlbase'};
+ $queue_name =~ s#^https?://##;
+ $queue_name =~ s#/$#|#;
+ $queue_name .= generate_random_password(16);
+ $self->{queue_name} = $queue_name;
+ }
}
sub options {
- return (
- {
- name => 'host',
- label => 'AMQP Hostname',
- type => 'string',
- default => 'localhost',
- required => 1,
- },
- {
- name => 'port',
- label => 'AMQP Port',
- type => 'string',
- default => '5672',
- required => 1,
- validate => sub {
- $_[0] =~ /\D/ && die "Invalid port (must be numeric)\n";
- },
- },
- {
- name => 'username',
- label => 'Username',
- type => 'string',
- default => 'guest',
- required => 1,
- },
- {
- name => 'password',
- label => 'Password',
- type => 'password',
- default => 'guest',
- required => 1,
- },
- {
- name => 'vhost',
- label => 'Virtual Host',
- type => 'string',
- default => '/',
- required => 1,
- },
- {
- name => 'exchange',
- label => 'Exchange',
- type => 'string',
- default => '',
- required => 1,
- },
- {
- name => 'queue',
- label => 'Queue',
- type => 'string',
- },
- );
+ return (
+ {
+ name => 'host',
+ label => 'AMQP Hostname',
+ type => 'string',
+ default => 'localhost',
+ required => 1,
+ },
+ {
+ name => 'port',
+ label => 'AMQP Port',
+ type => 'string',
+ default => '5672',
+ required => 1,
+ validate => sub {
+ $_[0] =~ /\D/ && die "Invalid port (must be numeric)\n";
+ },
+ },
+ {
+ name => 'username',
+ label => 'Username',
+ type => 'string',
+ default => 'guest',
+ required => 1,
+ },
+ {
+ name => 'password',
+ label => 'Password',
+ type => 'password',
+ default => 'guest',
+ required => 1,
+ },
+ {
+ name => 'vhost',
+ label => 'Virtual Host',
+ type => 'string',
+ default => '/',
+ required => 1,
+ },
+ {
+ name => 'exchange',
+ label => 'Exchange',
+ type => 'string',
+ default => '',
+ required => 1,
+ },
+ {name => 'queue', label => 'Queue', type => 'string',},
+ );
}
sub stop {
- my ($self) = @_;
- if ($self->{mq}) {
- Bugzilla->push_ext->logger->debug('AMQP: disconnecting');
- $self->{mq}->disconnect();
- $self->{mq} = 0;
- }
+ my ($self) = @_;
+ if ($self->{mq}) {
+ Bugzilla->push_ext->logger->debug('AMQP: disconnecting');
+ $self->{mq}->disconnect();
+ $self->{mq} = 0;
+ }
}
sub _connect {
- my ($self) = @_;
- my $logger = Bugzilla->push_ext->logger;
- my $config = $self->config;
-
- $self->stop();
-
- $logger->debug('AMQP: Connecting to RabbitMQ ' . $config->{host} . ':' . $config->{port});
- require Net::RabbitMQ;
- my $mq = Net::RabbitMQ->new();
- $mq->connect(
- $config->{host},
- {
- port => $config->{port},
- user => $config->{username},
- password => $config->{password},
- }
- );
- $self->{mq} = $mq;
-
- $logger->debug('AMQP: Opening channel ' . $self->{channel});
- $self->{mq}->channel_open($self->{channel});
-
- $logger->debug('AMQP: Declaring queue ' . $self->{queue_name});
- $self->{mq}->queue_declare(
- $self->{channel},
- $self->{queue_name},
- {
- passive => 0,
- durable => 1,
- exclusive => 0,
- auto_delete => 0,
- },
- );
+ my ($self) = @_;
+ my $logger = Bugzilla->push_ext->logger;
+ my $config = $self->config;
+
+ $self->stop();
+
+ $logger->debug(
+ 'AMQP: Connecting to RabbitMQ ' . $config->{host} . ':' . $config->{port});
+ require Net::RabbitMQ;
+ my $mq = Net::RabbitMQ->new();
+ $mq->connect(
+ $config->{host},
+ {
+ port => $config->{port},
+ user => $config->{username},
+ password => $config->{password},
+ }
+ );
+ $self->{mq} = $mq;
+
+ $logger->debug('AMQP: Opening channel ' . $self->{channel});
+ $self->{mq}->channel_open($self->{channel});
+
+ $logger->debug('AMQP: Declaring queue ' . $self->{queue_name});
+ $self->{mq}->queue_declare($self->{channel}, $self->{queue_name},
+ {passive => 0, durable => 1, exclusive => 0, auto_delete => 0,},
+ );
}
sub _bind {
- my ($self, $message) = @_;
- my $logger = Bugzilla->push_ext->logger;
- my $config = $self->config;
-
- # bind to queue (also acts to verify the connection is still valid)
- if ($self->{mq}) {
- eval {
- $logger->debug('AMQP: binding queue(' . $self->{queue_name} . ') with exchange(' . $config->{exchange} . ')');
- $self->{mq}->queue_bind(
- $self->{channel},
- $self->{queue_name},
- $config->{exchange},
- $message->routing_key,
- );
- };
- if ($@) {
- $logger->debug('AMQP: ' . clean_error($@));
- $self->{mq} = 0;
- }
+ my ($self, $message) = @_;
+ my $logger = Bugzilla->push_ext->logger;
+ my $config = $self->config;
+
+ # bind to queue (also acts to verify the connection is still valid)
+ if ($self->{mq}) {
+ eval {
+ $logger->debug('AMQP: binding queue('
+ . $self->{queue_name}
+ . ') with exchange('
+ . $config->{exchange}
+ . ')');
+ $self->{mq}->queue_bind(
+ $self->{channel}, $self->{queue_name},
+ $config->{exchange}, $message->routing_key,
+ );
+ };
+ if ($@) {
+ $logger->debug('AMQP: ' . clean_error($@));
+ $self->{mq} = 0;
}
+ }
}
sub should_send {
- my ($self, $message) = @_;
- my $logger = Bugzilla->push_ext->logger;
-
- my $payload = $message->payload_decoded();
- my $target = $payload->{event}->{target};
- my $is_private = $payload->{$target}->{is_private} ? 1 : 0;
- if (!$is_private && exists $payload->{$target}->{bug}) {
- $is_private = $payload->{$target}->{bug}->{is_private} ? 1 : 0;
- }
-
- if ($is_private) {
- # we only want to push the is_private message from the change_set, as
- # this is guaranteed to contain public information only
- if ($message->routing_key !~ /\.modify:is_private$/) {
- $logger->debug('AMQP: Ignoring private message');
- return 0;
- }
- $logger->debug('AMQP: Sending change of message to is_private');
+ my ($self, $message) = @_;
+ my $logger = Bugzilla->push_ext->logger;
+
+ my $payload = $message->payload_decoded();
+ my $target = $payload->{event}->{target};
+ my $is_private = $payload->{$target}->{is_private} ? 1 : 0;
+ if (!$is_private && exists $payload->{$target}->{bug}) {
+ $is_private = $payload->{$target}->{bug}->{is_private} ? 1 : 0;
+ }
+
+ if ($is_private) {
+
+ # we only want to push the is_private message from the change_set, as
+ # this is guaranteed to contain public information only
+ if ($message->routing_key !~ /\.modify:is_private$/) {
+ $logger->debug('AMQP: Ignoring private message');
+ return 0;
}
- return 1;
+ $logger->debug('AMQP: Sending change of message to is_private');
+ }
+ return 1;
}
sub send {
- my ($self, $message) = @_;
- my $logger = Bugzilla->push_ext->logger;
- my $config = $self->config;
-
- # don't push comments to pulse
- if ($message->routing_key =~ /^comment\./) {
- $logger->debug('AMQP: Ignoring comment');
- return PUSH_RESULT_IGNORED;
- }
+ my ($self, $message) = @_;
+ my $logger = Bugzilla->push_ext->logger;
+ my $config = $self->config;
- # don't push private data
- $self->should_push($message)
- || return PUSH_RESULT_IGNORED;
+ # don't push comments to pulse
+ if ($message->routing_key =~ /^comment\./) {
+ $logger->debug('AMQP: Ignoring comment');
+ return PUSH_RESULT_IGNORED;
+ }
- $self->_bind($message);
+ # don't push private data
+ $self->should_push($message) || return PUSH_RESULT_IGNORED;
- eval {
- # reconnect if required
- if (!$self->{mq}) {
- $self->_connect();
- }
-
- # send message
- $logger->debug('AMQP: Publishing message');
- $self->{mq}->publish(
- $self->{channel},
- $message->routing_key,
- $message->payload,
- {
- exchange => $config->{exchange},
- },
- {
- content_type => 'text/plain',
- content_encoding => '8bit',
- },
- );
- };
- if ($@) {
- return (PUSH_RESULT_TRANSIENT, clean_error($@));
+ $self->_bind($message);
+
+ eval {
+ # reconnect if required
+ if (!$self->{mq}) {
+ $self->_connect();
}
- return PUSH_RESULT_OK;
+ # send message
+ $logger->debug('AMQP: Publishing message');
+ $self->{mq}->publish(
+ $self->{channel}, $message->routing_key, $message->payload,
+ {exchange => $config->{exchange},},
+ {content_type => 'text/plain', content_encoding => '8bit',},
+ );
+ };
+ if ($@) {
+ return (PUSH_RESULT_TRANSIENT, clean_error($@));
+ }
+
+ return PUSH_RESULT_OK;
}
1;