From c195c4ebd9f2d805de759eafe4e07167ded2b9be Mon Sep 17 00:00:00 2001 From: Malik Diarra Date: Sun, 8 May 2022 16:16:12 -0700 Subject: [PATCH] Display union of schema and replicated schema in connection settings page (#11611) * Fix selected flag in merged catalog. * Merge cached schema and sync schema by default --- .../config/persistence/ConfigRepository.java | 10 ++++++++++ .../server/handlers/ConnectionsHandler.java | 15 +++++++++++++++ .../WebBackendConnectionsHandler.java | 19 +++++++++++++------ .../WebBackendConnectionsHandlerTest.java | 12 ++++++++---- 4 files changed, 46 insertions(+), 10 deletions(-) diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index c5c208b8a8ba..76e5d06d4bea 100644 --- a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -794,6 +794,16 @@ private Map findCatalogByHash(final String catalogHash, fi return result; } + public ActorCatalog getActorCatalogById(final UUID actorCatalogId) + throws IOException, ConfigNotFoundException { + final Result result = database.query(ctx -> ctx.select(ACTOR_CATALOG.asterisk()) + .from(ACTOR_CATALOG).where(ACTOR_CATALOG.ID.eq(actorCatalogId))).fetch(); + if (result.size() > 0) { + return DbConverter.buildActorCatalog(result.get(0)); + } + throw new ConfigNotFoundException(ConfigSchema.ACTOR_CATALOG, actorCatalogId); + } + /** * Store an Airbyte catalog in DB if it is not present already * diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index 600593eb6535..ea24b2ed26a4 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -9,6 +9,7 @@ import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Lists; import io.airbyte.analytics.TrackingClient; +import io.airbyte.api.model.AirbyteCatalog; import io.airbyte.api.model.ConnectionCreate; import io.airbyte.api.model.ConnectionRead; import io.airbyte.api.model.ConnectionReadList; @@ -22,6 +23,8 @@ import io.airbyte.api.model.WorkspaceIdRequestBody; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ActorCatalog; import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.Schedule; @@ -47,6 +50,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -269,6 +273,17 @@ public ConnectionRead getConnection(final UUID connectionId) return buildConnectionRead(connectionId); } + public Optional getConnectionAirbyteCatalog(final UUID connectionId) + throws JsonValidationException, ConfigNotFoundException, IOException { + final StandardSync connection = configRepository.getStandardSync(connectionId); + if (connection.getSourceCatalogId() == null) { + return Optional.empty(); + } + final ActorCatalog catalog = configRepository.getActorCatalogById(connection.getSourceCatalogId()); + return Optional.of(CatalogConverter.toApi(Jsons.object(catalog.getCatalog(), + io.airbyte.protocol.models.AirbyteCatalog.class))); + } + public ConnectionReadList searchConnections(final ConnectionSearch connectionSearch) throws JsonValidationException, IOException, ConfigNotFoundException { final List reads = Lists.newArrayList(); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index d76dc026450c..0373d36b6865 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -55,6 +55,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.function.Predicate; @@ -198,19 +199,24 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti final ConnectionRead connection = connectionsHandler.getConnection(connectionIdRequestBody.getConnectionId()); + final Optional discovered; if (MoreBooleans.isTruthy(webBackendConnectionRequestBody.getWithRefreshedCatalog())) { final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody().sourceId(connection.getSourceId()).disableCache(true); final SourceDiscoverSchemaRead discoverSchema = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq); - final AirbyteCatalog original = connection.getSyncCatalog(); - final AirbyteCatalog discovered = discoverSchema.getCatalog(); - final AirbyteCatalog combined = updateSchemaWithDiscovery(original, discovered); - + discovered = Optional.of(discoverSchema.getCatalog()); connection.setSourceCatalogId(discoverSchema.getCatalogId()); + } else { + discovered = connectionsHandler.getConnectionAirbyteCatalog(webBackendConnectionRequestBody.getConnectionId()); + } + final AirbyteCatalog original = connection.getSyncCatalog(); + if (discovered.isPresent()) { + final AirbyteCatalog combined = updateSchemaWithDiscovery(original, discovered.get()); connection.setSyncCatalog(combined); + } else { + connection.setSyncCatalog(original); } - return buildWebBackendConnectionRead(connection); } @@ -252,9 +258,10 @@ protected static AirbyteCatalog updateSchemaWithDiscovery(final AirbyteCatalog o } outputStreamConfig.setAliasName(originalStreamConfig.getAliasName()); - outputStreamConfig.setSelected(originalStreamConfig.getSelected()); + outputStreamConfig.setSelected(true); } else { outputStreamConfig = s.getConfig(); + outputStreamConfig.setSelected(false); } final AirbyteStreamAndConfiguration outputStream = new AirbyteStreamAndConfiguration() .stream(Jsons.clone(stream)) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 10eecca96ece..2e2c9a636244 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -680,7 +680,8 @@ public void testUpdateSchemaWithDiscoveryFromEmpty() { .cursorField(Collections.emptyList()) .destinationSyncMode(DestinationSyncMode.OVERWRITE) .primaryKey(Collections.emptyList()) - .aliasName("stream1"); + .aliasName("stream1") + .setSelected(false); final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered); @@ -729,7 +730,8 @@ public void testUpdateSchemaWithDiscoveryResetStream() { .cursorField(Collections.emptyList()) .destinationSyncMode(DestinationSyncMode.OVERWRITE) .primaryKey(Collections.emptyList()) - .aliasName("stream1"); + .aliasName("stream1") + .setSelected(false); final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered); @@ -791,7 +793,8 @@ public void testUpdateSchemaWithDiscoveryMergeNewStream() { .cursorField(List.of("field1")) .destinationSyncMode(DestinationSyncMode.APPEND) .primaryKey(Collections.emptyList()) - .aliasName("renamed_stream"); + .aliasName("renamed_stream") + .setSelected(true); final AirbyteStreamAndConfiguration expectedNewStream = ConnectionHelpers.generateBasicApiCatalog().getStreams().get(0); expectedNewStream.getStream() .name("stream2") @@ -803,7 +806,8 @@ public void testUpdateSchemaWithDiscoveryMergeNewStream() { .cursorField(Collections.emptyList()) .destinationSyncMode(DestinationSyncMode.OVERWRITE) .primaryKey(Collections.emptyList()) - .aliasName("stream2"); + .aliasName("stream2") + .setSelected(false); expected.getStreams().add(expectedNewStream); final AirbyteCatalog actual = WebBackendConnectionsHandler.updateSchemaWithDiscovery(original, discovered);