summaryrefslogtreecommitdiffstats
path: root/Bugzilla/Elastic
diff options
context:
space:
mode:
Diffstat (limited to 'Bugzilla/Elastic')
-rw-r--r--Bugzilla/Elastic/Indexer.pm280
-rw-r--r--Bugzilla/Elastic/Role/ChildObject.pm16
-rw-r--r--Bugzilla/Elastic/Role/HasClient.pm25
-rw-r--r--Bugzilla/Elastic/Role/HasIndexName.pm16
-rw-r--r--Bugzilla/Elastic/Role/Object.pm48
5 files changed, 385 insertions, 0 deletions
diff --git a/Bugzilla/Elastic/Indexer.pm b/Bugzilla/Elastic/Indexer.pm
new file mode 100644
index 000000000..82f946af9
--- /dev/null
+++ b/Bugzilla/Elastic/Indexer.pm
@@ -0,0 +1,280 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# This Source Code Form is "Incompatible With Secondary Licenses", as
+# defined by the Mozilla Public License, v. 2.0.
+package Bugzilla::Elastic::Indexer;
+
+use 5.10.1;
+use Moo;
+use List::MoreUtils qw(natatime);
+use Storable qw(dclone);
+use namespace::clean;
+
+with 'Bugzilla::Elastic::Role::HasClient';
+with 'Bugzilla::Elastic::Role::HasIndexName';
+
+has 'mtime' => (
+ is => 'lazy',
+ clearer => 'clear_mtime',
+);
+
+has 'shadow_dbh' => ( is => 'lazy' );
+
+has 'debug_sql' => (
+ is => 'ro',
+ default => 0,
+);
+
+has 'progress_bar' => (
+ is => 'ro',
+ predicate => 'has_progress_bar',
+);
+
+sub create_index {
+ my ($self) = @_;
+ my $indices = $self->client->indices;
+
+ $indices->create(
+ index => $self->index_name,
+ body => {
+ settings => {
+ number_of_shards => 1,
+ analysis => {
+ analyzer => {
+ folding => {
+ type => 'standard',
+ tokenizer => 'standard',
+ filter => [ 'lowercase', 'asciifolding' ]
+ },
+ bz_text_analyzer => {
+ type => 'standard',
+ filter => ['lowercase', 'stop'],
+ max_token_length => '20'
+ },
+ bz_substring_analyzer => {
+ type => 'custom',
+ filter => ['lowercase'],
+ tokenizer => 'bz_ngram_tokenizer',
+ },
+ bz_equals_analyzer => {
+ type => 'custom',
+ filter => ['lowercase'],
+ tokenizer => 'keyword',
+ },
+ whiteboard_words => {
+ type => 'custom',
+ tokenizer => 'whiteboard_words_pattern',
+ filter => ['stop']
+ },
+ whiteboard_shingle_words => {
+ type => 'custom',
+ tokenizer => 'whiteboard_words_pattern',
+ filter => ['stop', 'shingle']
+ },
+ whiteboard_tokens => {
+ type => 'custom',
+ tokenizer => 'whiteboard_tokens_pattern',
+ filter => ['stop']
+ },
+ whiteboard_shingle_tokens => {
+ type => 'custom',
+ tokenizer => 'whiteboard_tokens_pattern',
+ filter => ['stop', 'shingle']
+ }
+ },
+ tokenizer => {
+ bz_ngram_tokenizer => {
+ type => 'nGram',
+ min_ngram => 2,
+ max_ngram => 25,
+ },
+ whiteboard_tokens_pattern => {
+ type => 'pattern',
+ pattern => '\\s*([,;]*\\[|\\][\\s\\[]*|[;,])\\s*'
+ },
+ whiteboard_words_pattern => {
+ type => 'pattern',
+ pattern => '[\\[\\];,\\s]+'
+ },
+ },
+ },
+ },
+ }
+ ) unless $indices->exists(index => $self->index_name);
+}
+
+sub _bulk_helper {
+ my ($self, $class) = @_;
+
+ return $self->client->bulk_helper(
+ index => $self->index_name,
+ type => $class->ES_TYPE,
+ );
+}
+
+sub find_largest_mtime {
+ my ($self, $class) = @_;
+
+ my $result = $self->client->search(
+ index => $self->index_name,
+ type => $class->ES_TYPE,
+ body => {
+ aggs => { es_mtime => { extended_stats => { field => 'es_mtime' } } },
+ size => 0
+ }
+ );
+
+ return $result->{aggregations}{es_mtime}{max};
+}
+
+sub find_largest_id {
+ my ($self, $class) = @_;
+
+ my $result = $self->client->search(
+ index => $self->index_name,
+ type => $class->ES_TYPE,
+ body => {
+ aggs => { $class->ID_FIELD => { extended_stats => { field => $class->ID_FIELD } } },
+ size => 0
+ }
+ );
+
+ return $result->{aggregations}{$class->ID_FIELD}{max};
+}
+
+sub put_mapping {
+ my ($self, $class) = @_;
+
+ my %body = ( properties => scalar $class->ES_PROPERTIES );
+ if ($class->does('Bugzilla::Elastic::Role::ChildObject')) {
+ $body{_parent} = { type => $class->ES_PARENT_TYPE };
+ }
+
+ $self->client->indices->put_mapping(
+ index => $self->index_name,
+ type => $class->ES_TYPE,
+ body => \%body,
+ );
+}
+
+sub _debug_sql {
+ my ($self, $sql, $params) = @_;
+ if ($self->debug_sql) {
+ my ($out, @args) = ($sql, $params ? (@$params) : ());
+ $out =~ s/^\n//gs;
+ $out =~ s/^\s{8}//gm;
+ $out =~ s/\?/Bugzilla->dbh->quote(shift @args)/ge;
+ warn $out, "\n";
+ }
+
+ return ($sql, $params)
+}
+
+sub bulk_load {
+ my ( $self, $class ) = @_;
+
+ $self->put_mapping($class);
+ my $bulk = $self->_bulk_helper($class);
+ my $ids = $self->_select_all_ids($class);
+ $self->clear_mtime;
+ $self->_bulk_load_ids($bulk, $class, $ids) if @$ids;
+ undef $ids; # free up some memory
+
+ my $updated_ids = $self->_select_updated_ids($class);
+ if ($updated_ids) {
+ $self->_bulk_load_ids($bulk, $class, $updated_ids) if @$updated_ids;
+ }
+}
+
+sub _select_all_ids {
+ my ($self, $class) = @_;
+
+ my $dbh = Bugzilla->dbh;
+ my $last_id = $self->find_largest_id($class);
+ my ($sql, $params) = $self->_debug_sql($class->ES_SELECT_ALL_SQL($last_id));
+ return $dbh->selectcol_arrayref($sql, undef, @$params);
+}
+
+sub _select_updated_ids {
+ my ($self, $class) = @_;
+
+ my $dbh = Bugzilla->dbh;
+ my $mtime = $self->find_largest_mtime($class);
+ if ($mtime && $mtime != $self->mtime) {
+ my ($updated_sql, $updated_params) = $self->_debug_sql($class->ES_SELECT_UPDATED_SQL($mtime));
+ return $dbh->selectcol_arrayref($updated_sql, undef, @$updated_params);
+ } else {
+ return undef;
+ }
+}
+
+sub bulk_load_ids {
+ my ($self, $class, $ids) = @_;
+
+ $self->put_mapping($class);
+ $self->clear_mtime;
+ $self->_bulk_load_ids($self->_bulk_helper($class), $class, $ids);
+}
+
+sub _bulk_load_ids {
+ my ($self, $bulk, $class, $all_ids) = @_;
+
+ my $iter = natatime $class->ES_OBJECTS_AT_ONCE, @$all_ids;
+ my $mtime = $self->mtime;
+ my $progress_bar;
+ my $next_update;
+
+ if ($self->has_progress_bar) {
+ my $name = (split(/::/, $class))[-1];
+ $progress_bar = $self->progress_bar->new({
+ name => $name,
+ count => scalar @$all_ids,
+ ETA => 'linear'
+ });
+ $progress_bar->message(sprintf "loading %d $class objects, %d at a time", scalar @$all_ids, $class->ES_OBJECTS_AT_ONCE);
+ $next_update = $progress_bar->update(0);
+ $progress_bar->max_update_rate(1);
+ }
+
+ my $total = 0;
+ use Time::HiRes;
+ my $start = time;
+ while (my @ids = $iter->()) {
+ if ($progress_bar) {
+ $total += @ids;
+ if ($total >= $next_update) {
+ $next_update = $progress_bar->update($total);
+ my $duration = time - $start || 1;
+ }
+ }
+
+ my $objects = $class->new_from_list(\@ids);
+ foreach my $object (@$objects) {
+ my %doc = (
+ id => $object->id,
+ source => scalar $object->es_document($mtime),
+ );
+
+ if ($class->does('Bugzilla::Elastic::Role::ChildObject')) {
+ $doc{parent} = $object->es_parent_id;
+ }
+
+ $bulk->index(\%doc);
+ }
+ Bugzilla->_cleanup();
+ }
+
+ $bulk->flush;
+}
+
+sub _build_shadow_dbh { Bugzilla->switch_to_shadow_db }
+
+sub _build_mtime {
+ my ($self) = @_;
+ my ($mtime) = $self->shadow_dbh->selectrow_array("SELECT UNIX_TIMESTAMP(NOW())");
+ return $mtime;
+}
+
+1;
diff --git a/Bugzilla/Elastic/Role/ChildObject.pm b/Bugzilla/Elastic/Role/ChildObject.pm
new file mode 100644
index 000000000..1f7a7483a
--- /dev/null
+++ b/Bugzilla/Elastic/Role/ChildObject.pm
@@ -0,0 +1,16 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# This Source Code Form is "Incompatible With Secondary Licenses", as
+# defined by the Mozilla Public License, v. 2.0.
+package Bugzilla::Elastic::Role::ChildObject;
+
+use 5.10.1;
+use Role::Tiny;
+
+with 'Bugzilla::Elastic::Role::Object';
+
+requires qw(ES_PARENT_TYPE es_parent_id);
+
+1;
diff --git a/Bugzilla/Elastic/Role/HasClient.pm b/Bugzilla/Elastic/Role/HasClient.pm
new file mode 100644
index 000000000..3d52d513a
--- /dev/null
+++ b/Bugzilla/Elastic/Role/HasClient.pm
@@ -0,0 +1,25 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# This Source Code Form is "Incompatible With Secondary Licenses", as
+# defined by the Mozilla Public License, v. 2.0.
+package Bugzilla::Elastic::Role::HasClient;
+
+use 5.10.1;
+use Moo::Role;
+use Search::Elasticsearch;
+
+
+has 'client' => (is => 'lazy');
+
+sub _build_client {
+ my ($self) = @_;
+
+ return Search::Elasticsearch->new(
+ nodes => Bugzilla->params->{elasticsearch_nodes},
+ cxn_pool => 'Sniff',
+ );
+}
+
+1;
diff --git a/Bugzilla/Elastic/Role/HasIndexName.pm b/Bugzilla/Elastic/Role/HasIndexName.pm
new file mode 100644
index 000000000..eaff339cd
--- /dev/null
+++ b/Bugzilla/Elastic/Role/HasIndexName.pm
@@ -0,0 +1,16 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# This Source Code Form is "Incompatible With Secondary Licenses", as
+# defined by the Mozilla Public License, v. 2.0.
+package Bugzilla::Elastic::Role::HasIndexName;
+
+use 5.10.1;
+use Moo::Role;
+use Search::Elasticsearch;
+
+has 'index_name' => ( is => 'ro', default => sub { Bugzilla->params->{elasticsearch_index} } );
+
+
+1;
diff --git a/Bugzilla/Elastic/Role/Object.pm b/Bugzilla/Elastic/Role/Object.pm
new file mode 100644
index 000000000..ad5ab002b
--- /dev/null
+++ b/Bugzilla/Elastic/Role/Object.pm
@@ -0,0 +1,48 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# This Source Code Form is "Incompatible With Secondary Licenses", as
+# defined by the Mozilla Public License, v. 2.0.
+package Bugzilla::Elastic::Role::Object;
+
+use 5.10.1;
+use Role::Tiny;
+
+requires qw(ES_TYPE ES_PROPERTIES es_document);
+requires qw(ID_FIELD DB_TABLE);
+
+sub ES_OBJECTS_AT_ONCE { 100 }
+
+sub ES_SELECT_ALL_SQL {
+ my ($class, $last_id) = @_;
+
+ my $id = $class->ID_FIELD;
+ my $table = $class->DB_TABLE;
+
+ return ("SELECT $id FROM $table WHERE $id > ? ORDER BY $id", [$last_id // 0]);
+}
+
+requires qw(ES_SELECT_UPDATED_SQL);
+
+around 'ES_PROPERTIES' => sub {
+ my $orig = shift;
+ my $self = shift;
+ my $properties = $orig->($self, @_);
+ $properties->{es_mtime} = { type => 'long' };
+ $properties->{$self->ID_FIELD} = { type => 'long', analyzer => 'keyword' };
+
+ return $properties;
+};
+
+around 'es_document' => sub {
+ my ($orig, $self, $mtime) = @_;
+ my $doc = $orig->($self);
+
+ $doc->{es_mtime} = $mtime;
+ $doc->{$self->ID_FIELD} = $self->id;
+
+ return $doc;
+};
+
+1;