diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index f50c9c0a51a9..56373f25b527 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -45,6 +45,7 @@ import org.apache.lucene.util.Accountables; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.FieldMemoryStats; import org.elasticsearch.common.Nullable; @@ -1761,6 +1762,21 @@ public boolean isRecovering() { */ public abstract void maybePruneDeletes(); + /** + * Returns the maximum auto_id_timestamp of all append-only index requests have been processed by this engine + * or the auto_id_timestamp received from its primary shard via {@link #updateMaxUnsafeAutoIdTimestamp(long)}. + * Notes this method returns the auto_id_timestamp of all append-only requests, not max_unsafe_auto_id_timestamp. + */ + public long getMaxSeenAutoIdTimestamp() { + return IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + } + + /** + * Forces this engine to advance its max_unsafe_auto_id_timestamp marker to at least the given timestamp. + * The engine will disable optimization for all append-only whose timestamp at most {@code newTimestamp}. + */ + public abstract void updateMaxUnsafeAutoIdTimestamp(long newTimestamp); + @FunctionalInterface public interface TranslogRecoveryRunner { int run(Engine engine, Translog.Snapshot snapshot) throws IOException; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 64dae5dc67de..1043e514fd73 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -140,6 +140,7 @@ public class InternalEngine extends Engine { private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); + private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); private final CounterMetric numVersionLookups = new CounterMetric(); private final CounterMetric numIndexVersionsLookups = new CounterMetric(); @@ -166,7 +167,7 @@ public InternalEngine(EngineConfig engineConfig) { final BiFunction localCheckpointTrackerSupplier) { super(engineConfig); if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { - maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); + updateAutoIdTimestamp(Long.MAX_VALUE, true); } final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), @@ -369,7 +370,7 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { assert maxUnsafeAutoIdTimestamp.get() == -1 : "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]"; - maxUnsafeAutoIdTimestamp.set(Long.parseLong(entry.getValue())); + updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true); } if (key.equals(SequenceNumbers.MAX_SEQ_NO)) { assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 : @@ -1009,11 +1010,12 @@ private boolean mayHaveBeenIndexedBefore(Index index) { final boolean mayHaveBeenIndexBefore; if (index.isRetry()) { mayHaveBeenIndexBefore = true; - maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(index.getAutoGeneratedIdTimestamp(), curr)); + updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), true); assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); } else { // in this case we force mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); + updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), false); } return mayHaveBeenIndexBefore; } @@ -2287,7 +2289,7 @@ public void onSettingsChanged() { // this is an anti-viral settings you can only opt out for the entire index // only if a shard starts up again due to relocation or if the index is closed // the setting will be re-interpreted if it's set to true - this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); + updateAutoIdTimestamp(Long.MAX_VALUE, true); } final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy(); final IndexSettings indexSettings = engineConfig.getIndexSettings(); @@ -2526,4 +2528,24 @@ void updateRefreshedCheckpoint(long checkpoint) { assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint; } } + + @Override + public final long getMaxSeenAutoIdTimestamp() { + return maxSeenAutoIdTimestamp.get(); + } + + @Override + public final void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) { + updateAutoIdTimestamp(newTimestamp, true); + } + + private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) { + assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]"; + maxSeenAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp)); + if (unsafe) { + maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp)); + } + assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get(); + } + } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 9f7ceb614742..c36c2fc9c29e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -374,4 +374,9 @@ public void maybePruneDeletes() { public DocsStats docStats() { return docsStats; } + + @Override + public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) { + + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8f97bc07c139..fc5248a5c33d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1219,6 +1219,29 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) { getEngine().trimOperationsFromTranslog(operationPrimaryTerm, aboveSeqNo); } + /** + * Returns the maximum auto_id_timestamp of all append-only requests have been processed by this shard or the auto_id_timestamp received + * from the primary via {@link #updateMaxUnsafeAutoIdTimestamp(long)} at the beginning of a peer-recovery or a primary-replica resync. + * + * @see #updateMaxUnsafeAutoIdTimestamp(long) + */ + public long getMaxSeenAutoIdTimestamp() { + return getEngine().getMaxSeenAutoIdTimestamp(); + } + + /** + * Since operations stored in soft-deletes do not have max_auto_id_timestamp, the primary has to propagate its max_auto_id_timestamp + * (via {@link #getMaxSeenAutoIdTimestamp()} of all processed append-only requests to replicas at the beginning of a peer-recovery + * or a primary-replica resync to force a replica to disable optimization for all append-only requests which are replicated via + * replication while its retry variants are replicated via recovery without auto_id_timestamp. + *

+ * Without this force-update, a replica can generate duplicate documents (for the same id) if it first receives + * a retry append-only (without timestamp) via recovery, then an original append-only (with timestamp) via replication. + */ + public void updateMaxUnsafeAutoIdTimestamp(long maxSeenAutoIdTimestampFromPrimary) { + getEngine().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampFromPrimary); + } + public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException { // If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type. final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null; diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index aaa4697e5cbb..ba88e30727d6 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -455,7 +455,8 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); final RecoveryTarget recoveryTarget = recoveryRef.target(); try { - recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps()); + recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(), + request.maxSeenAutoIdTimestampOnPrimary()); channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint())); } catch (MapperException exception) { // in very rare cases a translog replay from primary is processed before a mapping update on this node diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 220abf43124a..20e6d8578732 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -215,7 +215,10 @@ public RecoveryResponse recoverToTarget() throws IOException { } final long targetLocalCheckpoint; try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) { - targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); + // We have to capture the max auto_id_timestamp after taking a snapshot of operations to guarantee + // that the auto_id_timestamp of every operation in the snapshot is at most this timestamp value. + final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp(); + targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp); } catch (Exception e) { throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e); } @@ -447,9 +450,11 @@ void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTr * @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo) * @param endingSeqNo the highest sequence number that should be sent * @param snapshot a snapshot of the translog + * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary * @return the local checkpoint on the target */ - long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot) + long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot, + final long maxSeenAutoIdTimestamp) throws IOException { if (shard.state() == IndexShardState.CLOSED) { throw new IndexShardClosedException(request.shardId()); @@ -462,7 +467,8 @@ long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingS "required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]"); // send all the snapshot's translog operations to the target - final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot); + final SendSnapshotResult result = sendSnapshot( + startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp); stopWatch.stop(); logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime()); @@ -530,10 +536,11 @@ static class SendSnapshotResult { * @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive) * @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the * total number of operations sent + * @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary * @throws IOException if an I/O exception occurred reading the translog snapshot */ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, - final Translog.Snapshot snapshot) throws IOException { + final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp) throws IOException { assert requiredSeqNoRangeStart <= endingSeqNo + 1: "requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo; assert startingSeqNo <= requiredSeqNoRangeStart : @@ -551,8 +558,8 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require logger.trace("no translog operations to send"); } - final CancellableThreads.IOInterruptable sendBatch = - () -> targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps)); + final CancellableThreads.IOInterruptable sendBatch = () -> + targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps, maxSeenAutoIdTimestamp)); // send operations in batches Translog.Operation operation; diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index e28b01c8a618..e2f21fe8edd2 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -386,13 +386,21 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxSeenAutoIdTimestampOnPrimary) throws IOException { final RecoveryState.Translog translog = state().getTranslog(); translog.totalOperations(totalTranslogOps); assert indexShard().recoveryState() == state(); if (indexShard().state() != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, indexShard().state()); } + /* + * The maxSeenAutoIdTimestampOnPrimary received from the primary is at least the highest auto_id_timestamp from any operation + * will be replayed. Bootstrapping this timestamp here will disable the optimization for original append-only requests + * (source of these operations) replicated via replication. Without this step, we may have duplicate documents if we + * replay these operations first (without timestamp), then optimize append-only requests (with timestamp). + */ + indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary); for (Translog.Operation operation : operations) { Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY); if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 4e728a72b300..53220c586094 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -59,12 +59,13 @@ public interface RecoveryTargetHandler { /** * Index a set of translog operations on the target - * @param operations operations to index - * @param totalTranslogOps current number of total operations expected to be indexed - * + * @param operations operations to index + * @param totalTranslogOps current number of total operations expected to be indexed + * @param maxSeenAutoIdTimestampOnPrimary the maximum auto_id_timestamp of all append-only requests processed by the primary shard * @return the local checkpoint on the target shard */ - long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException; + long indexTranslogOperations(List operations, int totalTranslogOps, + long maxSeenAutoIdTimestampOnPrimary) throws IOException; /** * Notifies the target of the files it is going to receive diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java index be399e0f81fd..3adb5695e029 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.indices.recovery; +import org.elasticsearch.Version; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -34,15 +36,18 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest { private ShardId shardId; private List operations; private int totalTranslogOps = RecoveryState.Translog.UNKNOWN; + private long maxSeenAutoIdTimestampOnPrimary; public RecoveryTranslogOperationsRequest() { } - RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List operations, int totalTranslogOps) { + RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List operations, + int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) { this.recoveryId = recoveryId; this.shardId = shardId; this.operations = operations; this.totalTranslogOps = totalTranslogOps; + this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary; } public long recoveryId() { @@ -61,6 +66,10 @@ public int totalTranslogOps() { return totalTranslogOps; } + public long maxSeenAutoIdTimestampOnPrimary() { + return maxSeenAutoIdTimestampOnPrimary; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -68,6 +77,11 @@ public void readFrom(StreamInput in) throws IOException { shardId = ShardId.readShardId(in); operations = Translog.readOperations(in, "recovery"); totalTranslogOps = in.readVInt(); + if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + maxSeenAutoIdTimestampOnPrimary = in.readZLong(); + } else { + maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + } } @Override @@ -77,5 +91,8 @@ public void writeTo(StreamOutput out) throws IOException { shardId.writeTo(out); Translog.writeOperations(out, operations); out.writeVInt(totalTranslogOps); + if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) { + out.writeZLong(maxSeenAutoIdTimestampOnPrimary); + } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index edf17595350c..3a7f28e8eb7e 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -110,9 +110,9 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) { + public long indexTranslogOperations(List operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) { final RecoveryTranslogOperationsRequest translogOperationsRequest = - new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps); + new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary); final TransportFuture future = transportService.submitRequest( targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2faced930a36..b2244163530b 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3537,6 +3537,8 @@ public void run() { } assertEquals(0, engine.getNumVersionLookups()); assertEquals(0, engine.getNumIndexVersionsLookups()); + assertThat(engine.getMaxSeenAutoIdTimestamp(), + equalTo(docs.stream().mapToLong(Engine.Index::getAutoGeneratedIdTimestamp).max().getAsLong())); assertLuceneOperations(engine, numDocs, 0, 0); } diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index f2cdfbf8fc56..2f38ef709d1c 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.Term; import org.apache.lucene.search.TermQuery; import org.apache.lucene.search.TopDocs; +import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkShardRequest; @@ -141,10 +142,80 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa } } + public void testRetryAppendOnlyAfterRecovering() throws Exception { + try (ReplicationGroup shards = createGroup(0)) { + shards.startAll(); + final IndexRequest originalRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); + originalRequest.process(Version.CURRENT, null, index.getName()); + final IndexRequest retryRequest = copyIndexRequest(originalRequest); + retryRequest.onRetry(); + shards.index(retryRequest); + IndexShard replica = shards.addReplica(); + shards.recoverReplica(replica); + shards.assertAllEqual(1); + shards.index(originalRequest); // original append-only arrives after recovery completed + shards.assertAllEqual(1); + assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(originalRequest.getAutoGeneratedTimestamp())); + } + } + + public void testAppendOnlyRecoveryThenReplication() throws Exception { + CountDownLatch indexedOnPrimary = new CountDownLatch(1); + CountDownLatch recoveryDone = new CountDownLatch(1); + try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(1)) { + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + return config -> new InternalEngine(config) { + @Override + public IndexResult index(Index op) throws IOException { + IndexResult result = super.index(op); + if (op.origin() == Operation.Origin.PRIMARY) { + indexedOnPrimary.countDown(); + // prevent the indexing on the primary from returning (it was added to Lucene and translog already) + // to make sure that this operation is replicated to the replica via recovery, then via replication. + try { + recoveryDone.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + } + return result; + } + }; + } + }) { + shards.startAll(); + Thread thread = new Thread(() -> { + IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); + try { + shards.index(indexRequest); + } catch (Exception e) { + throw new AssertionError(e); + } + }); + thread.start(); + IndexShard replica = shards.addReplica(); + Future fut = shards.asyncRecoverReplica(replica, + (shard, node) -> new RecoveryTarget(shard, node, recoveryListener, v -> {}){ + @Override + public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps) throws IOException { + try { + indexedOnPrimary.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + super.prepareForTranslogOperations(fileBasedRecovery, totalTranslogOps); + } + }); + fut.get(); + recoveryDone.countDown(); + thread.join(); + shards.assertAllEqual(1); + } + } + public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception { - //TODO: Enables this test with soft-deletes once we have timestamp - Settings settings = Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build(); - try (ReplicationGroup shards = createGroup(0, settings)) { + try (ReplicationGroup shards = createGroup(0)) { shards.startAll(); final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); indexRequest.onRetry(); // force an update of the timestamp diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index a73d7385d9d4..3883554acc04 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -488,9 +488,10 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { return new RecoveryTarget(indexShard, node, recoveryListener, l -> { }) { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxSeenAutoIdTimestampOnPrimary) throws IOException { opsSent.set(true); - return super.indexTranslogOperations(operations, totalTranslogOps); + return super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary); } }; }); @@ -557,7 +558,8 @@ protected EngineFactory getEngineFactory(final ShardRouting routing) { replica, (indexShard, node) -> new RecoveryTarget(indexShard, node, recoveryListener, l -> {}) { @Override - public long indexTranslogOperations(final List operations, final int totalTranslogOps) + public long indexTranslogOperations(final List operations, final int totalTranslogOps, + final long maxAutoIdTimestamp) throws IOException { // index a doc which is not part of the snapshot, but also does not complete on replica replicaEngineFactory.latchIndexers(1); @@ -585,7 +587,7 @@ public long indexTranslogOperations(final List operations, f } catch (InterruptedException e) { throw new AssertionError(e); } - return super.indexTranslogOperations(operations, totalTranslogOps); + return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); } }); pendingDocActiveWithExtraDocIndexed.await(); @@ -671,11 +673,12 @@ private void blockIfNeeded(RecoveryState.Stage currentStage) { } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxAutoIdTimestamp) throws IOException { if (hasBlocked() == false) { blockIfNeeded(RecoveryState.Stage.TRANSLOG); } - return super.indexTranslogOperations(operations, totalTranslogOps); + return super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 7fd47c9994cc..96557a00746f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2191,8 +2191,9 @@ public void testTranslogRecoverySyncsTranslog() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { - final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxSeenAutoIdTimestamp) throws IOException { + final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp); assertFalse(replica.isSyncNeeded()); return localCheckpoint; } @@ -2298,8 +2299,9 @@ public void testShardActiveDuringPeerRecovery() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { - final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxAutoIdTimestamp) throws IOException { + final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); // Shard should now be active since we did recover: assertTrue(replica.isActive()); return localCheckpoint; @@ -2345,8 +2347,9 @@ public void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTra } @Override - public long indexTranslogOperations(List operations, int totalTranslogOps) throws IOException { - final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps); + public long indexTranslogOperations(List operations, int totalTranslogOps, + long maxAutoIdTimestamp) throws IOException { + final long localCheckpoint = super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp); assertListenerCalled.accept(replica); return localCheckpoint; } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 0f7a72aacf3f..9b17962f91b1 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -207,11 +207,11 @@ public int totalOperations() { public Translog.Operation next() throws IOException { return operations.get(counter++); } - }); + }, randomNonNegativeLong()); final int expectedOps = (int) (endingSeqNo - startingSeqNo + 1); assertThat(result.totalOperations, equalTo(expectedOps)); final ArgumentCaptor shippedOpsCaptor = ArgumentCaptor.forClass(List.class); - verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture()); + verify(recoveryTarget).indexTranslogOperations(shippedOpsCaptor.capture(), ArgumentCaptor.forClass(Integer.class).capture(), ArgumentCaptor.forClass(Long.class).capture()); List shippedOps = new ArrayList<>(); for (List list: shippedOpsCaptor.getAllValues()) { shippedOps.addAll(list); @@ -249,7 +249,7 @@ public Translog.Operation next() throws IOException { } while (op != null && opsToSkip.contains(op)); return op; } - })); + }, randomNonNegativeLong())); } } @@ -420,7 +420,8 @@ void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTr } @Override - long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot) throws IOException { + long phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, + long maxSeenAutoIdTimestamp) { phase2Called.set(true); return SequenceNumbers.UNASSIGNED_SEQ_NO; } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 5f0909db0d3f..f590b99b4817 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -55,6 +55,8 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; @@ -137,6 +139,17 @@ protected IndexMetaData buildIndexMetaData(int replicas, Settings indexSettings, return metaData.build(); } + protected IndexRequest copyIndexRequest(IndexRequest inRequest) throws IOException { + final IndexRequest outRequest = new IndexRequest(); + try (BytesStreamOutput out = new BytesStreamOutput()) { + inRequest.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + outRequest.readFrom(in); + } + } + return outRequest; + } + protected DiscoveryNode getDiscoveryNode(String id) { return new DiscoveryNode(id, id, buildNewFakeTransportAddress(), Collections.emptyMap(), Collections.singleton(DiscoveryNode.Role.DATA), Version.CURRENT); @@ -428,6 +441,12 @@ public synchronized List shardRoutings() { public synchronized void close() throws Exception { if (closed == false) { closed = true; + for (IndexShard replica : replicas) { + try { + assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(primary.getMaxSeenAutoIdTimestamp())); + } catch (AlreadyClosedException ignored) { + } + } closeShards(this); } else { throw new AlreadyClosedException("too bad");