Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-428: Mark the copy complete in the source offset for the last copied document #168

Merged
merged 4 commits into from
Oct 21, 2024

Conversation

Calvinnix
Copy link
Collaborator

@Calvinnix Calvinnix commented Oct 16, 2024

This fixes an issue that was occurring when:

  1. startup mode COPY_EXISTING is used
  2. the copy has completed
  3. but no new has been sent
  4. then the kafka connector restarts

The resulting behavior of this scenario is that the copy would occur again on restart.


This was happening because sourceOffets for copied records specify copy: true, so the restart thinks that the copy was in progress and it tries again. (note that restarts that occur during a copy is expected to reattempt the copy, which will duplicate data)

Here is an example of what that copy offset looks like:

{"_id":"{\"_id\": {\"$oid\": \"670ee1efaa5a9af80d592c47\"}, \"copyingData\": true}","copy":"true"}

It does look like we have logic which appears to try to "mark copying ended" but this doesn't work when the cachedResult is null.

      // Copying finished - mark copying ended and add cached result
      isCopying = false;
      LOGGER.info("Finished copying existing data from the collection(s).");
      if (cachedResult != null) {
        batch.add(cachedResult);
        cachedResult = null;
      }

(I'm not actually sure if this logic ever worked, but I could see a future where the cachedResult concept is removed. I haven't thought about this enough to form a strong enough opinion yet)


The fix for this issue was to identify when we are creating the sourceOffset for the last copied document and in that case go ahead and "mark copying ended" by removing the copy: true flag and setting the '_id' field to the cachedResumeToken.

