summary | refs | log | tree | commit | diff | stats
path: root/Bugzilla/Elastic
diff options
context:
space:
mode:
author: Dylan William Hardison <dylan@hardison.net> 2017-07-06 23:59:30 +0200
committer: Dylan William Hardison <dylan@hardison.net> 2017-07-07 00:19:39 +0200
commit: e6d1b6fe2ad6e341ab63c30bde2544e5a8b77b31 (patch)
tree: d62facb48cc7df8eaa736bec9a24d3a55066f6b1 /Bugzilla/Elastic
parent: 248a7d4a9604f01df058dd2427897ce7c8f95635 (diff)
download: bugzilla-e6d1b6fe2ad6e341ab63c30bde2544e5a8b77b31.tar.gz
bugzilla-e6d1b6fe2ad6e341ab63c30bde2544e5a8b77b31.tar.xz
Bug 1377620 - Elasticsearch bulk indexer ignores changes that happened before newest additions
Diffstat (limited to 'Bugzilla/Elastic')
-rw-r--r-- Bugzilla/Elastic/Indexer.pm 83
1 files changed, 38 insertions, 45 deletions
diff --git a/Bugzilla/Elastic/Indexer.pm b/Bugzilla/Elastic/Indexer.pm
index fce8f1053..b691e0687 100644
--- a/Bugzilla/Elastic/Indexer.pm
+++ b/Bugzilla/Elastic/Indexer.pm
@@ -10,16 +10,12 @@ use 5.10.1;
use Moo;
use List::MoreUtils qw(natatime);
use Storable qw(dclone);
+use Scalar::Util qw(looks_like_number);
use namespace::clean;
with 'Bugzilla::Elastic::Role::HasClient';
with 'Bugzilla::Elastic::Role::HasIndexName';
-has 'mtime' => (
- is => 'lazy',
- clearer => 'clear_mtime',
-);
-
has 'shadow_dbh' => ( is => 'lazy' );
has 'debug_sql' => (
@@ -109,34 +105,41 @@ sub _bulk_helper {
);
}
-sub find_largest_mtime {
- my ($self, $class) = @_;
+
+sub _find_largest {
+ my ($self, $class, $field) = @_;
my $result = $self->client->search(
index => $self->index_name,
type => $class->ES_TYPE,
body => {
- aggs => { es_mtime => { extended_stats => { field => 'es_mtime' } } },
+ aggs => { $field => { extended_stats => { field => $field } } },
size => 0
}
);
- return $result->{aggregations}{es_mtime}{max};
+ my $max = $result->{aggregations}{$field}{max};
+ if (not defined $max) {
+ return 0;
+ }
+ elsif (looks_like_number($max)) {
+ return $max;
+ }
+ else {
+ die "largest value for '$field' is not a number: $max";
+ }
}
-sub find_largest_id {
+sub _find_largest_mtime {
my ($self, $class) = @_;
- my $result = $self->client->search(
- index => $self->index_name,
- type => $class->ES_TYPE,
- body => {
- aggs => { $class->ID_FIELD => { extended_stats => { field => $class->ID_FIELD } } },
- size => 0
- }
- );
+ return $self->_find_largest($class, 'es_mtime');
+}
- return $result->{aggregations}{$class->ID_FIELD}{max};
+sub _find_largest_id {
+ my ($self, $class) = @_;
+
+ return $self->_find_largest($class, $class->ID_FIELD);
}
sub put_mapping {
@@ -170,46 +173,36 @@ sub _debug_sql {
sub bulk_load {
my ( $self, $class ) = @_;
- $self->put_mapping($class);
- my $bulk = $self->_bulk_helper($class);
- my $ids = $self->_select_all_ids($class);
- $self->clear_mtime;
- $self->_bulk_load_ids($bulk, $class, $ids) if @$ids;
- undef $ids; # free up some memory
-
- my $updated_ids = $self->_select_updated_ids($class);
- if ($updated_ids) {
- $self->_bulk_load_ids($bulk, $class, $updated_ids) if @$updated_ids;
- }
+ my $bulk = $self->_bulk_helper($class);
+ my $last_mtime = $self->_find_largest_mtime($class);
+ my $last_id = $self->_find_largest_id($class);
+ my $new_ids = $self->_select_all_ids($class, $last_id);
+ my $updated_ids = $self->_select_updated_ids($class, $last_mtime);
+
+ $self->_bulk_load_ids($bulk, $class, $new_ids) if @$new_ids;
+ $self->_bulk_load_ids($bulk, $class, $updated_ids) if @$updated_ids;
}
sub _select_all_ids {
- my ($self, $class) = @_;
+ my ($self, $class, $last_id) = @_;
- my $dbh = Bugzilla->dbh;
- my $last_id = $self->find_largest_id($class);
+ my $dbh = Bugzilla->dbh;
my ($sql, $params) = $self->_debug_sql($class->ES_SELECT_ALL_SQL($last_id));
return $dbh->selectcol_arrayref($sql, undef, @$params);
}
sub _select_updated_ids {
- my ($self, $class) = @_;
+ my ($self, $class, $last_mtime) = @_;
- my $dbh = Bugzilla->dbh;
- my $mtime = $self->find_largest_mtime($class);
- if ($mtime && $mtime != $self->mtime) {
- my ($updated_sql, $updated_params) = $self->_debug_sql($class->ES_SELECT_UPDATED_SQL($mtime));
- return $dbh->selectcol_arrayref($updated_sql, undef, @$updated_params);
- } else {
- return undef;
- }
+ my $dbh = Bugzilla->dbh;
+ my ($updated_sql, $updated_params) = $self->_debug_sql($class->ES_SELECT_UPDATED_SQL($last_mtime));
+ return $dbh->selectcol_arrayref($updated_sql, undef, @$updated_params);
}
sub bulk_load_ids {
my ($self, $class, $ids) = @_;
$self->put_mapping($class);
- $self->clear_mtime;
$self->_bulk_load_ids($self->_bulk_helper($class), $class, $ids);
}
@@ -217,7 +210,7 @@ sub _bulk_load_ids {
my ($self, $bulk, $class, $all_ids) = @_;
my $iter = natatime $class->ES_OBJECTS_AT_ONCE, @$all_ids;
- my $mtime = $self->mtime;
+ my $mtime = $self->_current_mtime;
my $progress_bar;
my $next_update;
@@ -266,7 +259,7 @@ sub _bulk_load_ids {
sub _build_shadow_dbh { Bugzilla->switch_to_shadow_db }
-sub _build_mtime {
+sub _current_mtime {
my ($self) = @_;
my ($mtime) = $self->shadow_dbh->selectrow_array("SELECT UNIX_TIMESTAMP(NOW())");
return $mtime;