diff options
author | Dylan William Hardison <dylan@hardison.net> | 2017-07-06 23:59:30 +0200 |
---|---|---|
committer | Dylan William Hardison <dylan@hardison.net> | 2017-07-07 00:19:39 +0200 |
commit | e6d1b6fe2ad6e341ab63c30bde2544e5a8b77b31 (patch) | |
tree | d62facb48cc7df8eaa736bec9a24d3a55066f6b1 /Bugzilla | |
parent | 248a7d4a9604f01df058dd2427897ce7c8f95635 (diff) | |
download | bugzilla-e6d1b6fe2ad6e341ab63c30bde2544e5a8b77b31.tar.gz bugzilla-e6d1b6fe2ad6e341ab63c30bde2544e5a8b77b31.tar.xz |
Bug 1377620 - Elasticsearch bulk indexer ignores changes that happened before newest additions
Diffstat (limited to 'Bugzilla')
-rw-r--r-- | Bugzilla/Elastic/Indexer.pm | 83 |
1 files changed, 38 insertions, 45 deletions
diff --git a/Bugzilla/Elastic/Indexer.pm b/Bugzilla/Elastic/Indexer.pm index fce8f1053..b691e0687 100644 --- a/Bugzilla/Elastic/Indexer.pm +++ b/Bugzilla/Elastic/Indexer.pm @@ -10,16 +10,12 @@ use 5.10.1; use Moo; use List::MoreUtils qw(natatime); use Storable qw(dclone); +use Scalar::Util qw(looks_like_number); use namespace::clean; with 'Bugzilla::Elastic::Role::HasClient'; with 'Bugzilla::Elastic::Role::HasIndexName'; -has 'mtime' => ( - is => 'lazy', - clearer => 'clear_mtime', -); - has 'shadow_dbh' => ( is => 'lazy' ); has 'debug_sql' => ( @@ -109,34 +105,41 @@ sub _bulk_helper { ); } -sub find_largest_mtime { - my ($self, $class) = @_; + +sub _find_largest { + my ($self, $class, $field) = @_; my $result = $self->client->search( index => $self->index_name, type => $class->ES_TYPE, body => { - aggs => { es_mtime => { extended_stats => { field => 'es_mtime' } } }, + aggs => { $field => { extended_stats => { field => $field } } }, size => 0 } ); - return $result->{aggregations}{es_mtime}{max}; + my $max = $result->{aggregations}{$field}{max}; + if (not defined $max) { + return 0; + } + elsif (looks_like_number($max)) { + return $max; + } + else { + die "largest value for '$field' is not a number: $max"; + } } -sub find_largest_id { +sub _find_largest_mtime { my ($self, $class) = @_; - my $result = $self->client->search( - index => $self->index_name, - type => $class->ES_TYPE, - body => { - aggs => { $class->ID_FIELD => { extended_stats => { field => $class->ID_FIELD } } }, - size => 0 - } - ); + return $self->_find_largest($class, 'es_mtime'); +} - return $result->{aggregations}{$class->ID_FIELD}{max}; +sub _find_largest_id { + my ($self, $class) = @_; + + return $self->_find_largest($class, $class->ID_FIELD); } sub put_mapping { @@ -170,46 +173,36 @@ sub _debug_sql { sub bulk_load { my ( $self, $class ) = @_; - $self->put_mapping($class); - my $bulk = $self->_bulk_helper($class); - my $ids = $self->_select_all_ids($class); - $self->clear_mtime; - $self->_bulk_load_ids($bulk, $class, $ids) if @$ids; - undef $ids; # free up some memory - - my $updated_ids = $self->_select_updated_ids($class); - if ($updated_ids) { - $self->_bulk_load_ids($bulk, $class, $updated_ids) if @$updated_ids; - } + my $bulk = $self->_bulk_helper($class); + my $last_mtime = $self->_find_largest_mtime($class); + my $last_id = $self->_find_largest_id($class); + my $new_ids = $self->_select_all_ids($class, $last_id); + my $updated_ids = $self->_select_updated_ids($class, $last_mtime); + + $self->_bulk_load_ids($bulk, $class, $new_ids) if @$new_ids; + $self->_bulk_load_ids($bulk, $class, $updated_ids) if @$updated_ids; } sub _select_all_ids { - my ($self, $class) = @_; + my ($self, $class, $last_id) = @_; - my $dbh = Bugzilla->dbh; - my $last_id = $self->find_largest_id($class); + my $dbh = Bugzilla->dbh; my ($sql, $params) = $self->_debug_sql($class->ES_SELECT_ALL_SQL($last_id)); return $dbh->selectcol_arrayref($sql, undef, @$params); } sub _select_updated_ids { - my ($self, $class) = @_; + my ($self, $class, $last_mtime) = @_; - my $dbh = Bugzilla->dbh; - my $mtime = $self->find_largest_mtime($class); - if ($mtime && $mtime != $self->mtime) { - my ($updated_sql, $updated_params) = $self->_debug_sql($class->ES_SELECT_UPDATED_SQL($mtime)); - return $dbh->selectcol_arrayref($updated_sql, undef, @$updated_params); - } else { - return undef; - } + my $dbh = Bugzilla->dbh; + my ($updated_sql, $updated_params) = $self->_debug_sql($class->ES_SELECT_UPDATED_SQL($last_mtime)); + return $dbh->selectcol_arrayref($updated_sql, undef, @$updated_params); } sub bulk_load_ids { my ($self, $class, $ids) = @_; $self->put_mapping($class); - $self->clear_mtime; $self->_bulk_load_ids($self->_bulk_helper($class), $class, $ids); } @@ -217,7 +210,7 @@ sub _bulk_load_ids { my ($self, $bulk, $class, $all_ids) = @_; my $iter = natatime $class->ES_OBJECTS_AT_ONCE, @$all_ids; - my $mtime = $self->mtime; + my $mtime = $self->_current_mtime; my $progress_bar; my $next_update; @@ -266,7 +259,7 @@ sub _bulk_load_ids { sub _build_shadow_dbh { Bugzilla->switch_to_shadow_db } -sub _build_mtime { +sub _current_mtime { my ($self) = @_; my ($mtime) = $self->shadow_dbh->selectrow_array("SELECT UNIX_TIMESTAMP(NOW())"); return $mtime; |