-
Notifications
You must be signed in to change notification settings - Fork 246
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-428 bug where copy existing is not persisted after it has finished copying #166
Conversation
sourceOffset.put(COPY_KEY, "true"); | ||
} | ||
|
||
sourceOffset.put(COPY_KEY, String.valueOf(isCopying)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rahulbats thanks for opening a pull request for this! We don't check the value of the COPY_KEY
element within the source offset. That's the case here and here. The existence alone of the COPY_KEY
key signals that the existing data should be copied. It seems like we are handling things properly without these changes. Would you be able to provide more details on the issue you experienced?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rahulbats, @arahmanan has a good point; shouldn't you also check the value of the COPY_KEY? not just that it exists ? otherwise the connector will copy even if the COPY_KEY value is false, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hi @arahmanan you are right it only checks the key exists. I changed that code in my PR to verify if it is true as well. If we dont do this when the connector restarts it will reread the collection again . The isCopying false has to be persisted in the offsets topic , so that when connector restarts it sees the flag is false now and doesnt start rereading the entire collection again. This has been a big issue with out customer and they have giant collections which are getting reread generating lot of dups and wastage of time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @rahulbats, we would need to fix this if statement as well or we won't resume from the persisted resume token on restarts.
That being said, I don't see a change in behavior when the connector restarts with these changes. shouldCopyData
was already returning false
when the COPY_KEY
wasn't persisted. The getResumeToken
function seems to have the correct logic as well (i.e. return a resume token if the COPY_KEY
is not persisted). Those are the only two places (here and here) where the persisted COPY_KEY
is used. I believe something else is causing the issue your customer is running into.
Do you have more details from the customer, such as logs and MongoDB version?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Chatted with Rahul about this. The COPY_KEY
could be set to true in a prior poll.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you remove the leading space here as well as the extra new lines on :218 and :220
Also I think the build failures have to do with formatting issues with the if conditions.
Edit: I'll push these changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to break this down visually it sounds like the offset persists the COPY_KEY because when we update the offset we aren't updating the value.
i.e.
offset
{
ID: 1,
COPY_KEY: true
}
<- no longer copying, updating offset to 2
{
ID: 2
}
resulting offset:
{
ID: 2,
COPY_KEY: true
}
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes this looks right
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quick update, I just put up a PR that should address the issue you are seeing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you! LGTM!
sourceOffset.put(COPY_KEY, "true"); | ||
} | ||
|
||
sourceOffset.put(COPY_KEY, String.valueOf(isCopying)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Chatted with Rahul about this. The COPY_KEY
could be set to true in a prior poll.
…sk switched to change stream the isCopying flag tracks wether the connector task is copying collection data or tracking change stream. This flag needs to be persisted, so that if the connector restarts later it does no start copying the collection again. There was a if statement which persisted this flag only when iscopying is true which is wrong, as the iscopying flag needs to be persisted in offsets after it has turned false when it has finished copying
added string valueof
add check to verify COPY_key is true. Without this connector will reread the collection on restarts
string parse
fix if statement
This was fixed in #168 closing this PR |
…sk switched to change stream
the isCopying flag tracks wether the connector task is copying collection data or tracking change stream. This flag needs to be persisted, so that if the connector restarts later it does no start copying the collection again. There was a if statement which persisted this flag only when iscopying is true which is wrong, as the iscopying flag needs to be persisted in offsets after it has turned false when it has finished copying