-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bmoric/update connection list with breaking #18125
Changes from 53 commits
abaea95
f428057
5a3df67
6b8e1f0
5ac2cde
024b3d3
4fcc5a6
b8eeec2
ce5b574
b14b322
9593ebc
558022c
17a9681
e03cdbb
2686be6
62ca0f7
b78fd6e
7941c76
c42af6b
c444399
2359d9d
50770ae
5bc3e3e
6d13a42
20f9e30
21eb1d0
32613c0
5320f73
826cfb0
1270de2
f10e413
1c9a008
5781ce0
5203c9f
f81285c
6c4d620
9e1db24
e16a35d
1e9cc8a
f38b682
8f28aa5
acf9ec1
fc085cf
95a6dc0
cdf1065
d1683d5
6662bb7
f93f67d
a2b00ca
53937da
8c8cd4b
0a0c500
f4959c8
68d6825
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -975,6 +975,22 @@ public Optional<ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSo | |
return records.stream().findFirst().map(DbConverter::buildActorCatalogFetchEvent); | ||
} | ||
|
||
public Map<UUID, ActorCatalogFetchEvent> getMostRecentActorCatalogFetchEventForSources(final List<UUID> sourceIds) | ||
throws IOException { | ||
|
||
return database.query(ctx -> ctx.fetch( | ||
""" | ||
select actor_catalog_id, actor_id from | ||
(select id, actor_catalog_id, actor_id, config_hash, actor_version, created_at, rank() over (partition by actor_id order by created_at desc) as creation_order_rank, modified_at | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. kind of nitpicky, but do we need to select all these fields or just actor_id, actor_catalog_id, & creation_order_rank? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes most of the column are not needed. I have remove them. Thanks for catching that. |
||
from public.actor_catalog_fetch_event | ||
) table_with_rank | ||
where creation_order_rank = 1; | ||
""")) | ||
.stream().map(DbConverter::buildActorCatalogFetchEvent) | ||
.collect(Collectors.toMap(record -> record.getActorId(), | ||
record -> record)); | ||
} | ||
|
||
/** | ||
* Stores source catalog information. | ||
* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,7 +99,8 @@ public ConnectionStateType getStateType(final ConnectionIdRequestBody connection | |
return Enums.convertTo(stateHandler.getState(connectionIdRequestBody).getStateType(), ConnectionStateType.class); | ||
} | ||
|
||
public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) throws IOException { | ||
public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) | ||
throws IOException, JsonValidationException, ConfigNotFoundException { | ||
|
||
// passing 'false' so that deleted connections are not included | ||
final List<StandardSync> standardSyncs = | ||
|
@@ -113,6 +114,9 @@ public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final | |
final Map<UUID, JobRead> runningJobByConnectionId = | ||
getRunningJobByConnectionId(standardSyncs.stream().map(StandardSync::getConnectionId).toList()); | ||
|
||
final Map<UUID, ActorCatalogFetchEvent> newestFetchEventsByActorId = | ||
configRepository.getMostRecentActorCatalogFetchEventForSources(new ArrayList<>()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @benmoriceau why are we adding more db queries into this handler? it's really specifically not supposed to be making direct database calls. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cgardens It was not very clear that the repository shouldn't have been used here. Especially since it was recently in in the same endpoint and other endpoints to get data from the DB. The ticket related to this PR #17526 is only needing the described functionality in the webBackend endpoint only so it made sense to add that direct call in the WebBackendHandler. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What else do we need to add to make it clear? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment is easy to miss when the same field is being used within the same function here. When you are looking at the function to update, it is not explicit that this is a deprecated field. Something like this would make it very explicit that this is deprecated. It will show the getter are an error and would have prevent any use of it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks good. let's do that. i also put up a PR that adds more explanation in the javadocs #19719 |
||
|
||
final List<WebBackendConnectionListItem> connectionItems = Lists.newArrayList(); | ||
|
||
for (final StandardSync standardSync : standardSyncs) { | ||
|
@@ -122,7 +126,8 @@ public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final | |
sourceReadById, | ||
destinationReadById, | ||
latestJobByConnectionId, | ||
runningJobByConnectionId)); | ||
runningJobByConnectionId, | ||
Optional.ofNullable(newestFetchEventsByActorId.get(standardSync.getSourceId())))); | ||
} | ||
|
||
return new WebBackendConnectionReadList().connections(connectionItems); | ||
|
@@ -175,51 +180,33 @@ private WebBackendConnectionRead buildWebBackendConnectionRead(final ConnectionR | |
webBackendConnectionRead.setLatestSyncJobStatus(job.getStatus()); | ||
}); | ||
|
||
SchemaChange schemaChange = getSchemaChange(connectionRead, currentSourceCatalogId); | ||
final Optional<ActorCatalogFetchEvent> mostRecentFetchEvent = | ||
configRepository.getMostRecentActorCatalogFetchEventForSource(connectionRead.getSourceId()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has been extracted from the getSchemaChange in order to avoid running one request per connection during a connection list. |
||
|
||
final SchemaChange schemaChange = getSchemaChange(connectionRead, currentSourceCatalogId, mostRecentFetchEvent); | ||
|
||
webBackendConnectionRead.setSchemaChange(schemaChange); | ||
|
||
return webBackendConnectionRead; | ||
} | ||
|
||
/* | ||
* A breakingChange boolean is stored on the connectionRead object and corresponds to the boolean | ||
* breakingChange field on the connection table. If there is not a breaking change, we still have to | ||
* check whether there is a non-breaking schema change by fetching the most recent | ||
* ActorCatalogFetchEvent. A new ActorCatalogFetchEvent is stored each time there is a source schema | ||
* refresh, so if the most recent ActorCatalogFetchEvent has a different actor catalog than the | ||
* existing actor catalog, there is a schema change. | ||
*/ | ||
private SchemaChange getSchemaChange(ConnectionRead connectionRead, Optional<UUID> currentSourceCatalogId) throws IOException { | ||
SchemaChange schemaChange = SchemaChange.NO_CHANGE; | ||
|
||
if (connectionRead.getBreakingChange()) { | ||
schemaChange = SchemaChange.BREAKING; | ||
} else if (currentSourceCatalogId.isPresent()) { | ||
final Optional<ActorCatalogFetchEvent> mostRecentFetchEvent = | ||
configRepository.getMostRecentActorCatalogFetchEventForSource(connectionRead.getSourceId()); | ||
|
||
if (mostRecentFetchEvent.isPresent()) { | ||
if (!mostRecentFetchEvent.get().getActorCatalogId().equals(currentSourceCatalogId.get())) { | ||
schemaChange = SchemaChange.NON_BREAKING; | ||
} | ||
} | ||
} | ||
|
||
return schemaChange; | ||
} | ||
|
||
private WebBackendConnectionListItem buildWebBackendConnectionListItem( | ||
final StandardSync standardSync, | ||
final Map<UUID, SourceRead> sourceReadById, | ||
final Map<UUID, DestinationRead> destinationReadById, | ||
final Map<UUID, JobRead> latestJobByConnectionId, | ||
final Map<UUID, JobRead> runningJobByConnectionId) { | ||
final Map<UUID, JobRead> runningJobByConnectionId, | ||
final Optional<ActorCatalogFetchEvent> latestFetchEvent) | ||
throws JsonValidationException, ConfigNotFoundException, IOException { | ||
|
||
final SourceRead source = sourceReadById.get(standardSync.getSourceId()); | ||
final DestinationRead destination = destinationReadById.get(standardSync.getDestinationId()); | ||
final Optional<JobRead> latestSyncJob = Optional.ofNullable(latestJobByConnectionId.get(standardSync.getConnectionId())); | ||
final Optional<JobRead> latestRunningSyncJob = Optional.ofNullable(runningJobByConnectionId.get(standardSync.getConnectionId())); | ||
final ConnectionRead connectionRead = connectionsHandler.getConnection(standardSync.getConnectionId()); | ||
final Optional<UUID> currentCatalogId = connectionRead == null ? Optional.empty() : Optional.ofNullable(connectionRead.getSourceCatalogId()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it possible for connectionread to be null? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nothing tells me that the field is not nullable so I prefered to handle the possibility. |
||
|
||
final SchemaChange schemaChange = getSchemaChange(connectionRead, currentCatalogId, latestFetchEvent); | ||
|
||
final WebBackendConnectionListItem listItem = new WebBackendConnectionListItem() | ||
.connectionId(standardSync.getConnectionId()) | ||
|
@@ -230,7 +217,8 @@ private WebBackendConnectionListItem buildWebBackendConnectionListItem( | |
.scheduleType(ApiPojoConverters.toApiConnectionScheduleType(standardSync)) | ||
.scheduleData(ApiPojoConverters.toApiConnectionScheduleData(standardSync)) | ||
.source(source) | ||
.destination(destination); | ||
.destination(destination) | ||
.schemaChange(schemaChange); | ||
|
||
listItem.setIsSyncing(latestRunningSyncJob.isPresent()); | ||
|
||
|
@@ -242,6 +230,34 @@ private WebBackendConnectionListItem buildWebBackendConnectionListItem( | |
return listItem; | ||
} | ||
|
||
/* | ||
* A breakingChange boolean is stored on the connectionRead object and corresponds to the boolean | ||
* breakingChange field on the connection table. If there is not a breaking change, we still have to | ||
* check whether there is a non-breaking schema change by fetching the most recent | ||
* ActorCatalogFetchEvent. A new ActorCatalogFetchEvent is stored each time there is a source schema | ||
* refresh, so if the most recent ActorCatalogFetchEvent has a different actor catalog than the | ||
* existing actor catalog, there is a schema change. | ||
*/ | ||
@VisibleForTesting | ||
SchemaChange getSchemaChange( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @alovew I refactor this method in order to make it easier to understand. I also added missing test to it. Please make sure that I didn't miss anything in the re-implementation. |
||
final ConnectionRead connectionRead, | ||
final Optional<UUID> currentSourceCatalogId, | ||
final Optional<ActorCatalogFetchEvent> mostRecentFetchEvent) { | ||
if (connectionRead == null || currentSourceCatalogId.isEmpty()) { | ||
return SchemaChange.NO_CHANGE; | ||
} | ||
|
||
if (connectionRead.getBreakingChange() != null && connectionRead.getBreakingChange()) { | ||
return SchemaChange.BREAKING; | ||
} | ||
|
||
if (mostRecentFetchEvent.isPresent() && !mostRecentFetchEvent.map(ActorCatalogFetchEvent::getActorCatalogId).equals(currentSourceCatalogId)) { | ||
return SchemaChange.NON_BREAKING; | ||
} | ||
|
||
return SchemaChange.NO_CHANGE; | ||
} | ||
|
||
private SourceRead getSourceRead(final UUID sourceId) throws JsonValidationException, IOException, ConfigNotFoundException { | ||
final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(sourceId); | ||
return sourceHandler.getSource(sourceIdRequestBody); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@benmoriceau what is this method supposed to do? the argument that is passed in is unused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cgardens. It is doing a similar operation than
getMostRecentActorCatalogFetchEventForSource
but for a list of sourceIds instead of a single source Id. The use of the input got lost during PR updatesFixed in #19668