Skip to content

Commit

Permalink
Improve performance of list connection operations (#20264)
Browse files Browse the repository at this point in the history
* Enable source/destination filtering in /web_backend/connection/list

* Add tests on connection filtering

* Remove redundant DB call

* Make some methods static to avoid sneaky db calls

* Minor refactor

* Small refactoring + add filtering by source for catalog fetch events

* Add comment

* Trim WebBackendConnectionList response payload

* fix build errors

* Remove requests to list_by_workspace and list_latest

* Add sourcedefid and destdefid to source/dest snippets read

* fixed sourceDefinitionId and destinationDefinitionId

* Fix webbackend handler tests

Co-authored-by: Volodymyr Petrov <[email protected]>
Co-authored-by: KC <[email protected]>
  • Loading branch information
3 people authored Jan 3, 2023
1 parent dddaad5 commit ff1ebb0
Show file tree
Hide file tree
Showing 17 changed files with 260 additions and 135 deletions.
59 changes: 50 additions & 9 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1771,7 +1771,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/WorkspaceIdRequestBody"
$ref: "#/components/schemas/WebBackendConnectionListRequestBody"
required: true
responses:
"200":
Expand Down Expand Up @@ -2738,6 +2738,24 @@ components:
type: string
icon:
type: string
SourceSnippetRead:
type: object
required:
- sourceId
- name
- sourceDefinitionId
- sourceName
properties:
sourceId:
$ref: "#/components/schemas/SourceId"
name:
type: string
sourceDefinitionId:
$ref: "#/components/schemas/SourceDefinitionId"
sourceName:
type: string
icon:
type: string
SourceReadList:
type: object
required:
Expand Down Expand Up @@ -3035,6 +3053,24 @@ components:
type: string
icon:
type: string
DestinationSnippetRead:
type: object
required:
- destinationId
- name
- destinationDefinitionId
- destinationName
properties:
destinationId:
$ref: "#/components/schemas/DestinationId"
name:
type: string
destinationDefinitionId:
$ref: "#/components/schemas/DestinationDefinitionId"
destinationName:
type: string
icon:
type: string
DestinationReadList:
type: object
required:
Expand Down Expand Up @@ -4624,14 +4660,23 @@ components:
type: integer
sourceDefinitions:
type: integer
WebBackendConnectionListRequestBody:
type: object
required:
- workspaceId
properties:
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
sourceId:
$ref: "#/components/schemas/SourceId"
destinationId:
$ref: "#/components/schemas/DestinationId"
WebBackendConnectionListItem:
type: object
description: Information about a connection that shows up in the connection list view.
required:
- connectionId
- name
- sourceId
- destinationId
- source
- destination
- status
Expand All @@ -4642,20 +4687,16 @@ components:
$ref: "#/components/schemas/ConnectionId"
name:
type: string
sourceId:
$ref: "#/components/schemas/SourceId"
destinationId:
$ref: "#/components/schemas/DestinationId"
scheduleType:
$ref: "#/components/schemas/ConnectionScheduleType"
scheduleData:
$ref: "#/components/schemas/ConnectionScheduleData"
status:
$ref: "#/components/schemas/ConnectionStatus"
source:
$ref: "#/components/schemas/SourceRead"
$ref: "#/components/schemas/SourceSnippetRead"
destination:
$ref: "#/components/schemas/DestinationRead"
$ref: "#/components/schemas/DestinationSnippetRead"
latestSyncJobCreatedAt:
$ref: "#/components/schemas/JobCreatedAt"
latestSyncJobStatus:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.validation.json.JsonValidationException;
import jakarta.annotation.Nonnull;
import java.io.IOException;
import java.time.OffsetDateTime;
import java.util.ArrayList;
Expand Down Expand Up @@ -92,6 +93,8 @@
"OptionalUsedAsFieldOrParameterType"})
public class ConfigRepository {

public record StandardSyncQuery(@Nonnull UUID workspaceId, UUID sourceId, UUID destinationId, boolean includeDeleted) {}

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigRepository.class);
private static final String OPERATION_IDS_AGG_FIELD = "operation_ids_agg";
private static final String OPERATION_IDS_AGG_DELIMITER = ",";
Expand Down Expand Up @@ -843,6 +846,10 @@ public List<StandardSync> listStandardSyncsUsingOperation(final UUID operationId
}

public List<StandardSync> listWorkspaceStandardSyncs(final UUID workspaceId, final boolean includeDeleted) throws IOException {
return listWorkspaceStandardSyncs(new StandardSyncQuery(workspaceId, null, null, includeDeleted));
}

public List<StandardSync> listWorkspaceStandardSyncs(final StandardSyncQuery standardSyncQuery) throws IOException {
final Result<Record> connectionAndOperationIdsResult = database.query(ctx -> ctx
// SELECT connection.* plus the connection's associated operationIds as a concatenated list
.select(
Expand All @@ -856,8 +863,10 @@ public List<StandardSync> listWorkspaceStandardSyncs(final UUID workspaceId, fin

// join with source actors so that we can filter by workspaceId
.join(ACTOR).on(CONNECTION.SOURCE_ID.eq(ACTOR.ID))
.where(ACTOR.WORKSPACE_ID.eq(workspaceId)
.and(includeDeleted ? noCondition() : CONNECTION.STATUS.notEqual(StatusType.deprecated)))
.where(ACTOR.WORKSPACE_ID.eq(standardSyncQuery.workspaceId)
.and(standardSyncQuery.destinationId == null ? noCondition() : CONNECTION.DESTINATION_ID.eq(standardSyncQuery.destinationId))
.and(standardSyncQuery.sourceId == null ? noCondition() : CONNECTION.SOURCE_ID.eq(standardSyncQuery.sourceId))
.and(standardSyncQuery.includeDeleted ? noCondition() : CONNECTION.STATUS.notEqual(StatusType.deprecated)))

// group by connection.id so that the groupConcat above works
.groupBy(CONNECTION.ID)).fetch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigRepository.DestinationAndDefinition;
import io.airbyte.config.persistence.ConfigRepository.SourceAndDefinition;
import io.airbyte.config.persistence.ConfigRepository.StandardSyncQuery;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.protocol.models.AirbyteCatalog;
Expand Down Expand Up @@ -188,7 +189,45 @@ void testSimpleInsertActorCatalog() throws IOException, JsonValidationException,
@Test
void testListWorkspaceStandardSyncAll() throws IOException {
final List<StandardSync> expectedSyncs = MockData.standardSyncs().subList(0, 4);
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(MockData.standardWorkspaces().get(0).getWorkspaceId(), true);
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(
MockData.standardWorkspaces().get(0).getWorkspaceId(), true);

assertSyncsMatch(expectedSyncs, actualSyncs);
}

@Test
void testListWorkspaceStandardSyncWithAllFiltering() throws IOException {
final UUID workspaceId = MockData.standardWorkspaces().get(0).getWorkspaceId();
final StandardSyncQuery query = new StandardSyncQuery(workspaceId, MockData.SOURCE_ID_1, MockData.DESTINATION_ID_1, false);
final List<StandardSync> expectedSyncs = MockData.standardSyncs().subList(0, 3).stream()
.filter(sync -> sync.getDestinationId().equals(query.destinationId()))
.filter(sync -> sync.getSourceId().equals(query.sourceId()))
.toList();
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(query);

assertSyncsMatch(expectedSyncs, actualSyncs);
}

@Test
void testListWorkspaceStandardSyncDestinationFiltering() throws IOException {
final UUID workspaceId = MockData.standardWorkspaces().get(0).getWorkspaceId();
final StandardSyncQuery query = new StandardSyncQuery(workspaceId, null, MockData.DESTINATION_ID_1, false);
final List<StandardSync> expectedSyncs = MockData.standardSyncs().subList(0, 3).stream()
.filter(sync -> sync.getDestinationId().equals(query.destinationId()))
.toList();
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(query);

assertSyncsMatch(expectedSyncs, actualSyncs);
}

@Test
void testListWorkspaceStandardSyncSourceFiltering() throws IOException {
final UUID workspaceId = MockData.standardWorkspaces().get(0).getWorkspaceId();
final StandardSyncQuery query = new StandardSyncQuery(workspaceId, MockData.SOURCE_ID_2, null, false);
final List<StandardSync> expectedSyncs = MockData.standardSyncs().subList(0, 3).stream()
.filter(sync -> sync.getSourceId().equals(query.sourceId()))
.toList();
final List<StandardSync> actualSyncs = configRepository.listWorkspaceStandardSyncs(query);

assertSyncsMatch(expectedSyncs, actualSyncs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ public class MockData {
public static final UUID SOURCE_ID_1 = UUID.randomUUID();
public static final UUID SOURCE_ID_2 = UUID.randomUUID();
private static final UUID SOURCE_ID_3 = UUID.randomUUID();
private static final UUID DESTINATION_ID_1 = UUID.randomUUID();
private static final UUID DESTINATION_ID_2 = UUID.randomUUID();
private static final UUID DESTINATION_ID_3 = UUID.randomUUID();
public static final UUID DESTINATION_ID_1 = UUID.randomUUID();
public static final UUID DESTINATION_ID_2 = UUID.randomUUID();
public static final UUID DESTINATION_ID_3 = UUID.randomUUID();
private static final UUID OPERATION_ID_1 = UUID.randomUUID();
private static final UUID OPERATION_ID_2 = UUID.randomUUID();
private static final UUID OPERATION_ID_3 = UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
import io.airbyte.api.model.generated.ConnectionStateType;
import io.airbyte.api.model.generated.WebBackendCheckUpdatesRead;
import io.airbyte.api.model.generated.WebBackendConnectionCreate;
import io.airbyte.api.model.generated.WebBackendConnectionListRequestBody;
import io.airbyte.api.model.generated.WebBackendConnectionRead;
import io.airbyte.api.model.generated.WebBackendConnectionReadList;
import io.airbyte.api.model.generated.WebBackendConnectionRequestBody;
import io.airbyte.api.model.generated.WebBackendConnectionUpdate;
import io.airbyte.api.model.generated.WebBackendGeographiesListResult;
import io.airbyte.api.model.generated.WebBackendWorkspaceState;
import io.airbyte.api.model.generated.WebBackendWorkspaceStateResult;
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
import io.airbyte.server.handlers.WebBackendCheckUpdatesHandler;
import io.airbyte.server.handlers.WebBackendConnectionsHandler;
import io.airbyte.server.handlers.WebBackendGeographiesHandler;
Expand Down Expand Up @@ -57,8 +57,8 @@ public WebBackendWorkspaceStateResult webBackendGetWorkspaceState(final WebBacke
}

@Override
public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody));
public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final WebBackendConnectionListRequestBody webBackendConnectionListRequestBody) {
return ApiHelper.execute(() -> webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(webBackendConnectionListRequestBody));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.api.model.generated.DestinationRead;
import io.airbyte.api.model.generated.DestinationReadList;
import io.airbyte.api.model.generated.DestinationSearch;
import io.airbyte.api.model.generated.DestinationSnippetRead;
import io.airbyte.api.model.generated.DestinationUpdate;
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -303,4 +304,14 @@ protected static DestinationRead toDestinationRead(final DestinationConnection d
.icon(DestinationDefinitionsHandler.loadIcon(standardDestinationDefinition.getIcon()));
}

protected static DestinationSnippetRead toDestinationSnippetRead(final DestinationConnection destinationConnection,
final StandardDestinationDefinition standardDestinationDefinition) {
return new DestinationSnippetRead()
.destinationId(destinationConnection.getDestinationId())
.name(destinationConnection.getName())
.destinationDefinitionId(standardDestinationDefinition.getDestinationDefinitionId())
.destinationName(standardDestinationDefinition.getName())
.icon(DestinationDefinitionsHandler.loadIcon(standardDestinationDefinition.getIcon()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.api.model.generated.SourceRead;
import io.airbyte.api.model.generated.SourceReadList;
import io.airbyte.api.model.generated.SourceSearch;
import io.airbyte.api.model.generated.SourceSnippetRead;
import io.airbyte.api.model.generated.SourceUpdate;
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
import io.airbyte.config.SourceConnection;
Expand Down Expand Up @@ -319,4 +320,13 @@ protected static SourceRead toSourceRead(final SourceConnection sourceConnection
.icon(SourceDefinitionsHandler.loadIcon(standardSourceDefinition.getIcon()));
}

protected static SourceSnippetRead toSourceSnippetRead(final SourceConnection source, final StandardSourceDefinition sourceDefinition) {
return new SourceSnippetRead()
.sourceId(source.getSourceId())
.name(source.getName())
.sourceDefinitionId(sourceDefinition.getSourceDefinitionId())
.sourceName(sourceDefinition.getName())
.icon(SourceDefinitionsHandler.loadIcon(sourceDefinition.getIcon()));
}

}
Loading

0 comments on commit ff1ebb0

Please sign in to comment.