Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate auto_id_timestamp in primary-replica resync #33964

Merged
merged 3 commits into from
Sep 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.action.resync;

import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -28,6 +29,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

/**
* Represents a batch of operations sent from the primary to its replicas during the primary-replica resync.
Expand All @@ -36,22 +38,28 @@ public final class ResyncReplicationRequest extends ReplicatedWriteRequest<Resyn

private long trimAboveSeqNo;
private Translog.Operation[] operations;
private long maxSeenAutoIdTimestampOnPrimary;

ResyncReplicationRequest() {
super();
}

public ResyncReplicationRequest(final ShardId shardId, final long trimAboveSeqNo,
final Translog.Operation[] operations) {
/**
 * Creates a resync request carrying a batch of translog operations for the given shard.
 *
 * @param shardId                         the shard this request is addressed to (passed to the superclass)
 * @param trimAboveSeqNo                  the value later returned by {@link #getTrimAboveSeqNo()}
 * @param maxSeenAutoIdTimestampOnPrimary the value later returned by {@link #getMaxSeenAutoIdTimestampOnPrimary()}
 * @param operations                      the operations in this batch; the array is stored as-is, not copied
 */
public ResyncReplicationRequest(final ShardId shardId,
                                final long trimAboveSeqNo,
                                final long maxSeenAutoIdTimestampOnPrimary,
                                final Translog.Operation[] operations) {
    super(shardId);
    this.operations = operations;
    this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
    this.trimAboveSeqNo = trimAboveSeqNo;
}

/**
 * Returns the {@code trimAboveSeqNo} value carried by this request.
 * It is {@code SequenceNumbers.UNASSIGNED_SEQ_NO} when the request was read from a stream
 * that did not include the field.
 */
public long getTrimAboveSeqNo() {
    return this.trimAboveSeqNo;
}

/**
 * Returns the max seen auto_id_timestamp that the primary attached to this request.
 * It is {@code IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP} when the request was read from a
 * stream older than {@code Version.V_7_0_0_alpha1}, which does not carry the field.
 */
public long getMaxSeenAutoIdTimestampOnPrimary() {
    return this.maxSeenAutoIdTimestampOnPrimary;
}

/**
 * Returns the translog operations in this batch.
 * The internal array itself is returned (no defensive copy), so callers must not modify it.
 */
public Translog.Operation[] getOperations() {
    return this.operations;
}
Expand All @@ -73,6 +81,11 @@ public void readFrom(final StreamInput in) throws IOException {
} else {
trimAboveSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
} else {
maxSeenAutoIdTimestampOnPrimary = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
}
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}

Expand All @@ -82,6 +95,9 @@ public void writeTo(final StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
out.writeZLong(trimAboveSeqNo);
}
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
}
out.writeArray(Translog.Operation::writeOperation, operations);
}

Expand All @@ -90,13 +106,13 @@ public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ResyncReplicationRequest that = (ResyncReplicationRequest) o;
return trimAboveSeqNo == that.trimAboveSeqNo
return trimAboveSeqNo == that.trimAboveSeqNo && maxSeenAutoIdTimestampOnPrimary == that.maxSeenAutoIdTimestampOnPrimary
&& Arrays.equals(operations, that.operations);
}

/**
 * Computes a hash code that is consistent with {@link #equals(Object)}: it combines
 * {@code trimAboveSeqNo}, {@code maxSeenAutoIdTimestampOnPrimary} and the <em>contents</em>
 * of the {@code operations} array.
 */
@Override
public int hashCode() {
    // equals() compares the operations with Arrays.equals (element by element). Passing the array
    // directly to Objects.hash would mix in its identity hash, so two equal requests backed by
    // distinct array instances would hash differently. Arrays.hashCode is content-based.
    return Objects.hash(trimAboveSeqNo, maxSeenAutoIdTimestampOnPrimary, Arrays.hashCode(operations));
}

@Override
Expand All @@ -106,6 +122,7 @@ public String toString() {
", timeout=" + timeout +
", index='" + index + '\'' +
", trimAboveSeqNo=" + trimAboveSeqNo +
", maxSeenAutoIdTimestampOnPrimary=" + maxSeenAutoIdTimestampOnPrimary +
", ops=" + operations.length +
"}";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ protected WriteReplicaResult shardOperationOnReplica(ResyncReplicationRequest re

public static Translog.Location performOnReplica(ResyncReplicationRequest request, IndexShard replica) throws Exception {
Translog.Location location = null;
/*
* Operations received from resync do not carry an individual auto_id_timestamp, so we need to bootstrap this max_seen_timestamp
* (at least the highest timestamp of any of these operations) to make sure that we disable the optimization for the same
* append-only requests with timestamps (the sources of these operations) that are replicated; otherwise we may have duplicates.
*/
replica.updateMaxUnsafeAutoIdTimestamp(request.getMaxSeenAutoIdTimestampOnPrimary());
for (Translog.Operation operation : request.getOperations()) {
final Engine.Result operationResult = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
if (operationResult.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,11 @@ public void onFailure(final Exception e) {
}
}
};

// We must capture the timestamp after taking the snapshot of operations to make sure
// that the auto_id_timestamp of every operation in the snapshot is at most this value.
final long maxSeenAutoIdTimestamp = indexShard.getMaxSeenAutoIdTimestamp();
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPendingPrimaryTerm(), wrappedSnapshot,
startingSeqNo, maxSeqNo, resyncListener);
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, resyncListener);
} catch (Exception e) {
try {
IOUtils.close(snapshot);
Expand All @@ -150,7 +152,7 @@ public void onFailure(final Exception e) {
}

private void resync(final ShardId shardId, final String primaryAllocationId, final long primaryTerm, final Translog.Snapshot snapshot,
long startingSeqNo, long maxSeqNo, ActionListener<ResyncTask> listener) {
long startingSeqNo, long maxSeqNo, long maxSeenAutoIdTimestamp, ActionListener<ResyncTask> listener) {
ResyncRequest request = new ResyncRequest(shardId, primaryAllocationId);
ResyncTask resyncTask = (ResyncTask) taskManager.register("transport", "resync", request); // it's not transport :-)
ActionListener<Void> wrappedListener = new ActionListener<Void>() {
Expand All @@ -170,7 +172,7 @@ public void onFailure(Exception e) {
};
try {
new SnapshotSender(logger, syncAction, resyncTask, shardId, primaryAllocationId, primaryTerm, snapshot, chunkSize.bytesAsInt(),
startingSeqNo, maxSeqNo, wrappedListener).run();
startingSeqNo, maxSeqNo, maxSeenAutoIdTimestamp, wrappedListener).run();
} catch (Exception e) {
wrappedListener.onFailure(e);
}
Expand All @@ -191,6 +193,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
private final Translog.Snapshot snapshot;
private final long startingSeqNo;
private final long maxSeqNo;
private final long maxSeenAutoIdTimestamp;
private final int chunkSizeInBytes;
private final ActionListener<Void> listener;
private final AtomicBoolean firstMessage = new AtomicBoolean(true);
Expand All @@ -199,7 +202,8 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
private AtomicBoolean closed = new AtomicBoolean();

SnapshotSender(Logger logger, SyncAction syncAction, ResyncTask task, ShardId shardId, String primaryAllocationId, long primaryTerm,
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo, ActionListener<Void> listener) {
Translog.Snapshot snapshot, int chunkSizeInBytes, long startingSeqNo, long maxSeqNo,
long maxSeenAutoIdTimestamp, ActionListener<Void> listener) {
this.logger = logger;
this.syncAction = syncAction;
this.task = task;
Expand All @@ -210,6 +214,7 @@ static class SnapshotSender extends AbstractRunnable implements ActionListener<R
this.chunkSizeInBytes = chunkSizeInBytes;
this.startingSeqNo = startingSeqNo;
this.maxSeqNo = maxSeqNo;
this.maxSeenAutoIdTimestamp = maxSeenAutoIdTimestamp;
this.listener = listener;
task.setTotalOperations(snapshot.totalOperations());
}
Expand Down Expand Up @@ -260,7 +265,7 @@ protected void doRun() throws Exception {
if (!operations.isEmpty() || trimmedAboveSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
task.setPhase("sending_ops");
ResyncReplicationRequest request =
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, operations.toArray(EMPTY_ARRAY));
new ResyncReplicationRequest(shardId, trimmedAboveSeqNo, maxSeenAutoIdTimestamp, operations.toArray(EMPTY_ARRAY));
logger.trace("{} sending batch of [{}][{}] (total sent: [{}], skipped: [{}])", shardId, operations.size(),
new ByteSizeValue(size), totalSentOps.get(), totalSkippedOps.get());
firstMessage.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void testSerialization() throws IOException {
final Translog.Index index = new Translog.Index("type", "id", 0, randomNonNegativeLong(),
randomNonNegativeLong(), bytes, null, -1);
final ShardId shardId = new ShardId(new Index("index", "uuid"), 0);
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, new Translog.Operation[]{index});
final ResyncReplicationRequest before = new ResyncReplicationRequest(shardId, 42L, 100, new Translog.Operation[]{index});

final BytesStreamOutput out = new BytesStreamOutput();
before.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -633,6 +634,49 @@ public long indexTranslogOperations(final List<Translog.Operation> operations, f
}
}

/**
 * Checks that the max seen auto_id_timestamp is handed over to the remaining replica when
 * another replica is promoted to primary, and that every shard copy ends up with the overall
 * maximum once the outstanding requests have been delivered through normal replication.
 */
public void testTransferMaxSeenAutoIdTimestampOnResync() throws Exception {
try (ReplicationGroup shards = createGroup(2)) {
shards.startAll();
IndexShard primary = shards.getPrimary();
IndexShard replica1 = shards.getReplicas().get(0);
IndexShard replica2 = shards.getReplicas().get(1);
// Highest auto-generated timestamp delivered to each replica so far. -1 is the value the
// assertions below expect from a replica that received no operation at all.
long maxTimestampOnReplica1 = -1;
long maxTimestampOnReplica2 = -1;
// Copies of every request; they are replayed through normal replication after the promotion.
List<IndexRequest> replicationRequests = new ArrayList<>();
for (int numDocs = between(1, 10), i = 0; i < numDocs; i++) {
final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON);
// Presumably assigns the auto-generated id and timestamp to the request -- TODO confirm.
indexRequest.process(Version.CURRENT, null, index.getName());
final IndexRequest copyRequest;
// Exactly one of the pair (original, copy) gets onRetry() called on it; which one is random.
if (randomBoolean()) {
copyRequest = copyIndexRequest(indexRequest);
indexRequest.onRetry();
} else {
copyRequest = copyIndexRequest(indexRequest);
copyRequest.onRetry();
}
replicationRequests.add(copyRequest);
final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, primary);
// Replicate each operation to only one of the two replicas, chosen at random, and
// track the highest timestamp that replica has been given.
if (randomBoolean()) {
indexOnReplica(bulkShardRequest, shards, replica1);
maxTimestampOnReplica1 = Math.max(maxTimestampOnReplica1, indexRequest.getAutoGeneratedTimestamp());
} else {
indexOnReplica(bulkShardRequest, shards, replica2);
maxTimestampOnReplica2 = Math.max(maxTimestampOnReplica2, indexRequest.getAutoGeneratedTimestamp());
}
}
// Before the promotion each replica only knows the timestamps of the operations it received.
assertThat(replica1.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1));
assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica2));
// Promote replica1 and wait for the promotion to complete.
shards.promoteReplicaToPrimary(replica1).get();
// NOTE(review): this expects replica2 to report exactly replica1's maximum, even if replica2
// had recorded a larger value before the promotion -- confirm this is intended.
assertThat(replica2.getMaxSeenAutoIdTimestamp(), equalTo(maxTimestampOnReplica1));
for (IndexRequest request : replicationRequests) {
shards.index(request); // deliver via normal replication
}
// After replaying every request, all shard copies must agree on the overall maximum.
for (IndexShard shard : shards) {
assertThat(shard.getMaxSeenAutoIdTimestamp(), equalTo(Math.max(maxTimestampOnReplica1, maxTimestampOnReplica2)));
}
}
}

public static class BlockingTarget extends RecoveryTarget {

private final CountDownLatch recoveryBlocked;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
// Index doc but not advance local checkpoint.
shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL,
SourceToParse.source(shard.shardId().getIndexName(), "_doc", Integer.toString(i), new BytesArray("{}"), XContentType.JSON),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
randomBoolean() ? IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP : randomNonNegativeLong(), true);
}

long globalCheckPoint = numDocs > 0 ? randomIntBetween(0, numDocs - 1) : 0;
Expand Down Expand Up @@ -105,6 +105,8 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception {
.findFirst()
.isPresent(),
is(false));

assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp()));
}
if (syncNeeded && globalCheckPoint < numDocs - 1) {
if (shard.indexSettings.isSoftDeleteEnabled()) {
Expand Down