Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate streams to reset for update #14188

Closed
wants to merge 10 commits into from
Closed

Conversation

alovew
Copy link
Contributor

@alovew alovew commented Jun 27, 2022

Calculate streams to reset for update

@github-actions github-actions bot added area/platform issues related to the platform area/server labels Jun 27, 2022
@alovew alovew temporarily deployed to more-secrets June 27, 2022 16:56 Inactive
@alovew alovew temporarily deployed to more-secrets June 27, 2022 17:00 Inactive
@alovew alovew force-pushed the anne/reset-update-endpoint branch from dc29f04 to e7eff8f Compare June 29, 2022 19:41
@alovew alovew temporarily deployed to more-secrets June 29, 2022 19:44 Inactive
@alovew alovew force-pushed the anne/reset-update-endpoint branch from e7eff8f to e923d14 Compare June 29, 2022 19:44
@alovew alovew marked this pull request as ready for review June 29, 2022 19:44
@alovew alovew temporarily deployed to more-secrets June 29, 2022 19:46 Inactive
Copy link
Contributor

@lmossman lmossman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few small comments and suggestions, otherwise this is looking good. Feel free to re-request a review from me once those are addressed

@@ -259,7 +259,7 @@ public ConnectionRead getConnection(final UUID connectionId)
return buildConnectionRead(connectionId);
}

public static CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
public CatalogDiff getDiff(final AirbyteCatalog oldCatalog, final AirbyteCatalog newCatalog) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why make this non-static?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this change bc none of the other methods in the ConnectionsHandler are, and it made testing a little odd, since it's mocked and passed in in the tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but let me know if this messes stuff up. as far as I can see, this method is only used in 2 places and only in the web backend connections handler

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I hadn't realized it was being mocked in tests when I left this comment. That is a valid reason to make this non-static so that makes sense. Thanks for explaining!

final ConnectionStateType stateType = getStateType(connectionIdRequestBody);
final List<io.airbyte.protocol.models.StreamDescriptor> streamsToReset;

if (stateType == ConnectionStateType.LEGACY || stateType == ConnectionStateType.NOT_SET) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed on slack here, we should also reset all streams in the GLOBAL case for now, until we have confirmation that per-stream resets can actually be performed for CDC connectors

final AirbyteCatalog newAirbyteCatalog = webBackendConnectionUpdate.getSyncCatalog();
final CatalogDiff catalogDiff = connectionsHandler.getDiff(apiExistingCatalog, newAirbyteCatalog);
final List<StreamDescriptor> apiStreamsToReset = getStreamsToReset(catalogDiff);
streamsToReset = apiStreamsToReset.stream().map(streamDescriptor -> ProtocolConverters.streamDescriptorToProtocol(streamDescriptor)).toList();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: single-argument lambdas like this can be slightly simplified like so:

Suggested change
streamsToReset = apiStreamsToReset.stream().map(streamDescriptor -> ProtocolConverters.streamDescriptorToProtocol(streamDescriptor)).toList();
streamsToReset = apiStreamsToReset.stream().map(ProtocolConverters::streamDescriptorToProtocol).toList();

// TODO (https://github.com/airbytehq/airbyte/issues/12741): change this to only get new/updated
// streams, instead of all
configRepository.getAllStreamsForConnection(webBackendConnectionUpdate.getConnectionId()));
streamsToReset);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably check here if streamsToReset is empty, and if so do not call synchronousResetConnection or startNewManualSync. Otherwise, in the case that there is nothing to reset, then we will just cancel any currently running job and start a new normal sync for no reason.

We should also add a test case for this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we sort of had that check with the needsReset bool above?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That needsReset boolean is just checking if the withRefreshedCatalog flag is set on the input to this endpoint, which I believe the webapp sets whenever the Refresh source schema button is pressed in the UI and the user saves the connection settings. So I believe it is possible that the user refreshes the schema, but nothing in the schema has actually changed, and then saves the connection, causing this withRefreshedCatalog flag being set to true despite nothing changing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah is that how it works? so currently, if there is a schema refresh but no actual change, we reset all streams?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did update this, but still curious how this isn't resetting all streams on refresh right now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we probably are resetting all streams right now if that situation happens. Not 100% sure though, could be an interesting thing to test

@@ -488,6 +510,11 @@ protected static ConnectionSearch toConnectionSearch(final WebBackendConnectionS
.status(webBackendConnectionSearch.getStatus());
}

