Skip to content

Commit

Permalink
Improve performance of the operation that retrieve all connector for …
Browse files Browse the repository at this point in the history
…a given definition (#18499)
  • Loading branch information
malikdiarra authored and nataly committed Nov 3, 2022
1 parent 09cdebc commit ec24ed3
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,38 @@ public List<DestinationConnection> listWorkspaceDestinationConnection(UUID works
return result.stream().map(DbConverter::buildDestinationConnection).collect(Collectors.toList());
}

/**
* Returns all active sources using a definition
*
* @param definitionId - id for the definition
* @return sources
* @throws IOException
*/
public List<SourceConnection> listSourcesForDefinition(UUID definitionId) throws IOException {
final Result<Record> result = database.query(ctx -> ctx.select(asterisk())
.from(ACTOR)
.where(ACTOR.ACTOR_TYPE.eq(ActorType.source))
.and(ACTOR.ACTOR_DEFINITION_ID.eq(definitionId))
.andNot(ACTOR.TOMBSTONE).fetch());
return result.stream().map(DbConverter::buildSourceConnection).collect(Collectors.toList());
}

/**
* Returns all active destinations using a definition
*
* @param definitionId - id for the definition
* @return destinations
* @throws IOException
*/
public List<DestinationConnection> listDestinationsForDefinition(UUID definitionId) throws IOException {
final Result<Record> result = database.query(ctx -> ctx.select(asterisk())
.from(ACTOR)
.where(ACTOR.ACTOR_TYPE.eq(ActorType.destination))
.and(ACTOR.ACTOR_DEFINITION_ID.eq(definitionId))
.andNot(ACTOR.TOMBSTONE).fetch());
return result.stream().map(DbConverter::buildDestinationConnection).collect(Collectors.toList());
}

public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.STANDARD_SYNC, connectionId.toString(), StandardSync.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,22 @@ void testWorkspaceCountConnectionsDeprecated() throws IOException {
assertEquals(1, configRepository.countConnectionsForWorkspace(workspaceId));
}

@Test
void testFetchActorsUsingDefinition() throws IOException {
UUID destinationDefinitionId = MockData.publicDestinationDefinition().getDestinationDefinitionId();
UUID sourceDefinitionId = MockData.publicSourceDefinition().getSourceDefinitionId();
final List<DestinationConnection> destinationConnections = configRepository.listDestinationsForDefinition(
destinationDefinitionId);
final List<SourceConnection> sourceConnections = configRepository.listSourcesForDefinition(
sourceDefinitionId);

assertThat(destinationConnections)
.containsExactlyElementsOf(MockData.destinationConnections().stream().filter(d -> d.getDestinationDefinitionId().equals(
destinationDefinitionId) && !d.getTombstone()).collect(Collectors.toList()));
assertThat(sourceConnections).containsExactlyElementsOf(MockData.sourceConnections().stream().filter(d -> d.getSourceDefinitionId().equals(
sourceDefinitionId) && !d.getTombstone()).collect(Collectors.toList()));
}

@Test
void testSimpleInsertActorCatalog() throws IOException, JsonValidationException, SQLException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,27 +196,20 @@ public DestinationRead cloneDestination(final DestinationCloneRequestBody destin

public DestinationReadList listDestinationsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
final List<DestinationRead> reads = Lists.newArrayList();

final List<DestinationRead> reads = Lists.newArrayList();
for (final DestinationConnection dci : configRepository.listWorkspaceDestinationConnection(workspaceIdRequestBody.getWorkspaceId())) {
reads.add(buildDestinationRead(dci));
}

return new DestinationReadList().destinations(reads);
}

public DestinationReadList listDestinationsForDestinationDefinition(final DestinationDefinitionIdRequestBody destinationDefinitionIdRequestBody)
throws JsonValidationException, IOException, ConfigNotFoundException {
final List<DestinationRead> reads = Lists.newArrayList();

for (final DestinationConnection destinationConnection : configRepository.listDestinationConnection()) {
if (!destinationConnection.getDestinationDefinitionId().equals(destinationDefinitionIdRequestBody.getDestinationDefinitionId())) {
continue;
}
if (destinationConnection.getTombstone() != null && destinationConnection.getTombstone()) {
continue;
}

for (final DestinationConnection destinationConnection : configRepository
.listDestinationsForDefinition(destinationDefinitionIdRequestBody.getDestinationDefinitionId())) {
reads.add(buildDestinationRead(destinationConnection));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.airbyte.api.model.generated.SourceSearch;
import io.airbyte.api.model.generated.SourceUpdate;
import io.airbyte.api.model.generated.WorkspaceIdRequestBody;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand Down Expand Up @@ -176,14 +175,8 @@ public SourceReadList listSourcesForWorkspace(final WorkspaceIdRequestBody works
public SourceReadList listSourcesForSourceDefinition(final SourceDefinitionIdRequestBody sourceDefinitionIdRequestBody)
throws JsonValidationException, IOException, ConfigNotFoundException {

final List<SourceConnection> sourceConnections = configRepository.listSourceConnection()
.stream()
.filter(sc -> sc.getSourceDefinitionId().equals(sourceDefinitionIdRequestBody.getSourceDefinitionId())
&& !MoreBooleans.isTruthy(sc.getTombstone()))
.toList();

final List<SourceRead> reads = Lists.newArrayList();
for (final SourceConnection sourceConnection : sourceConnections) {
for (final SourceConnection sourceConnection : configRepository.listSourcesForDefinition(sourceDefinitionIdRequestBody.getSourceDefinitionId())) {
reads.add(buildSourceRead(sourceConnection));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ void testListSourcesForSourceDefinition() throws JsonValidationException, Config
new SourceDefinitionIdRequestBody().sourceDefinitionId(sourceConnection.getSourceDefinitionId());

when(configRepository.getSourceConnection(sourceConnection.getSourceId())).thenReturn(sourceConnection);
when(configRepository.listSourceConnection()).thenReturn(Lists.newArrayList(sourceConnection));
when(configRepository.listSourcesForDefinition(sourceConnection.getSourceDefinitionId())).thenReturn(Lists.newArrayList(sourceConnection));
when(configRepository.getStandardSourceDefinition(sourceDefinitionSpecificationRead.getSourceDefinitionId()))
.thenReturn(standardSourceDefinition);
when(configRepository.getSourceDefinitionFromSource(sourceConnection.getSourceId())).thenReturn(standardSourceDefinition);
Expand Down

0 comments on commit ec24ed3

Please sign in to comment.