From 083778183e788e09728a0e13eca35b8fccc48a1d Mon Sep 17 00:00:00 2001 From: Xiaohan Song Date: Mon, 23 Jan 2023 11:02:30 -0800 Subject: [PATCH] Add write discover fetch event API (#21506) * api changes for writing discover catalog * api changes * format * change return type of the API to return catalogId * PR comments fix * format --- airbyte-api/src/main/openapi/config.yaml | 44 ++++++ .../server/apis/SourceApiController.java | 8 ++ .../server/handlers/ConnectionsHandler.java | 4 +- .../server/handlers/SchedulerHandler.java | 2 +- .../server/handlers/SourceHandler.java | 15 +++ .../WebBackendConnectionsHandler.java | 4 +- .../handlers/helpers/CatalogConverter.java | 39 +++++- .../converters/CatalogConverterTest.java | 23 ++-- .../server/handlers/SourceHandlerTest.java | 28 ++++ .../WebBackendConnectionsHandlerTest.java | 2 +- .../api/generated-api-html/index.html | 125 ++++++++++++++++++ 11 files changed, 272 insertions(+), 22 deletions(-) diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 2107c31066f7..3d22973efe42 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -754,6 +754,27 @@ paths: $ref: "#/components/responses/NotFoundResponse" "422": $ref: "#/components/responses/InvalidInputResponse" + + /v1/sources/write_discover_catalog_result: + post: + tags: + - source + - internal + summary: Should only called from worker, to write result from discover activity back to DB. + operationId: writeDiscoverCatalogResult + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/SourceDiscoverSchemaWriteRequestBody" + required: true + responses: + "200": + description: Successful Operation + content: + application/json: + schema: + $ref: "#/components/schemas/DiscoverCatalogResult" /v1/destination_definitions/update: post: tags: @@ -2812,6 +2833,21 @@ components: type: array items: $ref: "#/components/schemas/SourceRead" + SourceDiscoverSchemaWriteRequestBody: + description: to write this requested object to database. + type: object + required: + - catalog + properties: + catalog: + $ref: "#/components/schemas/AirbyteCatalog" + sourceId: + $ref: "#/components/schemas/SourceId" + connectorVersion: + type: string + configurationHash: + type: string + SourceDiscoverSchemaRead: description: Returns the results of a discover catalog job. If the job was not successful, the catalog field will not be present. jobInfo will aways be present and its status be used to determine if the job was successful or not. type: object @@ -4993,6 +5029,14 @@ components: properties: succeeded: type: boolean + DiscoverCatalogResult: + type: object + required: + - catalogId + properties: + catalogId: + type: string + format: uuid AttemptNormalizationStatusReadList: type: object properties: diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java index c1d5970465ca..d7946484d8ee 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/SourceApiController.java @@ -10,10 +10,12 @@ import io.airbyte.api.generated.SourceApi; import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.model.generated.CheckConnectionRead; +import io.airbyte.api.model.generated.DiscoverCatalogResult; import io.airbyte.api.model.generated.SourceCloneRequestBody; import io.airbyte.api.model.generated.SourceCreate; import io.airbyte.api.model.generated.SourceDiscoverSchemaRead; import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody; +import io.airbyte.api.model.generated.SourceDiscoverSchemaWriteRequestBody; import io.airbyte.api.model.generated.SourceIdRequestBody; import io.airbyte.api.model.generated.SourceRead; import io.airbyte.api.model.generated.SourceReadList; @@ -123,4 +125,10 @@ public SourceRead updateSource(final SourceUpdate sourceUpdate) { return ApiHelper.execute(() -> sourceHandler.updateSource(sourceUpdate)); } + @Post("/write_discover_catalog_result") + @Override + public DiscoverCatalogResult writeDiscoverCatalogResult(final SourceDiscoverSchemaWriteRequestBody request) { + return ApiHelper.execute(() -> sourceHandler.writeDiscoverCatalogResult(request)); + } + } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index ba32bfeb380e..12aff047d12a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -160,7 +160,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate) // TODO Undesirable behavior: sending a null configured catalog should not be valid? if (connectionCreate.getSyncCatalog() != null) { - standardSync.withCatalog(CatalogConverter.toProtocol(connectionCreate.getSyncCatalog())); + standardSync.withCatalog(CatalogConverter.toConfiguredProtocol(connectionCreate.getSyncCatalog())); standardSync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(connectionCreate.getSyncCatalog())); } else { standardSync.withCatalog(new ConfiguredAirbyteCatalog().withStreams(Collections.emptyList())); @@ -318,7 +318,7 @@ private static void applyPatchToStandardSync(final StandardSync sync, final Conn // in the patch. Otherwise, leave the field unchanged. if (patch.getSyncCatalog() != null) { - sync.setCatalog(CatalogConverter.toProtocol(patch.getSyncCatalog())); + sync.setCatalog(CatalogConverter.toConfiguredProtocol(patch.getSyncCatalog())); sync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(patch.getSyncCatalog())); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index d2270ccf172b..97e8bfb5cc00 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -404,7 +404,7 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco connectionRead.getSyncCatalog(); final CatalogDiff diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(), - CatalogConverter.toProtocol(currentAirbyteCatalog)); + CatalogConverter.toConfiguredProtocol(currentAirbyteCatalog)); final boolean containsBreakingChange = containsBreakingChange(diff); final ConnectionUpdate updateObject = new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(connectionRead.getConnectionId()); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java index cfb0fb278b8c..c8c366bab4a8 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceHandler.java @@ -8,10 +8,12 @@ import com.google.common.collect.Lists; import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt; import io.airbyte.api.model.generated.ConnectionRead; +import io.airbyte.api.model.generated.DiscoverCatalogResult; import io.airbyte.api.model.generated.SourceCloneConfiguration; import io.airbyte.api.model.generated.SourceCloneRequestBody; import io.airbyte.api.model.generated.SourceCreate; import io.airbyte.api.model.generated.SourceDefinitionIdRequestBody; +import io.airbyte.api.model.generated.SourceDiscoverSchemaWriteRequestBody; import io.airbyte.api.model.generated.SourceIdRequestBody; import io.airbyte.api.model.generated.SourceRead; import io.airbyte.api.model.generated.SourceReadList; @@ -27,8 +29,10 @@ import io.airbyte.config.persistence.SecretsRepositoryWriter; import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; import io.airbyte.persistence.job.factory.OAuthConfigSupplier; +import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.server.converters.ConfigurationUpdate; +import io.airbyte.server.handlers.helpers.CatalogConverter; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; import jakarta.inject.Inject; @@ -261,6 +265,17 @@ public void deleteSource(final SourceRead source) spec); } + public DiscoverCatalogResult writeDiscoverCatalogResult(final SourceDiscoverSchemaWriteRequestBody request) + throws JsonValidationException, IOException { + final AirbyteCatalog persistenceCatalog = CatalogConverter.toProtocol(request.getCatalog()); + UUID catalogId = configRepository.writeActorCatalogFetchEvent( + persistenceCatalog, + request.getSourceId(), + request.getConnectorVersion(), + request.getConfigurationHash()); + return new DiscoverCatalogResult().catalogId(catalogId); + } + private SourceRead buildSourceRead(final UUID sourceId) throws ConfigNotFoundException, IOException, JsonValidationException { // read configuration from db diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 99303a29f615..e45c35686704 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -546,7 +546,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne Jsons.object(mostRecentActorCatalog.get().getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class); final CatalogDiff catalogDiff = connectionsHandler.getDiff(newAirbyteCatalog, CatalogConverter.toApi(mostRecentAirbyteCatalog), - CatalogConverter.toProtocol(newAirbyteCatalog)); + CatalogConverter.toConfiguredProtocol(newAirbyteCatalog)); breakingChange = containsBreakingChange(catalogDiff); } } @@ -602,7 +602,7 @@ private void resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendCon CatalogConverter.getFieldSelectionData(oldConnectionRead.getSyncCatalog())); final AirbyteCatalog upToDateAirbyteCatalog = updatedConnectionRead.getSyncCatalog(); final CatalogDiff catalogDiff = - connectionsHandler.getDiff(apiExistingCatalog, upToDateAirbyteCatalog, CatalogConverter.toProtocol(upToDateAirbyteCatalog)); + connectionsHandler.getDiff(apiExistingCatalog, upToDateAirbyteCatalog, CatalogConverter.toConfiguredProtocol(upToDateAirbyteCatalog)); final List apiStreamsToReset = getStreamsToReset(catalogDiff); final Set changedConfigStreamDescriptors = connectionsHandler.getConfigurationDiff(apiExistingCatalog, upToDateAirbyteCatalog); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java index 8864c1b8130f..6cf626f381b4 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/helpers/CatalogConverter.java @@ -48,7 +48,7 @@ private static io.airbyte.api.model.generated.AirbyteStream toApi(final io.airby } @SuppressWarnings("PMD.AvoidLiteralsInIfCondition") - private static io.airbyte.protocol.models.AirbyteStream toProtocol(final AirbyteStream stream, AirbyteStreamConfiguration config) + private static io.airbyte.protocol.models.AirbyteStream toConfiguredProtocol(final AirbyteStream stream, AirbyteStreamConfiguration config) throws JsonValidationException { if (config.getFieldSelectionEnabled() != null && config.getFieldSelectionEnabled()) { // Validate the selected field paths. @@ -173,7 +173,8 @@ private static Boolean getStreamHasFieldSelectionEnabled(FieldSelectionData fiel /** * Converts the API catalog model into a protocol catalog. Note: returns all streams, regardless of * selected status. See - * {@link CatalogConverter#toProtocol(AirbyteStream, AirbyteStreamConfiguration)} for context. + * {@link CatalogConverter#toConfiguredProtocol(AirbyteStream, AirbyteStreamConfiguration)} for + * context. * * @param catalog api catalog * @return protocol catalog @@ -183,7 +184,35 @@ public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocolKeep throws JsonValidationException { final AirbyteCatalog clone = Jsons.clone(catalog); clone.getStreams().forEach(stream -> stream.getConfig().setSelected(true)); - return toProtocol(clone); + return toConfiguredProtocol(clone); + } + + /** + * To convert AirbyteCatalog from APIs to model. This is to differentiate between + * toConfiguredProtocol as the other one converts to ConfiguredAirbyteCatalog object instead. + */ + public static io.airbyte.protocol.models.AirbyteCatalog toProtocol( + final io.airbyte.api.model.generated.AirbyteCatalog catalog) + throws JsonValidationException { + final ArrayList errors = new ArrayList<>(); + + io.airbyte.protocol.models.AirbyteCatalog protoCatalog = + new io.airbyte.protocol.models.AirbyteCatalog(); + var airbyteStream = catalog.getStreams().stream().map(stream -> { + try { + return toConfiguredProtocol(stream.getStream(), stream.getConfig()); + } catch (JsonValidationException e) { + LOGGER.error("Error parsing catalog: {}", e); + errors.add(e); + return null; + } + }).collect(Collectors.toList()); + + if (!errors.isEmpty()) { + throw errors.get(0); + } + protoCatalog.withStreams(airbyteStream); + return protoCatalog; } /** @@ -196,7 +225,7 @@ public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocolKeep * @param catalog api catalog * @return protocol catalog */ - public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocol(final io.airbyte.api.model.generated.AirbyteCatalog catalog) + public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toConfiguredProtocol(final io.airbyte.api.model.generated.AirbyteCatalog catalog) throws JsonValidationException { final ArrayList errors = new ArrayList<>(); final List streams = catalog.getStreams() @@ -205,7 +234,7 @@ public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocol(fin .map(s -> { try { return new io.airbyte.protocol.models.ConfiguredAirbyteStream() - .withStream(toProtocol(s.getStream(), s.getConfig())) + .withStream(toConfiguredProtocol(s.getStream(), s.getConfig())) .withSyncMode(Enums.convertTo(s.getConfig().getSyncMode(), io.airbyte.protocol.models.SyncMode.class)) .withCursorField(s.getConfig().getCursorField()) .withDestinationSyncMode(Enums.convertTo(s.getConfig().getDestinationSyncMode(), diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/CatalogConverterTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/CatalogConverterTest.java index 59602b2e7263..d9d41bda7ec4 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/CatalogConverterTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/converters/CatalogConverterTest.java @@ -28,7 +28,8 @@ class CatalogConverterTest { @Test void testConvertToProtocol() throws JsonValidationException { - assertEquals(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog(), CatalogConverter.toProtocol(ConnectionHelpers.generateBasicApiCatalog())); + assertEquals(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog(), + CatalogConverter.toConfiguredProtocol(ConnectionHelpers.generateBasicApiCatalog())); } @Test @@ -49,7 +50,7 @@ void testConvertToProtocolColumnSelectionValidation() { // fieldSelectionEnabled=true but selectedFields=null. final var catalog = ConnectionHelpers.generateBasicApiCatalog(); catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).selectedFields(null); - CatalogConverter.toProtocol(catalog); + CatalogConverter.toConfiguredProtocol(catalog); }); assertThrows(JsonValidationException.class, () -> { @@ -57,14 +58,14 @@ void testConvertToProtocolColumnSelectionValidation() { final var catalog = ConnectionHelpers.generateBasicApiCatalog(); ((ObjectNode) catalog.getStreams().get(0).getStream().getJsonSchema()).remove("properties"); catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem("foo")); - CatalogConverter.toProtocol(catalog); + CatalogConverter.toConfiguredProtocol(catalog); }); assertThrows(JsonValidationException.class, () -> { // SelectedFieldInfo with empty path. final var catalog = ConnectionHelpers.generateBasicApiCatalog(); catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo()); - CatalogConverter.toProtocol(catalog); + CatalogConverter.toConfiguredProtocol(catalog); }); assertThrows(UnsupportedOperationException.class, () -> { @@ -72,14 +73,14 @@ void testConvertToProtocolColumnSelectionValidation() { final var catalog = ConnectionHelpers.generateBasicApiCatalog(); catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true) .addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem("foo").addFieldPathItem("bar")); - CatalogConverter.toProtocol(catalog); + CatalogConverter.toConfiguredProtocol(catalog); }); assertThrows(JsonValidationException.class, () -> { // SelectedFieldInfo with empty path. final var catalog = ConnectionHelpers.generateBasicApiCatalog(); catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem("foo")); - CatalogConverter.toProtocol(catalog); + CatalogConverter.toConfiguredProtocol(catalog); }); assertThrows(JsonValidationException.class, () -> { @@ -88,7 +89,7 @@ void testConvertToProtocolColumnSelectionValidation() { catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME)); // The sync mode is INCREMENTAL and SECOND_FIELD_NAME is a cursor field. catalog.getStreams().get(0).getConfig().syncMode(SyncMode.INCREMENTAL).cursorField(List.of(SECOND_FIELD_NAME)); - CatalogConverter.toProtocol(catalog); + CatalogConverter.toConfiguredProtocol(catalog); }); assertDoesNotThrow(() -> { @@ -97,7 +98,7 @@ void testConvertToProtocolColumnSelectionValidation() { catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME)); // The cursor field is not selected, but it's okay because it's FULL_REFRESH so it doesn't throw. catalog.getStreams().get(0).getConfig().syncMode(SyncMode.FULL_REFRESH).cursorField(List.of(SECOND_FIELD_NAME)); - CatalogConverter.toProtocol(catalog); + CatalogConverter.toConfiguredProtocol(catalog); }); assertThrows(JsonValidationException.class, () -> { @@ -106,7 +107,7 @@ void testConvertToProtocolColumnSelectionValidation() { catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME)); // The destination sync mode is DEDUP and SECOND_FIELD_NAME is a primary key. catalog.getStreams().get(0).getConfig().destinationSyncMode(DestinationSyncMode.APPEND_DEDUP).primaryKey(List.of(List.of(SECOND_FIELD_NAME))); - CatalogConverter.toProtocol(catalog); + CatalogConverter.toConfiguredProtocol(catalog); }); assertDoesNotThrow(() -> { @@ -115,7 +116,7 @@ void testConvertToProtocolColumnSelectionValidation() { catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME)); // The primary key is not selected but that's okay because the destination sync mode is OVERWRITE. catalog.getStreams().get(0).getConfig().destinationSyncMode(DestinationSyncMode.OVERWRITE).primaryKey(List.of(List.of(SECOND_FIELD_NAME))); - CatalogConverter.toProtocol(catalog); + CatalogConverter.toConfiguredProtocol(catalog); }); } @@ -123,7 +124,7 @@ void testConvertToProtocolColumnSelectionValidation() { void testConvertToProtocolFieldSelection() throws JsonValidationException { final var catalog = ConnectionHelpers.generateApiCatalogWithTwoFields(); catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME)); - assertEquals(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog(), CatalogConverter.toProtocol(catalog)); + assertEquals(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog(), CatalogConverter.toConfiguredProtocol(catalog)); } } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java index 758bad37f909..61a27553bea3 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceHandlerTest.java @@ -15,11 +15,13 @@ import com.google.common.collect.Lists; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.ConnectionReadList; +import io.airbyte.api.model.generated.DiscoverCatalogResult; import io.airbyte.api.model.generated.SourceCloneConfiguration; import io.airbyte.api.model.generated.SourceCloneRequestBody; import io.airbyte.api.model.generated.SourceCreate; import io.airbyte.api.model.generated.SourceDefinitionIdRequestBody; import io.airbyte.api.model.generated.SourceDefinitionSpecificationRead; +import io.airbyte.api.model.generated.SourceDiscoverSchemaWriteRequestBody; import io.airbyte.api.model.generated.SourceIdRequestBody; import io.airbyte.api.model.generated.SourceRead; import io.airbyte.api.model.generated.SourceReadList; @@ -36,8 +38,13 @@ import io.airbyte.config.persistence.SecretsRepositoryWriter; import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; import io.airbyte.persistence.job.factory.OAuthConfigSupplier; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.server.converters.ConfigurationUpdate; +import io.airbyte.server.handlers.helpers.CatalogConverter; import io.airbyte.server.helpers.ConnectionHelpers; import io.airbyte.server.helpers.ConnectorSpecificationHelpers; import io.airbyte.server.helpers.SourceHelpers; @@ -67,6 +74,11 @@ class SourceHandlerTest { private ConnectorSpecification connectorSpecification; private OAuthConfigSupplier oAuthConfigSupplier; + private static final String SHOES = "shoes"; + private static final String SKU = "sku"; + private static final AirbyteCatalog airbyteCatalog = CatalogHelpers.createAirbyteCatalog(SHOES, + Field.of(SKU, JsonSchemaType.STRING)); + // needs to match name of file in src/test/resources/icons private static final String ICON = "test-source.svg"; @@ -369,4 +381,20 @@ void testDeleteSource() throws JsonValidationException, ConfigNotFoundException, verify(connectionsHandler).deleteConnection(connectionRead.getConnectionId()); } + @Test + void testWriteDiscoverCatalogResult() throws JsonValidationException, IOException { + UUID actorId = UUID.randomUUID(); + UUID catalogId = UUID.randomUUID(); + String connectorVersion = "0.0.1"; + String hashValue = "0123456789abcd"; + SourceDiscoverSchemaWriteRequestBody request = new SourceDiscoverSchemaWriteRequestBody().catalog( + CatalogConverter.toApi(airbyteCatalog)).sourceId(actorId).connectorVersion(connectorVersion).configurationHash(hashValue); + + when(configRepository.writeActorCatalogFetchEvent(airbyteCatalog, actorId, connectorVersion, hashValue)).thenReturn(catalogId); + DiscoverCatalogResult result = sourceHandler.writeDiscoverCatalogResult(request); + + verify(configRepository).writeActorCatalogFetchEvent(airbyteCatalog, actorId, connectorVersion, hashValue); + assert (result.getCatalogId()).equals(catalogId); + } + } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 0103d8f0f5fd..abf564dee15a 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -1016,7 +1016,7 @@ void testUpdateConnectionNoStreamsToReset() throws JsonValidationException, Conf final ConnectionIdRequestBody connectionId = new ConnectionIdRequestBody().connectionId(result.getConnectionId()); verify(connectionsHandler).getDiff(expected.getSyncCatalog(), expectedWithNewSchema.getSyncCatalog(), - CatalogConverter.toProtocol(result.getSyncCatalog())); + CatalogConverter.toConfiguredProtocol(result.getSyncCatalog())); verify(connectionsHandler).getConfigurationDiff(expected.getSyncCatalog(), expectedWithNewSchema.getSyncCatalog()); verify(schedulerHandler, times(0)).resetConnection(connectionId); verify(schedulerHandler, times(0)).syncConnection(connectionId); diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 7489b2e65eef..147055182ed2 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -274,6 +274,7 @@

Internal

  • post /v1/jobs/get_normalization_status
  • post /v1/attempt/save_stats
  • post /v1/attempt/set_workflow_in_attempt
  • +
  • post /v1/sources/write_discover_catalog_result
  • Jobs

    SourceDefinition