diff options
Diffstat (limited to 'Bugzilla/Elastic')
-rw-r--r-- | Bugzilla/Elastic/Indexer.pm | 280 | ||||
-rw-r--r-- | Bugzilla/Elastic/Role/ChildObject.pm | 16 | ||||
-rw-r--r-- | Bugzilla/Elastic/Role/HasClient.pm | 25 | ||||
-rw-r--r-- | Bugzilla/Elastic/Role/HasIndexName.pm | 16 | ||||
-rw-r--r-- | Bugzilla/Elastic/Role/Object.pm | 48 |
5 files changed, 385 insertions, 0 deletions
diff --git a/Bugzilla/Elastic/Indexer.pm b/Bugzilla/Elastic/Indexer.pm new file mode 100644 index 000000000..82f946af9 --- /dev/null +++ b/Bugzilla/Elastic/Indexer.pm @@ -0,0 +1,280 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. +package Bugzilla::Elastic::Indexer; + +use 5.10.1; +use Moo; +use List::MoreUtils qw(natatime); +use Storable qw(dclone); +use namespace::clean; + +with 'Bugzilla::Elastic::Role::HasClient'; +with 'Bugzilla::Elastic::Role::HasIndexName'; + +has 'mtime' => ( + is => 'lazy', + clearer => 'clear_mtime', +); + +has 'shadow_dbh' => ( is => 'lazy' ); + +has 'debug_sql' => ( + is => 'ro', + default => 0, +); + +has 'progress_bar' => ( + is => 'ro', + predicate => 'has_progress_bar', +); + +sub create_index { + my ($self) = @_; + my $indices = $self->client->indices; + + $indices->create( + index => $self->index_name, + body => { + settings => { + number_of_shards => 1, + analysis => { + analyzer => { + folding => { + type => 'standard', + tokenizer => 'standard', + filter => [ 'lowercase', 'asciifolding' ] + }, + bz_text_analyzer => { + type => 'standard', + filter => ['lowercase', 'stop'], + max_token_length => '20' + }, + bz_substring_analyzer => { + type => 'custom', + filter => ['lowercase'], + tokenizer => 'bz_ngram_tokenizer', + }, + bz_equals_analyzer => { + type => 'custom', + filter => ['lowercase'], + tokenizer => 'keyword', + }, + whiteboard_words => { + type => 'custom', + tokenizer => 'whiteboard_words_pattern', + filter => ['stop'] + }, + whiteboard_shingle_words => { + type => 'custom', + tokenizer => 'whiteboard_words_pattern', + filter => ['stop', 'shingle'] + }, + whiteboard_tokens => { + type => 'custom', + tokenizer => 'whiteboard_tokens_pattern', + filter => ['stop'] + }, + whiteboard_shingle_tokens => { + type => 'custom', + tokenizer => 'whiteboard_tokens_pattern', + filter => ['stop', 'shingle'] + } + }, + tokenizer => { + bz_ngram_tokenizer => { + type => 'nGram', + min_ngram => 2, + max_ngram => 25, + }, + whiteboard_tokens_pattern => { + type => 'pattern', + pattern => '\\s*([,;]*\\[|\\][\\s\\[]*|[;,])\\s*' + }, + whiteboard_words_pattern => { + type => 'pattern', + pattern => '[\\[\\];,\\s]+' + }, + }, + }, + }, + } + ) unless $indices->exists(index => $self->index_name); +} + +sub _bulk_helper { + my ($self, $class) = @_; + + return $self->client->bulk_helper( + index => $self->index_name, + type => $class->ES_TYPE, + ); +} + +sub find_largest_mtime { + my ($self, $class) = @_; + + my $result = $self->client->search( + index => $self->index_name, + type => $class->ES_TYPE, + body => { + aggs => { es_mtime => { extended_stats => { field => 'es_mtime' } } }, + size => 0 + } + ); + + return $result->{aggregations}{es_mtime}{max}; +} + +sub find_largest_id { + my ($self, $class) = @_; + + my $result = $self->client->search( + index => $self->index_name, + type => $class->ES_TYPE, + body => { + aggs => { $class->ID_FIELD => { extended_stats => { field => $class->ID_FIELD } } }, + size => 0 + } + ); + + return $result->{aggregations}{$class->ID_FIELD}{max}; +} + +sub put_mapping { + my ($self, $class) = @_; + + my %body = ( properties => scalar $class->ES_PROPERTIES ); + if ($class->does('Bugzilla::Elastic::Role::ChildObject')) { + $body{_parent} = { type => $class->ES_PARENT_TYPE }; + } + + $self->client->indices->put_mapping( + index => $self->index_name, + type => $class->ES_TYPE, + body => \%body, + ); +} + +sub _debug_sql { + my ($self, $sql, $params) = @_; + if ($self->debug_sql) { + my ($out, @args) = ($sql, $params ? (@$params) : ()); + $out =~ s/^\n//gs; + $out =~ s/^\s{8}//gm; + $out =~ s/\?/Bugzilla->dbh->quote(shift @args)/ge; + warn $out, "\n"; + } + + return ($sql, $params) +} + +sub bulk_load { + my ( $self, $class ) = @_; + + $self->put_mapping($class); + my $bulk = $self->_bulk_helper($class); + my $ids = $self->_select_all_ids($class); + $self->clear_mtime; + $self->_bulk_load_ids($bulk, $class, $ids) if @$ids; + undef $ids; # free up some memory + + my $updated_ids = $self->_select_updated_ids($class); + if ($updated_ids) { + $self->_bulk_load_ids($bulk, $class, $updated_ids) if @$updated_ids; + } +} + +sub _select_all_ids { + my ($self, $class) = @_; + + my $dbh = Bugzilla->dbh; + my $last_id = $self->find_largest_id($class); + my ($sql, $params) = $self->_debug_sql($class->ES_SELECT_ALL_SQL($last_id)); + return $dbh->selectcol_arrayref($sql, undef, @$params); +} + +sub _select_updated_ids { + my ($self, $class) = @_; + + my $dbh = Bugzilla->dbh; + my $mtime = $self->find_largest_mtime($class); + if ($mtime && $mtime != $self->mtime) { + my ($updated_sql, $updated_params) = $self->_debug_sql($class->ES_SELECT_UPDATED_SQL($mtime)); + return $dbh->selectcol_arrayref($updated_sql, undef, @$updated_params); + } else { + return undef; + } +} + +sub bulk_load_ids { + my ($self, $class, $ids) = @_; + + $self->put_mapping($class); + $self->clear_mtime; + $self->_bulk_load_ids($self->_bulk_helper($class), $class, $ids); +} + +sub _bulk_load_ids { + my ($self, $bulk, $class, $all_ids) = @_; + + my $iter = natatime $class->ES_OBJECTS_AT_ONCE, @$all_ids; + my $mtime = $self->mtime; + my $progress_bar; + my $next_update; + + if ($self->has_progress_bar) { + my $name = (split(/::/, $class))[-1]; + $progress_bar = $self->progress_bar->new({ + name => $name, + count => scalar @$all_ids, + ETA => 'linear' + }); + $progress_bar->message(sprintf "loading %d $class objects, %d at a time", scalar @$all_ids, $class->ES_OBJECTS_AT_ONCE); + $next_update = $progress_bar->update(0); + $progress_bar->max_update_rate(1); + } + + my $total = 0; + use Time::HiRes; + my $start = time; + while (my @ids = $iter->()) { + if ($progress_bar) { + $total += @ids; + if ($total >= $next_update) { + $next_update = $progress_bar->update($total); + my $duration = time - $start || 1; + } + } + + my $objects = $class->new_from_list(\@ids); + foreach my $object (@$objects) { + my %doc = ( + id => $object->id, + source => scalar $object->es_document($mtime), + ); + + if ($class->does('Bugzilla::Elastic::Role::ChildObject')) { + $doc{parent} = $object->es_parent_id; + } + + $bulk->index(\%doc); + } + Bugzilla->_cleanup(); + } + + $bulk->flush; +} + +sub _build_shadow_dbh { Bugzilla->switch_to_shadow_db } + +sub _build_mtime { + my ($self) = @_; + my ($mtime) = $self->shadow_dbh->selectrow_array("SELECT UNIX_TIMESTAMP(NOW())"); + return $mtime; +} + +1; diff --git a/Bugzilla/Elastic/Role/ChildObject.pm b/Bugzilla/Elastic/Role/ChildObject.pm new file mode 100644 index 000000000..1f7a7483a --- /dev/null +++ b/Bugzilla/Elastic/Role/ChildObject.pm @@ -0,0 +1,16 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. +package Bugzilla::Elastic::Role::ChildObject; + +use 5.10.1; +use Role::Tiny; + +with 'Bugzilla::Elastic::Role::Object'; + +requires qw(ES_PARENT_TYPE es_parent_id); + +1; diff --git a/Bugzilla/Elastic/Role/HasClient.pm b/Bugzilla/Elastic/Role/HasClient.pm new file mode 100644 index 000000000..3d52d513a --- /dev/null +++ b/Bugzilla/Elastic/Role/HasClient.pm @@ -0,0 +1,25 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. +package Bugzilla::Elastic::Role::HasClient; + +use 5.10.1; +use Moo::Role; +use Search::Elasticsearch; + + +has 'client' => (is => 'lazy'); + +sub _build_client { + my ($self) = @_; + + return Search::Elasticsearch->new( + nodes => Bugzilla->params->{elasticsearch_nodes}, + cxn_pool => 'Sniff', + ); +} + +1; diff --git a/Bugzilla/Elastic/Role/HasIndexName.pm b/Bugzilla/Elastic/Role/HasIndexName.pm new file mode 100644 index 000000000..eaff339cd --- /dev/null +++ b/Bugzilla/Elastic/Role/HasIndexName.pm @@ -0,0 +1,16 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. +package Bugzilla::Elastic::Role::HasIndexName; + +use 5.10.1; +use Moo::Role; +use Search::Elasticsearch; + +has 'index_name' => ( is => 'ro', default => sub { Bugzilla->params->{elasticsearch_index} } ); + + +1; diff --git a/Bugzilla/Elastic/Role/Object.pm b/Bugzilla/Elastic/Role/Object.pm new file mode 100644 index 000000000..ad5ab002b --- /dev/null +++ b/Bugzilla/Elastic/Role/Object.pm @@ -0,0 +1,48 @@ +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. +package Bugzilla::Elastic::Role::Object; + +use 5.10.1; +use Role::Tiny; + +requires qw(ES_TYPE ES_PROPERTIES es_document); +requires qw(ID_FIELD DB_TABLE); + +sub ES_OBJECTS_AT_ONCE { 100 } + +sub ES_SELECT_ALL_SQL { + my ($class, $last_id) = @_; + + my $id = $class->ID_FIELD; + my $table = $class->DB_TABLE; + + return ("SELECT $id FROM $table WHERE $id > ? ORDER BY $id", [$last_id // 0]); +} + +requires qw(ES_SELECT_UPDATED_SQL); + +around 'ES_PROPERTIES' => sub { + my $orig = shift; + my $self = shift; + my $properties = $orig->($self, @_); + $properties->{es_mtime} = { type => 'long' }; + $properties->{$self->ID_FIELD} = { type => 'long', analyzer => 'keyword' }; + + return $properties; +}; + +around 'es_document' => sub { + my ($orig, $self, $mtime) = @_; + my $doc = $orig->($self); + + $doc->{es_mtime} = $mtime; + $doc->{$self->ID_FIELD} = $self->id; + + return $doc; +}; + +1; |