Skip to content

Commit

Permalink
Display union of schema and replicated schema in connection settings …
Browse files Browse the repository at this point in the history
…page (#11611)

* Fix selected flag in merged catalog.

* Merge cached schema and sync schema by default
  • Loading branch information
malikdiarra authored May 8, 2022
1 parent 886da80 commit c195c4e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,16 @@ private Map<UUID, AirbyteCatalog> findCatalogByHash(final String catalogHash, fi
return result;
}

public ActorCatalog getActorCatalogById(final UUID actorCatalogId)
throws IOException, ConfigNotFoundException {
final Result<Record> result = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk())
.from(ACTOR_CATALOG).where(ACTOR_CATALOG.ID.eq(actorCatalogId))).fetch();
if (result.size() > 0) {
return DbConverter.buildActorCatalog(result.get(0));
}
throw new ConfigNotFoundException(ConfigSchema.ACTOR_CATALOG, actorCatalogId);
}

/**
* Store an Airbyte catalog in DB if it is not present already
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Lists;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.api.model.AirbyteCatalog;
import io.airbyte.api.model.ConnectionCreate;
import io.airbyte.api.model.ConnectionRead;
import io.airbyte.api.model.ConnectionReadList;
Expand All @@ -22,6 +23,8 @@
import io.airbyte.api.model.WorkspaceIdRequestBody;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
Expand All @@ -47,6 +50,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -269,6 +273,17 @@ public ConnectionRead getConnection(final UUID connectionId)
return buildConnectionRead(connectionId);
}

public Optional<AirbyteCatalog> getConnectionAirbyteCatalog(final UUID connectionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardSync connection = configRepository.getStandardSync(connectionId);
if (connection.getSourceCatalogId() == null) {
return Optional.empty();
}
final ActorCatalog catalog = configRepository.getActorCatalogById(connection.getSourceCatalogId());
return Optional.of(CatalogConverter.toApi(Jsons.object(catalog.getCatalog(),
io.airbyte.protocol.models.AirbyteCatalog.class)));
}

public ConnectionReadList searchConnections(final ConnectionSearch connectionSearch)
throws JsonValidationException, IOException, ConfigNotFoundException {
final List<ConnectionRead> reads = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
Expand Down Expand Up @@ -198,19 +199,24 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti

final ConnectionRead connection = connectionsHandler.getConnection(connectionIdRequestBody.getConnectionId());

final Optional<AirbyteCatalog> discovered;
if (MoreBooleans.isTruthy(webBackendConnectionRequestBody.getWithRefreshedCatalog())) {
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq =
new SourceDiscoverSchemaRequestBody().sourceId(connection.getSourceId()).disableCache(true);
final SourceDiscoverSchemaRead discoverSchema = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq);

final AirbyteCatalog original = connection.getSyncCatalog();
final AirbyteCatalog discovered = discoverSchema.getCatalog();
final AirbyteCatalog combined = updateSchemaWithDiscovery(original, discovered);

discovered = Optional.of(discoverSchema.getCatalog());
connection.setSourceCatalogId(discoverSchema.getCatalogId());
} else {
discovered = connectionsHandler.getConnectionAirbyteCatalog(webBackendConnectionRequestBody.getConnectionId());
}
final AirbyteCatalog original = connection.getSyncCatalog();
if (discovered.isPresent()) {
final AirbyteCatalog combined = updateSchemaWithDiscovery(original, discovered.get());
connection.setSyncCatalog(combined);
} else {
connection.setSyncCatalog(original);
}

return buildWebBackendConnectionRead(connection);
}

Expand Down Expand Up @@ -252,9 +258,10 @@ protected static AirbyteCatalog updateSchemaWithDiscovery(final AirbyteCatalog o
}

outputStreamConfig.setAliasName(originalStreamConfig.getAliasName());
outputStreamConfig.setSelected(originalStreamConfig.getSelected());
outputStreamConfig.setSelected(true);
} else {
outputStreamConfig = s.getConfig();
outputStreamConfig.setSelected(false);
}
final AirbyteStreamAndConfiguration outputStream = new AirbyteStreamAndConfiguration()
.stream(Jsons.clone(stream))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,8 @@ public void testUpdateSchemaWithDiscoveryFromEmpty() {
.cursorField(Collections.emptyList())
.destinationSyncMode(DestinationSyncMode.OVERWRITE)
.primaryKey(Collections.emptyList())
.aliasName("stream1");
.aliasName("stream1")
.setSelected(false);

final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);

Expand Down Expand Up @@ -729,7 +730,8 @@ public void testUpdateSchemaWithDiscoveryResetStream() {
.cursorField(Collections.emptyList())
.destinationSyncMode(DestinationSyncMode.OVERWRITE)
.primaryKey(Collections.emptyList())
.aliasName("stream1");
.aliasName("stream1")
.setSelected(false);

final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);

Expand Down Expand Up @@ -791,7 +793,8 @@ public void testUpdateSchemaWithDiscoveryMergeNewStream() {
.cursorField(List.of("field1"))
.destinationSyncMode(DestinationSyncMode.APPEND)
.primaryKey(Collections.emptyList())
.aliasName("renamed_stream");
.aliasName("renamed_stream")
.setSelected(true);
final AirbyteStreamAndConfiguration expectedNewStream = ConnectionHelpers.generateBasicApiCatalog().getStreams().get(0);
expectedNewStream.getStream()
.name("stream2")
Expand All @@ -803,7 +806,8 @@ public void testUpdateSchemaWithDiscoveryMergeNewStream() {
.cursorField(Collections.emptyList())
.destinationSyncMode(DestinationSyncMode.OVERWRITE)
.primaryKey(Collections.emptyList())
.aliasName("stream2");
.aliasName("stream2")
.setSelected(false);
expected.getStreams().add(expectedNewStream);

final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);
Expand Down

0 comments on commit c195c4e

Please sign in to comment.