@@ -169,6 +169,9 @@ void testSourceLoadsDataFromMongoClientWithCopyExisting() {
new HashMap<String, String>() {
{
put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
String namespaceRegex =
String.format("(%s\\.coll|%s\\.coll)", db1.getName(), db2.getName());
put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, namespaceRegex);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[driveby] without this filter this test tries to copy all data

Comment on lines 224 to 234
// if isCopying is true, we want to set the COPY_KEY flag so that kafka has context that a
// copy is in progress. However, for the last document that we are copying, we should not set
// this flag because the copy has completed, otherwise we are relying on future change stream
// events to signify that we are no longer copying. We also need to set the _id field to be a
// valid resume token, which during copying exists in the cachedResumeToken variable.
boolean lastDocument = !batchIterator.hasNext();
boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
if (isCopying && lastDocument && noMoreDataToCopy) {
sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
sourceOffset.remove(COPY_KEY);
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Highlighting this since this is the logic that fixes the issue.

Copy link
Collaborator

@arahmanan arahmanan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good! I did a first pass

() ->
assertTrue(
firstPoll.stream()
.map(SourceRecord::sourceOffset)
.allMatch(i -> i.containsKey("copy"))));
.limit(149)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[opt] Thoughts on replacing this magic number to make this test a little more readable?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm what do you think about changing this to 150 - 1 with a comment like // exclude the last record

I extracted these numbers into variables but I feel like it hurt the readability, especially for the other test.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would that be the case even if all these numbers are generated from two variables, such as numInsertsPerNS=75 and numNamespaces=2? Either way, I'm fine with keeping it as is.

assertTrue(
thirdPoll.stream()
.map(SourceRecord::sourceOffset)
.limit(24)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same optional comment here

sourceOffset.put(COPY_KEY, "true");
}
Iterator<BsonDocument> batchIterator = getNextBatch().iterator();
BsonDocument changeStreamDocument;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] Do we need to define this outside the while loop?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err nope 😆 I'll update

boolean lastDocument = !batchIterator.hasNext();
boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
if (isCopying && lastDocument && noMoreDataToCopy) {
sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great find! Should we add a test that confirms we're setting the expected resume token?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we got coverage of this with the recent e2e tests I added around restarting after copying. Let me know if you disagree.

boolean lastDocument = !batchIterator.hasNext();
boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
if (isCopying && lastDocument && noMoreDataToCopy) {
sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of cachedResumeToken , it might be safer to create and use a different variable here that contains the same value as cachedResumeToken but isn't modified anywhere else. cachedResumeToken get's set to null (here) within getResumeToken, which ends up being called here if cursor == null. The cursor could be set to null if the cluster runs into a temporary connection issue.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that's a good catch, I'll create a new token called resumeTokenAfterCopy

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait I think we're actually good here, I'm happy to create a new variable for extra safety though if you're still concerned.

because we are assigning this in the copied batch the cursor == null code won't be hit until after we use and store the cachedResumeToken. i.e. the getNextBatch function exits early here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I'm fine with keeping it as is as well.

@@ -544,7 +559,7 @@ void testCopyingExistingWithARestartMidwayThrough() {
put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
put(MongoSourceConfig.STARTUP_MODE_CONFIG, StartupMode.COPY_EXISTING.propertyValue());
put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "25");
put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "10000");
put(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, "1000");
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[driveby] because this test expects an empty batch we should reduce the poll await time even further, otherwise we're guaranteeing to wait for at least 5x10000ms. This just speeds the test up.

Comment on lines +578 to +580
assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
@SuppressWarnings("unchecked")
Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of this code is just to make the (Map<String, Object>) lastOffset cast safer.. since this is just a test we could probably skip the null check.

assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
@SuppressWarnings("unchecked")
Map<String, Object> mockedOffset = (Map<String, Object>) lastOffset;
when(offsetStorageReader.offset(any())).thenReturn(mockedOffset);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[driveby] I included mocking logic in this test because otherwise the code will always re-copy the data, but not because it sees the expected copy offset, but because the context is null.


// mock the context so that on restart we know where the last task left off
when(context.offsetStorageReader()).thenReturn(offsetStorageReader);
assertNotNull(lastOffset.get("_id")); // check to make sure the value is an Object
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as the one above for mocking offset, we could remove this line if we want.

@Calvinnix
Copy link
Collaborator Author

Calvinnix commented Oct 17, 2024

So the integration tests are failing for version 3.6 which I'm assuming has something to do with changestream/resumetoken support. I'm wondering if I should just remove this test (and potentially replace with newer versions of mongodb)

cc: @arahmanan

EDIT: I added some logging in this patch and found that for version 3.6 the cachedResumeToken is initially stored as null. It might be worth including a null check in the if condition for this logic to avoid these scenarios.

Copy link
Collaborator

@arahmanan arahmanan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Final comment before approving!

boolean lastDocument = !batchIterator.hasNext();
boolean noMoreDataToCopy = copyDataManager != null && !copyDataManager.isCopying();
if (isCopying && lastDocument && noMoreDataToCopy) {
sourceOffset.put(ID_FIELD, cachedResumeToken.toJson());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I'm fine with keeping it as is as well.

() ->
assertTrue(
firstPoll.stream()
.map(SourceRecord::sourceOffset)
.allMatch(i -> i.containsKey("copy"))));
.limit(149)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would that be the case even if all these numbers are generated from two variables, such as numInsertsPerNS=75 and numNamespaces=2? Either way, I'm fine with keeping it as is.

@@ -579,6 +617,71 @@ void testCopyingExistingWithARestartMidwayThrough() {
}
}

@Test
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the integration tests are failing for version 3.6 which I'm assuming has something to do with changestream/resumetoken support. I'm wondering if I should just remove this test (and potentially replace with newer versions of mongodb)
EDIT: I added some logging in this patch and found that for version 3.6 the cachedResumeToken is initially stored as null. It might be worth including a null check in the if condition for this logic to avoid these scenarios.

Moving this thread here. I agree with adding a null check. I don't expect customers to be using v3.6 in production, but it's still worth avoiding the null pointer exception. Out of curiosity, did you find any valuable details from the logs you've added?

Copy link
Collaborator Author

@Calvinnix Calvinnix Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the logs showed that cachedResumeToken was being initialized as null. (search for CALVIN: if you wanted to review the logs)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I just added the null check, I'll let the tests run but I do expect the test to still fail for version 3.6 because it doesn't have a resume token to set and resume from.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

@arahmanan arahmanan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@Calvinnix Calvinnix merged commit e4cda27 into mongodb:master Oct 21, 2024
16 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants