Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of the operation that retrieve all connector for a given definition #18499

Merged
merged 3 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,38 @@ public List<DestinationConnection> listWorkspaceDestinationConnection(UUID works
return result.stream().map(DbConverter::buildDestinationConnection).collect(Collectors.toList());
}

/**
* Returns all active sources using a definition
*
* @param definitionId - id for the definition
* @return sources
* @throws IOException
*/
public List<SourceConnection> listSourcesForDefinition(UUID definitionId) throws IOException {
final Result<Record> result = database.query(ctx -> ctx.select(asterisk())
.from(ACTOR)
.where(ACTOR.ACTOR_TYPE.eq(ActorType.source))
.and(ACTOR.ACTOR_DEFINITION_ID.eq(definitionId))
.andNot(ACTOR.TOMBSTONE).fetch());
return result.stream().map(DbConverter::buildSourceConnection).collect(Collectors.toList());
}

/**
* Returns all active destinations using a definition
*
* @param definitionId - id for the definition
* @return destinations
* @throws IOException
*/
public List<DestinationConnection> listDestinationsForDefinition(UUID definitionId) throws IOException {
final Result<Record> result = database.query(ctx -> ctx.select(asterisk())
.from(ACTOR)
.where(ACTOR.ACTOR_TYPE.eq(ActorType.destination))
.and(ACTOR.ACTOR_DEFINITION_ID.eq(definitionId))
.andNot(ACTOR.TOMBSTONE).fetch());
return result.stream().map(DbConverter::buildDestinationConnection).collect(Collectors.toList());
}

public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.STANDARD_SYNC, connectionId.toString(), StandardSync.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,22 @@ void testWorkspaceCountConnectionsDeprecated() throws IOException {
assertEquals(1, configRepository.countConnectionsForWorkspace(workspaceId));
}

@Test
void testFetchActorsUsingDefinition() throws IOException {
UUID destinationDefinitionId = MockData.publicDestinationDefinition().getDestinationDefinitionId();
UUID sourceDefinitionId = MockData.publicSourceDefinition().getSourceDefinitionId();
final List<DestinationConnection> destinationConnections = configRepository.listDestinationsForDefinition(
destinationDefinitionId);
final List<SourceConnection> sourceConnections = configRepository.listSourcesForDefinition(
sourceDefinitionId);

assertThat(destinationConnections)
.containsExactlyElementsOf(MockData.destinationConnections().stream().filter(d -> d.getDestinationDefinitionId().equals(
destinationDefinitionId) && !d.getTombstone()).collect(Collectors.toList()));
assertThat(sourceConnections).containsExactlyElementsOf(MockData.sourceConnections().stream().filter(d -> d.getSourceDefinitionId().equals(
sourceDefinitionId) && !d.getTombstone()).collect(Collectors.toList()));
}

@Test
void testSimpleInsertActorCatalog() throws IOException, JsonValidationException, SQLException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,27 +196,20 @@ public DestinationRead cloneDestination(final DestinationCloneRequestBody destin

public DestinationReadList listDestinationsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
final List<DestinationRead> reads = Lists.newArrayList();

final List<DestinationRead> reads = Lists.newArrayList();
for (final DestinationConnection dci : configRepository.listWorkspaceDestinationConnection(workspaceIdRequestBody.getWorkspaceId())) {
reads.add(buildDestinationRead(dci));
}

return new DestinationReadList().destinations(reads);
}

public DestinationReadList listDestinationsForDestinationDefinition(final DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody)
throws JsonValidationException, IOException, ConfigNotFoundException {
final List<DestinationRead> reads = Lists.newArrayList();

for (final DestinationConnection destinationConnection : configRepository.listDestinationConnection()) {
if (!destinationConnection.getDestinationDefinitionId().equals(destinationDefinitionIdRequestBody.getDestinationDefinitionId())) {
continue;
}
if (destinationConnection.getTombstone() != null && destinationConnection.getTombstone()) {
continue;
}

for (final DestinationConnection destinationConnection : configRepository
.listDestinationsForDefinition(destinationDefinitionIdRequestBody.getDestinationDefinitionId())) {
reads.add(buildDestinationRead(destinationConnection));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.airbyte.api.model.generated.SourceSearch;
import io.airbyte.api.model.generated.SourceUpdate;
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand Down Expand Up @@ -176,14 +175,8 @@ public SourceReadList listSourcesForWorkspace(final WorkspaceIdRequestBody works
public SourceReadList listSourcesForSourceDefinition(final SourceDefinitionIdRequestBody sourceDefinitionIdRequestBody)
throws JsonValidationException, IOException, ConfigNotFoundException {

final List<SourceConnection> sourceConnections = configRepository.listSourceConnection()
.stream()
.filter(sc -> sc.getSourceDefinitionId().equals(sourceDefinitionIdRequestBody.getSourceDefinitionId())
&& !MoreBooleans.isTruthy(sc.getTombstone()))
.toList();

final List<SourceRead> reads = Lists.newArrayList();
for (final SourceConnection sourceConnection : sourceConnections) {
for (final SourceConnection sourceConnection : configRepository.listSourcesForDefinition(sourceDefinitionIdRequestBody.getSourceDefinitionId())) {
reads.add(buildSourceRead(sourceConnection));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ void testListSourcesForSourceDefinition() throws JsonValidationException, Config
new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceConnection.getSourceDefinitionId());

when(configRepository.getSourceConnection(sourceConnection.getSourceId())).thenReturn(sourceConnection);
when(configRepository.listSourceConnection()).thenReturn(Lists.newArrayList(sourceConnection));
when(configRepository.listSourcesForDefinition(sourceConnection.getSourceDefinitionId())).thenReturn(Lists.newArrayList(sourceConnection));
when(configRepository.getStandardSourceDefinition(sourceDefinitionSpecificationRead.getSourceDefinitionId()))
.thenReturn(standardSourceDefinition);
when(configRepository.getSourceDefinitionFromSource(sourceConnection.getSourceId())).thenReturn(standardSourceDefinition);
Expand Down