Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate max_auto_id_timestamp in peer recovery #33693

Merged
merged 11 commits into from
Sep 20, 2018
16 changes: 16 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -1726,6 +1727,21 @@ public boolean isRecovering() {
*/
public abstract void maybePruneDeletes();

/**
* Returns the maximum auto_id_timestamp of all append-only index requests have been processed by this engine
* or the auto_id_timestamp received from its primary shard via {@link #updateMaxUnsafeAutoIdTimestamp(long)}.
* Notes this method returns the auto_id_timestamp of all append-only requests, not max_unsafe_auto_id_timestamp.
*/
public long getMaxSeenAutoIdTimestamp() {
return IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}

/**
* Forces this engine to advance its max_unsafe_auto_id_timestamp marker to at least the given timestamp.
* The engine will disable optimization for all append-only whose timestamp at most {@code newTimestamp}.
*/
public abstract void updateMaxUnsafeAutoIdTimestamp(long newTimestamp);

@FunctionalInterface
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public class InternalEngine extends Engine {
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
Expand All @@ -166,7 +167,7 @@ public InternalEngine(EngineConfig engineConfig) {
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig);
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
Expand Down Expand Up @@ -374,7 +375,7 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
assert maxUnsafeAutoIdTimestamp.get() == -1 :
"max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]";
maxUnsafeAutoIdTimestamp.set(Long.parseLong(entry.getValue()));
updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true);
}
if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 :
Expand Down Expand Up @@ -1014,11 +1015,12 @@ private boolean mayHaveBeenIndexedBefore(Index index) {
final boolean mayHaveBeenIndexBefore;
if (index.isRetry()) {
mayHaveBeenIndexBefore = true;
maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(index.getAutoGeneratedIdTimestamp(), curr));
updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), true);
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
} else {
// in this case we force
mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), false);
}
return mayHaveBeenIndexBefore;
}
Expand Down Expand Up @@ -2292,7 +2294,7 @@ public void onSettingsChanged() {
// this is an anti-viral settings you can only opt out for the entire index
// only if a shard starts up again due to relocation or if the index is closed
// the setting will be re-interpreted if it's set to true
this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
updateAutoIdTimestamp(Long.MAX_VALUE, true);
}
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
final IndexSettings indexSettings = engineConfig.getIndexSettings();
Expand Down Expand Up @@ -2531,4 +2533,26 @@ void updateRefreshedCheckpoint(long checkpoint) {
assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint;
}
}

@Override
public final long getMaxSeenAutoIdTimestamp() {
return maxSeenAutoIdTimestamp.get();
}

@Override
public final void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
updateAutoIdTimestamp(newTimestamp, true);
}

private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]";
maxSeenAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp));
assert maxSeenAutoIdTimestamp.get() >= newTimestamp;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this add much value, we just did a max operation with it.

if (unsafe) {
maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp));
assert maxUnsafeAutoIdTimestamp.get() >= newTimestamp;
}
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -373,4 +373,9 @@ public void maybePruneDeletes() {
public DocsStats docStats() {
return docsStats;
}

@Override
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {

}
}
23 changes: 23 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,29 @@ public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
getEngine().trimOperationsFromTranslog(operationPrimaryTerm, aboveSeqNo);
}

/**
* Returns the maximum auto_id_timestamp of all append-only requests have been processed by this shard or the auto_id_timestamp received
* from the primary via {@link #updateMaxUnsafeAutoIdTimestamp(long)} at the beginning of a peer-recovery or a primary-replica resync.
*
* @see #updateMaxUnsafeAutoIdTimestamp(long)
*/
public long getMaxSeenAutoIdTimestamp() {
return getEngine().getMaxSeenAutoIdTimestamp();
}

/**
* Since operations stored in soft-deletes do not have max_auto_id_timestamp, the primary has to propagate its max_auto_id_timestamp
* (via {@link #getMaxSeenAutoIdTimestamp()} of all processed append-only requests to replicas at the beginning of a peer-recovery
* or a primary-replica resync to force a replica to disable optimization for all append-only requests which are replicated via
* replication while its retry variants are replicated via recovery without auto_id_timestamp.
* <p>
* Without this force-update, a replica can generate duplicate documents (for the same id) if it first receives
* a retry append-only (without timestamp) via recovery, then an original append-only (with timestamp) via replication.
*/
public void updateMaxUnsafeAutoIdTimestamp(long maxSeenAutoIdTimestampFromPrimary) {
getEngine().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampFromPrimary);
}