@VisibleForTesting
protected static List<StreamDescriptor> getStreamsToReset(final CatalogDiff catalogDiff) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
protected static List<StreamDescriptor> getStreamsToReset(final CatalogDiff catalogDiff) {
static List<StreamDescriptor> getStreamsToReset(final CatalogDiff catalogDiff) {

This doesn't need protected, since the test is in the same package as this class. So we can just do package-private (i.e. no keyword), which limits this to only being used by classes in the same package

@alovew alovew temporarily deployed to more-secrets June 29, 2022 23:06 Inactive
@alovew alovew requested a review from lmossman June 29, 2022 23:12
@lmossman
Copy link
Contributor

@alovew I responded on the comments above

@alovew alovew temporarily deployed to more-secrets June 30, 2022 01:39 Inactive
Copy link
Contributor

@lmossman lmossman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, probably worth manually testing this out locally a bit to double check that everything is working as expected

Copy link
Contributor

@benmoriceau benmoriceau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the global need to generate the diff for the catalogs. Did we also miss an issue for the reset endpoint?

final ConnectionStateType stateType = getStateType(connectionIdRequestBody);
final List<io.airbyte.protocol.models.StreamDescriptor> streamsToReset;

if (stateType == ConnectionStateType.LEGACY || stateType == ConnectionStateType.NOT_SET || stateType == ConnectionStateType.GLOBAL) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this implies that we can't do a partial reset for global. I think that this is not the expected behavior (i.e. : We need to calculate the diff for global as well)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We wanted to wait on this until we know that global connectors can handle partial resets. Subodh is currently working on it: https://airbytehq-team.slack.com/archives/C03AS1GAQV6/p1656578565814009?thread_ts=1656533917.939489&cid=C03AS1GAQV6

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we explicitly added the || stateType == ConnectionStateType.GLOBAL because you and Subodh said that partial resets for CDC sources are not currently working. We can remove that once that issue Subodh linked is completed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are they producing a GLOBAL state? If they aren't compatible with the global state, they should produce a legacy state. I think that it is something that we might forget and become a future bug where the UI indicate that we will do a partial reset but in the end we will perform a full reset.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alovew I think that I now understand the issue which is that are producing a non legacy state where we should. Let's keep that in mind to not forget to remove.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I believe the CDC sources will produce a GLOBAL state (@jdpgrailsdev to confirm?), so until CDC sources can support partial resets, we want to still reset everything.

@benmoriceau is the UI currently checking the state type and returning a message about partial resets for GLOBAL and STREAM? If so we may want to ask them to change that logic to only show the partial reset message for STREAM state type, until the CDC support for partial resets is in and we remove this || GLOBAL here.

@alovew could you write a brief ticket and put it in our current sprint to remove this || GLOBAL once CDC sources support partial resets? Just to make extra sure that we don't forget to do that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lmossman Correct. The updated connector (Postgres) CDC version will produce both the new GLOBAL payload plus the legacy data payload, that has the legacy CdcState blob in it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lmossman
Copy link
Contributor

@benmoriceau the reset endpoint is what gets called when the Reset your data button is clicked the UI. My understanding was that this should continue to do full resets of all streams. Having this do partial resets would require frontend work that I don't believe is currently planned.

Did you have a different understanding here?

@benmoriceau
Copy link
Contributor

@benmoriceau the reset endpoint is what gets called when the Reset your data button is clicked the UI. My understanding was that this should continue to do full resets of all streams. Having this do partial resets would require frontend work that I don't believe is currently planned.

Did you have a different understanding here?

Yes, I don't remember that if we properly the reset to always do a full reset instead of a partial one.

@benmoriceau
Copy link
Contributor

@benmoriceau the reset endpoint is what gets called when the Reset your data button is clicked the UI. My understanding was that this should continue to do full resets of all streams. Having this do partial resets would require frontend work that I don't believe is currently planned.
Did you have a different understanding here?

Yes, I don't remember that if we properly the reset to always do a full reset instead of a partial one.

@lmossman it was done by you in #13990, I didn't remember that we update to api to pass all the streams.

@lmossman
Copy link
Contributor

lmossman commented Jun 30, 2022

Ah yeah, I understand the confusion now. Yep I added the logic to reset all streams in that endpoint in my PR, since that was needed to make my changes not break current resets

@subodh1810
Copy link
Contributor

commenting just to get a notification when the PR is merged, please ignore me!

@alovew alovew force-pushed the anne/reset-update-endpoint branch from 0af1c61 to 6353e79 Compare July 6, 2022 17:33
@alovew alovew temporarily deployed to more-secrets July 6, 2022 17:35 Inactive
@alovew alovew temporarily deployed to more-secrets July 6, 2022 19:01 Inactive
@andyjih andyjih mentioned this pull request Jul 6, 2022
@alovew alovew temporarily deployed to more-secrets July 6, 2022 20:55 Inactive
@alovew alovew temporarily deployed to more-secrets July 6, 2022 21:48 Inactive
@@ -361,14 +367,32 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
connectionRead = connectionsHandler.updateConnection(connectionUpdate);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that I found a bug here while running the acceptance test. Here we are saving the connectionUpdate. This is will also update the catalog since it is part of the update object. So when we fetch it on line 382, the existed one wil thus have no diff with the updated one because the update have already happen. 58319dd fixed my acceptance test. cc @lmossman for your own acceptance test

@benmoriceau
Copy link
Contributor

I requested changes following a comment that I have. It comes when writing the acceptance test, sorry about missing it before.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants