Propagate max_auto_id_timestamp in peer recovery #33693
Conversation
Today we don't store the auto-generated timestamp of append-only operations in Lucene, and we assign -1 to every index operation constructed from LuceneChangesSnapshot. This looks innocent, but it generates duplicate documents on a replica if a retry append-only arrives first via peer recovery and then the original append-only arrives via replication. Since the retry append-only (delivered via recovery) does not have a timestamp, the replica will happily optimize the original request when it should not. This change transmits the max auto-generated timestamp from the primary to replicas before the translog phase of peer recovery. This timestamp prevents replicas from optimizing append-only requests whose retry counterparts have already been processed.
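The replica-side guard this PR relies on can be sketched roughly as follows. This is an illustrative model, not the actual InternalEngine code; the class and method names (`AppendOnlyGuardSketch`, `mayOptimize`, `advance`) are hypothetical:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch (hypothetical names; the real logic lives in
// InternalEngine). An append-only op may skip the duplicate lookup only if
// its auto-generated timestamp is above the "unsafe" marker.
class AppendOnlyGuardSketch {
    private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);

    // Called with the max timestamp the primary ships before the translog
    // phase of peer recovery, and for every retry op processed locally.
    void advance(long timestamp) {
        maxUnsafeAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, timestamp));
    }

    // true = the op can be indexed as a plain "add" without a dedup lookup
    boolean mayOptimize(long opAutoIdTimestamp, boolean isRetry) {
        if (isRetry) {
            advance(opAutoIdTimestamp);
            return false;
        }
        return opAutoIdTimestamp > maxUnsafeAutoIdTimestamp.get();
    }
}
```

The key consequence: once the primary's max timestamp has been shipped and applied, any original op with a timestamp at or below that value falls back to the safe dedup path.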
Pinging @elastic/es-distributed
We also need to transmit the timestamp to replicas in the primary-replica resync. I will do that in a separate PR.
@@ -201,6 +201,8 @@ public RecoveryResponse recoverToTarget() throws IOException {
    runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
        shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);

// DISCUSS: Is it possible for an operation to be delivered via recovery first, then delivered via replication?
Yes, I think this is possible. We add the target to the replication group, and then collect the operations from the translog (or Lucene with soft deletes) to send to the target. It's possible that an operation can arrive on the primary, enter the translog on the primary, and then an evil OS scheduler puts the thread handling the replication to the target to sleep. At this moment recovery can execute, copying the operation to the target. Then our thread can wake up and the operation arrives by replication.
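The interleaving described above can be replayed sequentially in a small sketch. This is a hypothetical, simplified simulation (not Elasticsearch code): the same op reaches the replica first via recovery with its stored timestamp lost (-1), and then via replication as the original:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sequential replay of the race: returns true if the replica
// would (incorrectly) apply the append-only optimization to an original op
// whose copy it already received via recovery.
class RecoveryRaceSketch {
    static boolean replicaOptimizesDuplicate(boolean primaryShipsMaxTimestamp) {
        AtomicLong maxUnsafeOnReplica = new AtomicLong(-1);
        long originalTs = 42;  // timestamp assigned on the primary
        // Recovery delivers the op; its stored timestamp is -1 (lost in
        // Lucene), so this delivery alone cannot raise the unsafe marker.
        long recoveredTs = -1;
        maxUnsafeOnReplica.updateAndGet(c -> Math.max(c, recoveredTs));
        if (primaryShipsMaxTimestamp) {
            // The fix: primary sends its max seen timestamp before the
            // translog phase, capping what the replica may optimize.
            maxUnsafeOnReplica.updateAndGet(c -> Math.max(c, originalTs));
        }
        // Replication now delivers the original op.
        return originalTs > maxUnsafeOnReplica.get();
    }
}
```

Without the shipped timestamp the original op looks optimizable and a duplicate document is created; with it, the replica falls back to the dedup check.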
Thanks @jasontedor. I had the same thought but was not sure because I failed to come up with a test. Now I have an idea for how to write that test.
We have a few sinister tests along these lines, where we latch operations in the engine to stall them for nefarious purposes. 😇
Yeah, I have a test now. However, this corner case is protected by SeqNo. We are all good now.
elasticsearch/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
Line 871 in 0b4960f
if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
LGTM, left a suggestion.
@@ -2531,4 +2530,16 @@ void updateRefreshedCheckpoint(long checkpoint) {
        assert refreshedCheckpoint.get() >= checkpoint : refreshedCheckpoint.get() + " < " + checkpoint;
    }
}

@Override
public long getMaxAutoIdTimestamp() {
maybe just push the maxUnsafeAutoIdTimestamp up to engine and make the methods final
I wonder, if related to this change, we should also stop storing the autoid timestamp on a per-operation basis in the translog and instead just put the max timestamp into the checkpoint file. This would somehow realign Lucene and translog, where translog is just the stuff that's not fsynced to Lucene yet.
final CancellableThreads.IOInterruptable sendBatch =
    () -> targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps));
final CancellableThreads.IOInterruptable sendBatch = () ->
    targetLocalCheckpoint.set(recoveryTarget.indexTranslogOperations(operations, expectedTotalOps, shard.getMaxAutoIdTimestamp()));
Instead of using a new one for every batch that is to be sent, I would prefer to capture this after we call cancellableThreads.execute(() -> shard.waitForOpsToComplete(endingSeqNo)); in RecoverySourceHandler, and then only pass that same value. You could also add a comment then and there saying why we do it.
We have to do this after the snapshot was captured. That said, I'm +1 on explicitly capturing it once at the right moment and use the same value.
I think we have to be super careful with naming, javadocs and comments to explain exactly what we are doing and why. If we don't do this, it will easily lead to bugs, as this is super tricky.

For example - and I might be missing something here - I don't think the implementation does what we intend it to do. The semantics we're after is: since any of the ops we ship may collide with an optimized write, we need to mark them as retry and make sure that after an op was indexed, the maxUnsafeTimestamp marker on the target engine is at least as high as the original timestamp of the operation we're indexing. Since we don't know what the latter is (as we don't store it), we planned to use the maximum timestamp of any append-only operation indexed by the source engine when the snapshot was captured. That doesn't match the maxUnsafeAutoIdTimestamp.get() returned by getMaxAutoIdTimestamp(): maxUnsafeAutoIdTimestamp only tracks append-only ops that were marked as retry.

I also agree with @ywelsch that if we change semantics for the ops coming from Lucene, we should also change semantics for the ops from the translog (i.e., store recovery should also just set a maximum value as unsafe when it starts) and change the translog to not store individual ops' timestamps. This means that all recoveries (local or remote) work the same and we don't have to keep two models in our heads.
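The distinction drawn above can be made concrete with a minimal sketch (hypothetical, simplified field names mirroring the discussion): one marker advances on every append-only op the engine sees, the other only on ops that may be duplicates, and it is the former that must be shipped to the target:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the two markers under discussion (not the real engine code):
// maxSeen advances on EVERY append-only op; maxUnsafe only on ops that may
// be duplicates (retries). The value shipped in recovery must be maxSeen,
// since any shipped op may collide with an already-optimized write.
class TimestampMarkersSketch {
    final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
    final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);

    void onAppendOnly(long timestamp, boolean mayBeDuplicate) {
        maxSeenAutoIdTimestamp.updateAndGet(c -> Math.max(c, timestamp));
        if (mayBeDuplicate) {
            maxUnsafeAutoIdTimestamp.updateAndGet(c -> Math.max(c, timestamp));
        }
    }
}
```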
}

@Override
public void updateMaxAutoIdTimestamp(long newTimestamp) {
nit - updateMaxUnsafeAutoIdTimestamp
public abstract long getMaxAutoIdTimestamp();

/**
 * Sets the maximum auto-generated timestamp of append-only requests tracked by this engine to {@code newTimestamp}.
I think we want to speak about updating the unsafe marker here?
I will make a follow-up to remove auto_id_timestamp from translog's operations.
Looking good. I left some minor comments.
@@ -1690,6 +1691,21 @@ public boolean isRecovering() {
 */
public abstract void maybePruneDeletes();

/**
 * Returns the maximum auto_id_timestamp of all append-only have been processed (or force-updated) by this engine.
what is force updated?
I see now - can you link to the method in the java docs?
/**
 * Tracks auto_id_timestamp of append-only index requests have been processed in an {@link Engine}.
 */
final class AutoIdTimestamp {
do we really need a component for this? it only hides very trivial behavior without much added value?
There are two reasons I added this class:
- Make sure that we always update both markers when we have a new timestamp
- Having two bare markers in an Engine may be confusing

I am fine with removing it (e6a929a).
 * Forces this engine to advance its max_unsafe_auto_id_timestamp marker to at least the given timestamp.
 * The engine will disable optimization for all append-only whose timestamp at most {@code newTimestamp}.
 */
public abstract void forceUpdateMaxUnsafeAutoIdTimestamp(long newTimestamp);
force suggests to me that whatever value you give this method will be set as the new value. It seems that the intended semantics are different. Maybe just call this updateMaxUnsafe...
@@ -141,10 +142,80 @@ public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaDa
    }
}

public void testRetryAppendOnlyWhileRecovering() throws Exception {
I think you mean after recovering here.
@@ -428,6 +441,12 @@ public synchronized void flush() {
public synchronized void close() throws Exception {
    if (closed == false) {
        closed = true;
        for (IndexShard replica : replicas) {
            try {
                assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(primary.getMaxSeenAutoIdTimestamp()));
💯
# Conflicts: # server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
@bleskes I've addressed your comments. Could you please have another look? Thank you!
LGTM. I sadly think this is not enough, and we need a follow-up to put this information into Lucene so it will survive a restart of the primary.
private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
    assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]";
    maxSeenAutoIdTimestamp.updateAndGet(curr -> Math.max(curr, newTimestamp));
    assert maxSeenAutoIdTimestamp.get() >= newTimestamp;
I don't think this adds much value; we just did a max operation with it.
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
if (indexShard().state() != IndexShardState.RECOVERING) {
    throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
indexShard().updateMaxUnsafeAutoIdTimestamp(maxSeenAutoIdTimestampOnPrimary);
Can you add a comment as to why we set this, and how we don't know what timestamp is associated with the operation, so we use an upper bound?
Thank you so much for reviewing @bleskes, @s1monw, @ywelsch and @jasontedor.
A follow-up of elastic#33693 to propagate max_seen_auto_id_timestamp in a primary-replica resync.
Today we don't store the auto-generated timestamp of append-only
operations in Lucene; and assign -1 to every index operations
constructed from LuceneChangesSnapshot. This looks innocent but it
generates duplicate documents on a replica if a retry append-only
arrives first via peer-recovery; then an original append-only arrives
via replication. Since the retry append-only (delivered via recovery)
does not have timestamp, the replica will happily optimize the original
request while it should not.
This change transmits the max auto-generated timestamp from the primary
to replicas before translog phase in peer recovery. This timestamp will
prevent replicas from optimizing append-only requests if retry
counterparts have been processed.
Relates #33656
Relates #33222