From eca2e93e7893754b313674614fb5c76b4a43f87c Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 23 Oct 2019 19:37:05 +0200 Subject: [PATCH 1/2] take the max if global checkpoints mismatch --- .../checkpoint/DefaultCheckpointProvider.java | 11 +++++----- .../TransformsCheckpointServiceTests.java | 20 ++++++++++--------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index e624f2e62706b..5904e039530d4 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -20,11 +20,11 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.xpack.core.ClientHelper; -import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; +import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; @@ -189,10 +189,11 @@ static Map extractIndexCheckPoints(ShardStats[] shards, Set checkpoints = checkpointsByIndex.get(indexName); if (checkpoints.containsKey(shard.getShardRouting().getId())) { - // there is already a checkpoint entry for this index/shard combination, check if they match - if (checkpoints.get(shard.getShardRouting().getId()) != globalCheckpoint) { - throw new CheckpointException("Global checkpoints mismatch for index [" + indexName + "] between shards of id [" - + shard.getShardRouting().getId() + "]"); + // there is already a checkpoint entry for this index/shard combination + // it's possible that replica shards report a different/higher global checkpoints + // This is by design and not a problem, take the max() for this case + if (checkpoints.get(shard.getShardRouting().getId()) < globalCheckpoint) { + checkpoints.put(shard.getShardRouting().getId(), globalCheckpoint); } } else { // 1st time we see this shard for this index, add the entry for the shard diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java index d343dd5e0066e..d62a242d8fe16 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java @@ -43,8 +43,6 @@ import java.util.Map.Entry; import java.util.Set; -import static org.hamcrest.Matchers.containsString; - public class TransformsCheckpointServiceTests extends ESTestCase { public void testExtractIndexCheckpoints() { @@ -104,11 +102,15 @@ public void testExtractIndexCheckpointsInconsistentGlobalCheckpoints() { ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true, false); - // fail - CheckpointException e = expectThrows(CheckpointException.class, - () -> DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices)); + Map checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices); + + assertEquals(expectedCheckpoints.size(), checkpoints.size()); + assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet()); - assertThat(e.getMessage(), containsString("Global checkpoints mismatch")); + // global checkpoints should be max() of all global checkpoints + for (Entry entry : expectedCheckpoints.entrySet()) { + assertArrayEquals(entry.getValue(), checkpoints.get(entry.getKey())); + } } /** @@ -176,8 +178,8 @@ private static ShardStats[] createRandomShardStats(Map expectedC } // SeqNoStats asserts that checkpoints are logical - long localCheckpoint = randomLongBetween(0L, 100000000L); - long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(0L, 100000000L); + long localCheckpoint = randomLongBetween(100L, 100000000L); + long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(100L, 100000000L); long maxSeqNo = Math.max(localCheckpoint, globalCheckpoint); SeqNoStats validSeqNoStats = null; @@ -221,7 +223,7 @@ private static ShardStats[] createRandomShardStats(Map expectedC if (inconsistentReplica == replica) { // overwrite SeqNoStats invalidSeqNoStats = - new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint + randomLongBetween(10L, 100L)); + new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint - randomLongBetween(10L, 100L)); shardStats.add( new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, invalidSeqNoStats, null)); From 1a2cb532360bbbd2a5c145f4438e0cafd0511015 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 24 Oct 2019 08:18:17 +0200 Subject: [PATCH 2/2] simplify --- .../checkpoint/DefaultCheckpointProvider.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java index 5904e039530d4..87614e8a33337 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java @@ -188,15 +188,12 @@ static Map extractIndexCheckPoints(ShardStats[] shards, Set checkpoints = checkpointsByIndex.get(indexName); - if (checkpoints.containsKey(shard.getShardRouting().getId())) { - // there is already a checkpoint entry for this index/shard combination - // it's possible that replica shards report a different/higher global checkpoints - // This is by design and not a problem, take the max() for this case - if (checkpoints.get(shard.getShardRouting().getId()) < globalCheckpoint) { - checkpoints.put(shard.getShardRouting().getId(), globalCheckpoint); - } - } else { - // 1st time we see this shard for this index, add the entry for the shard + // 1st time we see this shard for this index, add the entry for the shard + // or there is already a checkpoint entry for this index/shard combination + // but with a higher global checkpoint. This is by design(not a problem) and + // we take the higher value + if (checkpoints.containsKey(shard.getShardRouting().getId()) == false + || checkpoints.get(shard.getShardRouting().getId()) < globalCheckpoint) { checkpoints.put(shard.getShardRouting().getId(), globalCheckpoint); } } else {