diff options
Diffstat (limited to 'extensions/Push/lib/Connector')
-rw-r--r-- | extensions/Push/lib/Connector/AMQP.pm | 230 | ||||
-rw-r--r-- | extensions/Push/lib/Connector/Base.pm | 106 | ||||
-rw-r--r-- | extensions/Push/lib/Connector/File.pm | 68 | ||||
-rw-r--r-- | extensions/Push/lib/Connector/TCL.pm | 315 |
4 files changed, 719 insertions, 0 deletions
diff --git a/extensions/Push/lib/Connector/AMQP.pm b/extensions/Push/lib/Connector/AMQP.pm new file mode 100644 index 000000000..7b7d4aa72 --- /dev/null +++ b/extensions/Push/lib/Connector/AMQP.pm @@ -0,0 +1,230 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. + +package Bugzilla::Extension::Push::Connector::AMQP; + +use strict; +use warnings; + +use base 'Bugzilla::Extension::Push::Connector::Base'; + +use Bugzilla::Constants; +use Bugzilla::Extension::Push::Constants; +use Bugzilla::Extension::Push::Util; +use Bugzilla::Util qw(generate_random_password); +use DateTime; + +sub init { + my ($self) = @_; + $self->{mq} = 0; + $self->{channel} = 1; + + if ($self->config->{queue}) { + $self->{queue_name} = $self->config->{queue}; + } else { + my $queue_name = Bugzilla->params->{'urlbase'}; + $queue_name =~ s#^https?://##; + $queue_name =~ s#/$#|#; + $queue_name .= generate_random_password(16); + $self->{queue_name} = $queue_name; + } +} + +sub options { + return ( + { + name => 'host', + label => 'AMQP Hostname', + type => 'string', + default => 'localhost', + required => 1, + }, + { + name => 'port', + label => 'AMQP Port', + type => 'string', + default => '5672', + required => 1, + validate => sub { + $_[0] =~ /\D/ && die "Invalid port (must be numeric)\n"; + }, + }, + { + name => 'username', + label => 'Username', + type => 'string', + default => 'guest', + required => 1, + }, + { + name => 'password', + label => 'Password', + type => 'password', + default => 'guest', + required => 1, + }, + { + name => 'vhost', + label => 'Virtual Host', + type => 'string', + default => '/', + required => 1, + }, + { + name => 'exchange', + label => 'Exchange', + type => 'string', + default => '', + required => 1, + }, + { + name => 'queue', + label => 'Queue', + type => 'string', + }, + ); +} + +sub stop { + my ($self) = @_; + if ($self->{mq}) { + Bugzilla->push_ext->logger->debug('AMQP: disconnecting'); + $self->{mq}->disconnect(); + $self->{mq} = 0; + } +} + +sub _connect { + my ($self) = @_; + my $logger = Bugzilla->push_ext->logger; + my $config = $self->config; + + $self->stop(); + + $logger->debug('AMQP: Connecting to RabbitMQ ' . $config->{host} . ':' . $config->{port}); + require Net::RabbitMQ; + my $mq = Net::RabbitMQ->new(); + $mq->connect( + $config->{host}, + { + port => $config->{port}, + user => $config->{username}, + password => $config->{password}, + } + ); + $self->{mq} = $mq; + + $logger->debug('AMQP: Opening channel ' . $self->{channel}); + $self->{mq}->channel_open($self->{channel}); + + $logger->debug('AMQP: Declaring queue ' . $self->{queue_name}); + $self->{mq}->queue_declare( + $self->{channel}, + $self->{queue_name}, + { + passive => 0, + durable => 1, + exclusive => 0, + auto_delete => 0, + }, + ); +} + +sub _bind { + my ($self, $message) = @_; + my $logger = Bugzilla->push_ext->logger; + my $config = $self->config; + + # bind to queue (also acts to verify the connection is still valid) + if ($self->{mq}) { + eval { + $logger->debug('AMQP: binding queue(' . $self->{queue_name} . ') with exchange(' . $config->{exchange} . ')'); + $self->{mq}->queue_bind( + $self->{channel}, + $self->{queue_name}, + $config->{exchange}, + $message->routing_key, + ); + }; + if ($@) { + $logger->debug('AMQP: ' . clean_error($@)); + $self->{mq} = 0; + } + } + +} + +sub should_send { + my ($self, $message) = @_; + my $logger = Bugzilla->push_ext->logger; + + my $payload = $message->payload_decoded(); + my $target = $payload->{event}->{target}; + my $is_private = $payload->{$target}->{is_private} ? 1 : 0; + if (!$is_private && exists $payload->{$target}->{bug}) { + $is_private = $payload->{$target}->{bug}->{is_private} ? 1 : 0; + } + + if ($is_private) { + # we only want to push the is_private message from the change_set, as + # this is guaranteed to contain public information only + if ($message->routing_key !~ /\.modify:is_private$/) { + $logger->debug('AMQP: Ignoring private message'); + return 0; + } + $logger->debug('AMQP: Sending change of message to is_private'); + } + return 1; +} + +sub send { + my ($self, $message) = @_; + my $logger = Bugzilla->push_ext->logger; + my $config = $self->config; + + # don't push comments to pulse + if ($message->routing_key =~ /^comment\./) { + $logger->debug('AMQP: Ignoring comment'); + return PUSH_RESULT_IGNORED; + } + + # don't push private data + $self->should_push($message) + || return PUSH_RESULT_IGNORED; + + $self->_bind($message); + + eval { + # reconnect if required + if (!$self->{mq}) { + $self->_connect(); + } + + # send message + $logger->debug('AMQP: Publishing message'); + $self->{mq}->publish( + $self->{channel}, + $message->routing_key, + $message->payload, + { + exchange => $config->{exchange}, + }, + { + content_type => 'text/plain', + content_encoding => '8bit', + }, + ); + }; + if ($@) { + return (PUSH_RESULT_TRANSIENT, clean_error($@)); + } + + return PUSH_RESULT_OK; +} + +1; + diff --git a/extensions/Push/lib/Connector/Base.pm b/extensions/Push/lib/Connector/Base.pm new file mode 100644 index 000000000..290ea9740 --- /dev/null +++ b/extensions/Push/lib/Connector/Base.pm @@ -0,0 +1,106 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. + +package Bugzilla::Extension::Push::Connector::Base; + +use strict; +use warnings; + +use Bugzilla; +use Bugzilla::Extension::Push::Config; +use Bugzilla::Extension::Push::BacklogMessage; +use Bugzilla::Extension::Push::BacklogQueue; +use Bugzilla::Extension::Push::Backoff; + +sub new { + my ($class) = @_; + my $self = {}; + bless($self, $class); + ($self->{name}) = $class =~ /^.+:(.+)$/; + $self->init(); + return $self; +} + +sub name { + my $self = shift; + return $self->{name}; +} + +sub init { + my ($self) = @_; + # abstract + # perform any initialisation here + # will be run when created by the web pages or by the daemon + # and also when the configuration needs to be reloaded +} + +sub stop { + my ($self) = @_; + # abstract + # run from the daemon only; disconnect from remote hosts, etc +} + +sub should_send { + my ($self, $message) = @_; + # abstract + # return boolean indicating if the connector will be sending the message. + # this will be called each message, and should be a very quick simple test. + # the connector can perform a more exhaustive test in the send() method. + return 0; +} + +sub send { + my ($self, $message) = @_; + # abstract + # deliver the message, daemon only +} + +sub options { + my ($self) = @_; + # abstract + # return an array of configuration variables + return (); +} + +sub options_validate { + my ($class, $config) = @_; + # abstract, static + # die if a combination of options in $config is invalid +} + +# +# +# + +sub config { + my ($self) = @_; + if (!$self->{config}) { + $self->load_config(); + } + return $self->{config}; +} + +sub load_config { + my ($self) = @_; + my $config = Bugzilla::Extension::Push::Config->new($self->name, $self->options); + $config->load(); + $self->{config} = $config; +} + +sub enabled { + my ($self) = @_; + return $self->config->{enabled} eq 'Enabled'; +} + +sub backlog { + my ($self) = @_; + $self->{backlog} ||= Bugzilla::Extension::Push::BacklogQueue->new($self->name); + return $self->{backlog}; +} + +1; + diff --git a/extensions/Push/lib/Connector/File.pm b/extensions/Push/lib/Connector/File.pm new file mode 100644 index 000000000..2a8f4193d --- /dev/null +++ b/extensions/Push/lib/Connector/File.pm @@ -0,0 +1,68 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. + +package Bugzilla::Extension::Push::Connector::File; + +use strict; +use warnings; + +use base 'Bugzilla::Extension::Push::Connector::Base'; + +use Bugzilla::Constants; +use Bugzilla::Extension::Push::Constants; +use Bugzilla::Extension::Push::Util; +use Encode; +use FileHandle; + +sub init { + my ($self) = @_; +} + +sub options { + return ( + { + name => 'filename', + label => 'Filename', + type => 'string', + default => 'push.log', + required => 1, + validate => sub { + my $filename = shift; + $filename =~ m#^/# + && die "Absolute paths are not permitted\n"; + }, + }, + ); +} + +sub should_send { + my ($self, $message) = @_; + return 1; +} + +sub send { + my ($self, $message) = @_; + + # pretty-format json payload + my $payload = $message->payload_decoded; + $payload = to_json($payload, 1); + + my $filename = bz_locations()->{'datadir'} . '/' . $self->config->{filename}; + Bugzilla->push_ext->logger->debug("File: Appending to $filename"); + my $fh = FileHandle->new(">>$filename"); + $fh->binmode(':utf8'); + $fh->print( + "[" . scalar(localtime) . "]\n" . + $payload . "\n\n" + ); + $fh->close; + + return PUSH_RESULT_OK; +} + +1; + diff --git a/extensions/Push/lib/Connector/TCL.pm b/extensions/Push/lib/Connector/TCL.pm new file mode 100644 index 000000000..b2fb818fd --- /dev/null +++ b/extensions/Push/lib/Connector/TCL.pm @@ -0,0 +1,315 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. + +package Bugzilla::Extension::Push::Connector::TCL; + +use strict; +use warnings; + +use base 'Bugzilla::Extension::Push::Connector::Base'; + +use Bugzilla::Constants; +use Bugzilla::Extension::Push::Constants; +use Bugzilla::Extension::Push::Serialise; +use Bugzilla::Extension::Push::Util; +use Bugzilla::User; + +use Digest::MD5 qw(md5_hex); + +sub options { + return ( + { + name => 'tcl_user', + label => 'Bugzilla TCL User', + type => 'string', + default => 'tcl@bugzilla.tld', + required => 1, + validate => sub { + Bugzilla::User->new({ name => $_[0] }) + || die "Invalid Bugzilla user ($_[0])\n"; + }, + }, + { + name => 'sftp_host', + label => 'SFTP Host', + type => 'string', + default => '', + required => 1, + }, + { + name => 'sftp_port', + label => 'SFTP Port', + type => 'string', + default => '22', + required => 1, + validate => sub { + $_[0] =~ /\D/ && die "SFTP Port must be an integer\n"; + }, + }, + { + name => 'sftp_user', + label => 'SFTP Username', + type => 'string', + default => '', + required => 1, + }, + { + name => 'sftp_pass', + label => 'SFTP Password', + type => 'password', + default => '', + required => 1, + }, + { + name => 'sftp_remote_path', + label => 'SFTP Remote Path', + type => 'string', + default => '', + required => 0, + }, + ); +} + +my $_instance; + +sub init { + my ($self) = @_; + $_instance = $self; +} + +sub load_config { + my ($self) = @_; + $self->SUPER::load_config(@_); +} + +sub should_send { + my ($self, $message) = @_; + + my $data = $message->payload_decoded; + my $bug_data = $self->_get_bug_data($data) + || return 0; + + # sanity check user + $self->{tcl_user} ||= Bugzilla::User->new({ name => $self->config->{tcl_user} }); + if (!$self->{tcl_user} || !$self->{tcl_user}->is_enabled) { + return 0; + } + + # only send bugs created by the tcl user + unless ($bug_data->{reporter}->{id} == $self->{tcl_user}->id) { + return 0; + } + + # don't push changes made by the tcl user + if ($data->{event}->{user}->{id} == $self->{tcl_user}->id) { + return 0; + } + + # send comments + if ($data->{event}->{routing_key} eq 'comment.create') { + return 0 if $data->{comment}->{is_private}; + return 1; + } + + # send status and resolution updates + foreach my $change (@{ $data->{event}->{changes} }) { + return 1 if $change->{field} eq 'bug_status' || $change->{field} eq 'resolution'; + } + + # and nothing else + return 0; +} + +sub send { + my ($self, $message) = @_; + my $logger = Bugzilla->push_ext->logger; + my $config = $self->config; + + require XML::Simple; + require Net::SFTP; + + $self->{tcl_user} ||= Bugzilla::User->new({ name => $self->config->{tcl_user} }); + if (!$self->{tcl_user}) { + return (PUSH_RESULT_TRANSIENT, "Invalid bugzilla-user (" . $self->config->{tcl_user} . ")"); + } + + # load the bug + my $data = $message->payload_decoded; + my $bug_data = $self->_get_bug_data($data); + + # build payload + my %xml = ( + Mozilla_ID => $bug_data->{id}, + When => $data->{event}->{time}, + Who => $data->{event}->{user}->{login}, + Status => $bug_data->{status}->{name}, + Resolution => $bug_data->{resolution}, + ); + if ($data->{event}->{routing_key} eq 'comment.create') { + $xml{Comment} = $data->{comment}->{body}; + } + + # convert to xml + my $xml = XML::Simple::XMLout( + \%xml, + NoAttr => 1, + RootName => 'sync', + XMLDecl => 1, + ); + + # generate md5 + my $md5 = md5_hex($xml); + + # build filename + my ($sec, $min, $hour, $day, $mon, $year) = localtime(time); + my $change_set = $data->{event}->{change_set}; + $change_set =~ s/\.//g; + my $filename = sprintf( + '%04s%02d%02d%02d%02d%02d%s', + $year + 1900, + $mon + 1, + $day, + $hour, + $min, + $sec, + $change_set, + ); + + # create temp files; + my $temp_dir = File::Temp::Directory->new(); + my $local_dir = $temp_dir->dirname; + _write_file("$local_dir/$filename.sync", $xml); + _write_file("$local_dir/$filename.sync.check", $md5); + _write_file("$local_dir/$filename.done", ''); + + my $remote_dir = $self->config->{sftp_remote_path} eq '' + ? '' + : $self->config->{sftp_remote_path} . '/'; + + # send files via sftp + $logger->debug("Connecting to " . $self->config->{sftp_host} . ":" . $self->config->{sftp_port}); + my $sftp = Net::SFTP->new( + $self->config->{sftp_host}, + ssh_args => { + port => $self->config->{sftp_port}, + }, + user => $self->config->{sftp_user}, + password => $self->config->{sftp_pass}, + ); + + $logger->debug("Uploading $local_dir/$filename.sync"); + $sftp->put("$local_dir/$filename.sync", "$remote_dir$filename.sync") + or return (PUSH_RESULT_ERROR, "Failed to upload $local_dir/$filename.sync"); + + $logger->debug("Uploading $local_dir/$filename.sync.check"); + $sftp->put("$local_dir/$filename.sync.check", "$remote_dir$filename.sync.check") + or return (PUSH_RESULT_ERROR, "Failed to upload $local_dir/$filename.sync.check"); + + $logger->debug("Uploading $local_dir/$filename.done"); + $sftp->put("$local_dir/$filename.done", "$remote_dir$filename.done") + or return (PUSH_RESULT_ERROR, "Failed to upload $local_dir/$filename.done"); + + # success + return (PUSH_RESULT_OK, "uploaded $filename.sync"); +} + +sub _get_bug_data { + my ($self, $data) = @_; + my $target = $data->{event}->{target}; + if ($target eq 'bug') { + return $data->{bug}; + } elsif (exists $data->{$target}->{bug}) { + return $data->{$target}->{bug}; + } else { + return; + } +} + +sub _write_file { + my ($filename, $content) = @_; + open(my $fh, ">$filename") or die "Failed to write to $filename: $!\n"; + print $fh $content; + close($fh) or die "Failed to write to $filename: $!\n"; +} + +1; + +# File::Temp->newdir() requires a newer version of File::Temp than we have on +# production, so here's a small inline package which performs the same task. + +package File::Temp::Directory; + +use strict; +use warnings; + +use File::Temp; +use File::Path qw(rmtree); +use File::Spec; + +my @chars; + +sub new { + my ($class) = @_; + my $self = {}; + bless($self, $class); + + @chars = qw/ A B C D E F G H I J K L M N O P Q R S T U V W X Y Z + a b c d e f g h i j k l m n o p q r s t u v w x y z + 0 1 2 3 4 5 6 7 8 9 _ + /; + + $self->{TEMPLATE} = File::Spec->catdir(File::Spec->tmpdir, 'X' x 10); + $self->{DIRNAME} = $self->_mktemp(); + return $self; +} + +sub _mktemp { + my ($self) = @_; + my $path = $self->_random_name(); + while(1) { + if (mkdir($path, 0700)) { + # in case of odd umask + chmod(0700, $path); + return $path; + } else { + # abort with error if the reason for failure was anything except eexist + die "Could not create directory $path: $!\n" unless ($!{EEXIST}); + # loop round for another try + } + $path = $self->_random_name(); + } + + return $path; +} + +sub _random_name { + my ($self) = @_; + my $path = $self->{TEMPLATE}; + $path =~ s/X/$chars[int(rand(@chars))]/ge; + return $path; +} + +sub dirname { + my ($self) = @_; + return $self->{DIRNAME}; +} + +sub DESTROY { + my ($self) = @_; + local($., $@, $!, $^E, $?); + if (-d $self->{DIRNAME}) { + # Some versions of rmtree will abort if you attempt to remove the + # directory you are sitting in. We protect that and turn it into a + # warning. We do this because this occurs during object destruction and + # so can not be caught by the user. + eval { rmtree($self->{DIRNAME}, 0, 0); }; + warn $@ if ($@ && $^W); + } +} + +1; + |