public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine.Operation.Origin origin) throws IOException {
// If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.
final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,8 @@ public void messageReceived(final RecoveryTranslogOperationsRequest request, fin
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
final RecoveryTarget recoveryTarget = recoveryRef.target();
try {
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps());
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary());
channel.sendResponse(new RecoveryTranslogOperationsResponse(recoveryTarget.indexShard().getLocalCheckpoint()));
} catch (MapperException exception) {
// in very rare cases a translog replay from primary is processed before a mapping update on this node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ public RecoveryResponse recoverToTarget() throws IOException {
}
final long targetLocalCheckpoint;
try (Translog.Snapshot snapshot = shard.getHistoryOperations("peer-recovery", startingSeqNo)) {
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
// We have to capture the max auto_id_timestamp after taking a snapshot of operations to guarantee
// that the auto_id_timestamp of every operation in the snapshot is at most this timestamp value.
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
targetLocalCheckpoint = phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
Expand Down Expand Up @@ -447,9 +450,11 @@ void prepareTargetForTranslog(final boolean fileBasedRecovery, final int totalTr
* @param requiredSeqNoRangeStart the lower sequence number of the required range (ending with endingSeqNo)
* @param endingSeqNo the highest sequence number that should be sent
* @param snapshot a snapshot of the translog
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @return the local checkpoint on the target
*/
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot)
long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, final Translog.Snapshot snapshot,
final long maxSeenAutoIdTimestamp)
throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
Expand All @@ -462,7 +467,8 @@ long phase2(final long startingSeqNo, long requiredSeqNoRangeStart, long endingS
"required [" + requiredSeqNoRangeStart + ":" + endingSeqNo + "]");

// send all the snapshot's translog operations to the target
final SendSnapshotResult result = sendSnapshot(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot);
final SendSnapshotResult result = sendSnapshot(
startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, snapshot, maxSeenAutoIdTimestamp);

stopWatch.stop();
logger.trace("recovery [phase2]: took [{}]", stopWatch.totalTime());
Expand Down Expand Up @@ -530,10 +536,11 @@ static class SendSnapshotResult {
* @param endingSeqNo the upper bound of the sequence number range to be sent (inclusive)
* @param snapshot the translog snapshot to replay operations from @return the local checkpoint on the target and the
* total number of operations sent
* @param maxSeenAutoIdTimestamp the max auto_id_timestamp of append-only requests on the primary
* @throws IOException if an I/O exception occurred reading the translog snapshot
*/
protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
final Translog.Snapshot snapshot) throws IOException {
final Translog.Snapshot snapshot, final long maxSeenAutoIdTimestamp) throws IOException {
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
assert startingSeqNo <= requiredSeqNoRangeStart :
Expand All @@ -551,8 +558,8 @@ protected SendSnapshotResult sendSnapshot(final long startingSeqNo, long require
logger.trace("no translog operations to send");
}

final CancellableThreads.IOInterruptable sendBatch =
() -> targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps));
final CancellableThreads.IOInterruptable sendBatch = () ->
targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps, maxSeenAutoIdTimestamp));

// send operations in batches
Translog.Operation operation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,15 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
}

@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException {
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary) throws IOException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment as to why we set this and how don't know what timestamp is associated with the operation so we use an upper bound?

for (Translog.Operation operation : operations) {
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ public interface RecoveryTargetHandler {

/**
* Index a set of translog operations on the target
* @param operations operations to index
* @param totalTranslogOps current number of total operations expected to be indexed
*
* @param operations operations to index
* @param totalTranslogOps current number of total operations expected to be indexed
* @param maxSeenAutoIdTimestampOnPrimary the maximum auto_id_timestamp of all append-only requests processed by the primary shard
* @return the local checkpoint on the target shard
*/
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws IOException;
long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps,
long maxSeenAutoIdTimestampOnPrimary) throws IOException;

/**
* Notifies the target of the files it is going to receive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -34,15 +36,18 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
private ShardId shardId;
private List<Translog.Operation> operations;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
private long maxSeenAutoIdTimestampOnPrimary;

public RecoveryTranslogOperationsRequest() {
}

RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations, int totalTranslogOps) {
RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations,
int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.operations = operations;
this.totalTranslogOps = totalTranslogOps;
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
}

public long recoveryId() {
Expand All @@ -61,13 +66,22 @@ public int totalTranslogOps() {
return totalTranslogOps;
}

public long maxSeenAutoIdTimestampOnPrimary() {
return maxSeenAutoIdTimestampOnPrimary;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
operations = Translog.readOperations(in, "recovery");
totalTranslogOps = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
} else {
maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
}

@Override
Expand All @@ -77,5 +91,8 @@ public void writeTo(StreamOutput out) throws IOException {
shardId.writeTo(out);
Translog.writeOperations(out, operations);
out.writeVInt(totalTranslogOps);
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
}

@Override
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) {
public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary) {
final RecoveryTranslogOperationsRequest translogOperationsRequest =
new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps);
new RecoveryTranslogOperationsRequest(recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary);
final TransportFuture<RecoveryTranslogOperationsResponse> future = transportService.submitRequest(
targetNode,
PeerRecoveryTargetService.Actions.TRANSLOG_OPS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3536,6 +3536,8 @@ public void run() {
}
assertEquals(0, engine.getNumVersionLookups());
assertEquals(0, engine.getNumIndexVersionsLookups());
assertThat(engine.getMaxSeenAutoIdTimestamp(),
equalTo(docs.stream().mapToLong(Engine.Index::getAutoGeneratedIdTimestamp).max().getAsLong()));
assertLuceneOperations(engine, numDocs, 0, 0);
}

Expand Down
Loading