Skip to content

Commit

Permalink
Add write discover fetch event API (#21506)
Browse files Browse the repository at this point in the history
* api changes for writing discover catalog

* api changes

* format

* change return type of the API to return catalogId

* PR comments fix

* format
  • Loading branch information
xiaohansong authored Jan 23, 2023
1 parent b7d5be9 commit 0837781
Show file tree
Hide file tree
Showing 11 changed files with 272 additions and 22 deletions.
44 changes: 44 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,27 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"

/v1/sources/write_discover_catalog_result:
post:
tags:
- source
- internal
summary: Should only called from worker, to write result from discover activity back to DB.
operationId: writeDiscoverCatalogResult
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SourceDiscoverSchemaWriteRequestBody"
required: true
responses:
"200":
description: Successful Operation
content:
application/json:
schema:
$ref: "#/components/schemas/DiscoverCatalogResult"
/v1/destination_definitions/update:
post:
tags:
Expand Down Expand Up @@ -2812,6 +2833,21 @@ components:
type: array
items:
$ref: "#/components/schemas/SourceRead"
SourceDiscoverSchemaWriteRequestBody:
description: to write this requested object to database.
type: object
required:
- catalog
properties:
catalog:
$ref: "#/components/schemas/AirbyteCatalog"
sourceId:
$ref: "#/components/schemas/SourceId"
connectorVersion:
type: string
configurationHash:
type: string

SourceDiscoverSchemaRead:
description: Returns the results of a discover catalog job. If the job was not successful, the catalog field will not be present. jobInfo will aways be present and its status be used to determine if the job was successful or not.
type: object
Expand Down Expand Up @@ -4993,6 +5029,14 @@ components:
properties:
succeeded:
type: boolean
DiscoverCatalogResult:
type: object
required:
- catalogId
properties:
catalogId:
type: string
format: uuid
AttemptNormalizationStatusReadList:
type: object
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import io.airbyte.api.generated.SourceApi;
import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt;
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.DiscoverCatalogResult;
import io.airbyte.api.model.generated.SourceCloneRequestBody;
import io.airbyte.api.model.generated.SourceCreate;
import io.airbyte.api.model.generated.SourceDiscoverSchemaRead;
import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.generated.SourceDiscoverSchemaWriteRequestBody;
import io.airbyte.api.model.generated.SourceIdRequestBody;
import io.airbyte.api.model.generated.SourceRead;
import io.airbyte.api.model.generated.SourceReadList;
Expand Down Expand Up @@ -123,4 +125,10 @@ public SourceRead updateSource(final SourceUpdate sourceUpdate) {
return ApiHelper.execute(() -> sourceHandler.updateSource(sourceUpdate));
}

@Post("/write_discover_catalog_result")
@Override
public DiscoverCatalogResult writeDiscoverCatalogResult(final SourceDiscoverSchemaWriteRequestBody request) {
return ApiHelper.execute(() -> sourceHandler.writeDiscoverCatalogResult(request));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate)

// TODO Undesirable behavior: sending a null configured catalog should not be valid?
if (connectionCreate.getSyncCatalog() != null) {
standardSync.withCatalog(CatalogConverter.toProtocol(connectionCreate.getSyncCatalog()));
standardSync.withCatalog(CatalogConverter.toConfiguredProtocol(connectionCreate.getSyncCatalog()));
standardSync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(connectionCreate.getSyncCatalog()));
} else {
standardSync.withCatalog(new ConfiguredAirbyteCatalog().withStreams(Collections.emptyList()));
Expand Down Expand Up @@ -318,7 +318,7 @@ private static void applyPatchToStandardSync(final StandardSync sync, final Conn
// in the patch. Otherwise, leave the field unchanged.

if (patch.getSyncCatalog() != null) {
sync.setCatalog(CatalogConverter.toProtocol(patch.getSyncCatalog()));
sync.setCatalog(CatalogConverter.toConfiguredProtocol(patch.getSyncCatalog()));
sync.withFieldSelectionData(CatalogConverter.getFieldSelectionData(patch.getSyncCatalog()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ private void generateCatalogDiffsAndDisableConnectionsIfNeeded(final SourceDisco
connectionRead.getSyncCatalog();
final CatalogDiff diff =
connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(currentAirbyteCatalog), discoveredSchema.getCatalog(),
CatalogConverter.toProtocol(currentAirbyteCatalog));
CatalogConverter.toConfiguredProtocol(currentAirbyteCatalog));
final boolean containsBreakingChange = containsBreakingChange(diff);
final ConnectionUpdate updateObject =
new ConnectionUpdate().breakingChange(containsBreakingChange).connectionId(connectionRead.getConnectionId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
import com.google.common.collect.Lists;
import io.airbyte.api.model.generated.ActorCatalogWithUpdatedAt;
import io.airbyte.api.model.generated.ConnectionRead;
import io.airbyte.api.model.generated.DiscoverCatalogResult;
import io.airbyte.api.model.generated.SourceCloneConfiguration;
import io.airbyte.api.model.generated.SourceCloneRequestBody;
import io.airbyte.api.model.generated.SourceCreate;
import io.airbyte.api.model.generated.SourceDefinitionIdRequestBody;
import io.airbyte.api.model.generated.SourceDiscoverSchemaWriteRequestBody;
import io.airbyte.api.model.generated.SourceIdRequestBody;
import io.airbyte.api.model.generated.SourceRead;
import io.airbyte.api.model.generated.SourceReadList;
Expand All @@ -27,8 +29,10 @@
import io.airbyte.config.persistence.SecretsRepositoryWriter;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.persistence.job.factory.OAuthConfigSupplier;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.handlers.helpers.CatalogConverter;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -261,6 +265,17 @@ public void deleteSource(final SourceRead source)
spec);
}

public DiscoverCatalogResult writeDiscoverCatalogResult(final SourceDiscoverSchemaWriteRequestBody request)
throws JsonValidationException, IOException {
final AirbyteCatalog persistenceCatalog = CatalogConverter.toProtocol(request.getCatalog());
UUID catalogId = configRepository.writeActorCatalogFetchEvent(
persistenceCatalog,
request.getSourceId(),
request.getConnectorVersion(),
request.getConfigurationHash());
return new DiscoverCatalogResult().catalogId(catalogId);
}

private SourceRead buildSourceRead(final UUID sourceId)
throws ConfigNotFoundException, IOException, JsonValidationException {
// read configuration from db
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
Jsons.object(mostRecentActorCatalog.get().getCatalog(), io.airbyte.protocol.models.AirbyteCatalog.class);
final CatalogDiff catalogDiff =
connectionsHandler.getDiff(newAirbyteCatalog, CatalogConverter.toApi(mostRecentAirbyteCatalog),
CatalogConverter.toProtocol(newAirbyteCatalog));
CatalogConverter.toConfiguredProtocol(newAirbyteCatalog));
breakingChange = containsBreakingChange(catalogDiff);
}
}
Expand Down Expand Up @@ -602,7 +602,7 @@ private void resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendCon
CatalogConverter.getFieldSelectionData(oldConnectionRead.getSyncCatalog()));
final AirbyteCatalog upToDateAirbyteCatalog = updatedConnectionRead.getSyncCatalog();
final CatalogDiff catalogDiff =
connectionsHandler.getDiff(apiExistingCatalog, upToDateAirbyteCatalog, CatalogConverter.toProtocol(upToDateAirbyteCatalog));
connectionsHandler.getDiff(apiExistingCatalog, upToDateAirbyteCatalog, CatalogConverter.toConfiguredProtocol(upToDateAirbyteCatalog));
final List<StreamDescriptor> apiStreamsToReset = getStreamsToReset(catalogDiff);
final Set<StreamDescriptor> changedConfigStreamDescriptors =
connectionsHandler.getConfigurationDiff(apiExistingCatalog, upToDateAirbyteCatalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private static io.airbyte.api.model.generated.AirbyteStream toApi(final io.airby
}

@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
private static io.airbyte.protocol.models.AirbyteStream toProtocol(final AirbyteStream stream, AirbyteStreamConfiguration config)
private static io.airbyte.protocol.models.AirbyteStream toConfiguredProtocol(final AirbyteStream stream, AirbyteStreamConfiguration config)
throws JsonValidationException {
if (config.getFieldSelectionEnabled() != null && config.getFieldSelectionEnabled()) {
// Validate the selected field paths.
Expand Down Expand Up @@ -173,7 +173,8 @@ private static Boolean getStreamHasFieldSelectionEnabled(FieldSelectionData fiel
/**
* Converts the API catalog model into a protocol catalog. Note: returns all streams, regardless of
* selected status. See
* {@link CatalogConverter#toProtocol(AirbyteStream, AirbyteStreamConfiguration)} for context.
* {@link CatalogConverter#toConfiguredProtocol(AirbyteStream, AirbyteStreamConfiguration)} for
* context.
*
* @param catalog api catalog
* @return protocol catalog
Expand All @@ -183,7 +184,35 @@ public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocolKeep
throws JsonValidationException {
final AirbyteCatalog clone = Jsons.clone(catalog);
clone.getStreams().forEach(stream -> stream.getConfig().setSelected(true));
return toProtocol(clone);
return toConfiguredProtocol(clone);
}

/**
* To convert AirbyteCatalog from APIs to model. This is to differentiate between
* toConfiguredProtocol as the other one converts to ConfiguredAirbyteCatalog object instead.
*/
public static io.airbyte.protocol.models.AirbyteCatalog toProtocol(
final io.airbyte.api.model.generated.AirbyteCatalog catalog)
throws JsonValidationException {
final ArrayList<JsonValidationException> errors = new ArrayList<>();

io.airbyte.protocol.models.AirbyteCatalog protoCatalog =
new io.airbyte.protocol.models.AirbyteCatalog();
var airbyteStream = catalog.getStreams().stream().map(stream -> {
try {
return toConfiguredProtocol(stream.getStream(), stream.getConfig());
} catch (JsonValidationException e) {
LOGGER.error("Error parsing catalog: {}", e);
errors.add(e);
return null;
}
}).collect(Collectors.toList());

if (!errors.isEmpty()) {
throw errors.get(0);
}
protoCatalog.withStreams(airbyteStream);
return protoCatalog;
}

/**
Expand All @@ -196,7 +225,7 @@ public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocolKeep
* @param catalog api catalog
* @return protocol catalog
*/
public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocol(final io.airbyte.api.model.generated.AirbyteCatalog catalog)
public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toConfiguredProtocol(final io.airbyte.api.model.generated.AirbyteCatalog catalog)
throws JsonValidationException {
final ArrayList<JsonValidationException> errors = new ArrayList<>();
final List<io.airbyte.protocol.models.ConfiguredAirbyteStream> streams = catalog.getStreams()
Expand All @@ -205,7 +234,7 @@ public static io.airbyte.protocol.models.ConfiguredAirbyteCatalog toProtocol(fin
.map(s -> {
try {
return new io.airbyte.protocol.models.ConfiguredAirbyteStream()
.withStream(toProtocol(s.getStream(), s.getConfig()))
.withStream(toConfiguredProtocol(s.getStream(), s.getConfig()))
.withSyncMode(Enums.convertTo(s.getConfig().getSyncMode(), io.airbyte.protocol.models.SyncMode.class))
.withCursorField(s.getConfig().getCursorField())
.withDestinationSyncMode(Enums.convertTo(s.getConfig().getDestinationSyncMode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class CatalogConverterTest {

@Test
void testConvertToProtocol() throws JsonValidationException {
assertEquals(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog(), CatalogConverter.toProtocol(ConnectionHelpers.generateBasicApiCatalog()));
assertEquals(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog(),
CatalogConverter.toConfiguredProtocol(ConnectionHelpers.generateBasicApiCatalog()));
}

@Test
Expand All @@ -49,37 +50,37 @@ void testConvertToProtocolColumnSelectionValidation() {
// fieldSelectionEnabled=true but selectedFields=null.
final var catalog = ConnectionHelpers.generateBasicApiCatalog();
catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).selectedFields(null);
CatalogConverter.toProtocol(catalog);
CatalogConverter.toConfiguredProtocol(catalog);
});

assertThrows(JsonValidationException.class, () -> {
// JSON schema has no `properties` node.
final var catalog = ConnectionHelpers.generateBasicApiCatalog();
((ObjectNode) catalog.getStreams().get(0).getStream().getJsonSchema()).remove("properties");
catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem("foo"));
CatalogConverter.toProtocol(catalog);
CatalogConverter.toConfiguredProtocol(catalog);
});

assertThrows(JsonValidationException.class, () -> {
// SelectedFieldInfo with empty path.
final var catalog = ConnectionHelpers.generateBasicApiCatalog();
catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo());
CatalogConverter.toProtocol(catalog);
CatalogConverter.toConfiguredProtocol(catalog);
});

assertThrows(UnsupportedOperationException.class, () -> {
// SelectedFieldInfo with nested field path.
final var catalog = ConnectionHelpers.generateBasicApiCatalog();
catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true)
.addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem("foo").addFieldPathItem("bar"));
CatalogConverter.toProtocol(catalog);
CatalogConverter.toConfiguredProtocol(catalog);
});

assertThrows(JsonValidationException.class, () -> {
// SelectedFieldInfo with empty path.
final var catalog = ConnectionHelpers.generateBasicApiCatalog();
catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem("foo"));
CatalogConverter.toProtocol(catalog);
CatalogConverter.toConfiguredProtocol(catalog);
});

assertThrows(JsonValidationException.class, () -> {
Expand All @@ -88,7 +89,7 @@ void testConvertToProtocolColumnSelectionValidation() {
catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME));
// The sync mode is INCREMENTAL and SECOND_FIELD_NAME is a cursor field.
catalog.getStreams().get(0).getConfig().syncMode(SyncMode.INCREMENTAL).cursorField(List.of(SECOND_FIELD_NAME));
CatalogConverter.toProtocol(catalog);
CatalogConverter.toConfiguredProtocol(catalog);
});

assertDoesNotThrow(() -> {
Expand All @@ -97,7 +98,7 @@ void testConvertToProtocolColumnSelectionValidation() {
catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME));
// The cursor field is not selected, but it's okay because it's FULL_REFRESH so it doesn't throw.
catalog.getStreams().get(0).getConfig().syncMode(SyncMode.FULL_REFRESH).cursorField(List.of(SECOND_FIELD_NAME));
CatalogConverter.toProtocol(catalog);
CatalogConverter.toConfiguredProtocol(catalog);
});

assertThrows(JsonValidationException.class, () -> {
Expand All @@ -106,7 +107,7 @@ void testConvertToProtocolColumnSelectionValidation() {
catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME));
// The destination sync mode is DEDUP and SECOND_FIELD_NAME is a primary key.
catalog.getStreams().get(0).getConfig().destinationSyncMode(DestinationSyncMode.APPEND_DEDUP).primaryKey(List.of(List.of(SECOND_FIELD_NAME)));
CatalogConverter.toProtocol(catalog);
CatalogConverter.toConfiguredProtocol(catalog);
});

assertDoesNotThrow(() -> {
Expand All @@ -115,15 +116,15 @@ void testConvertToProtocolColumnSelectionValidation() {
catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME));
// The primary key is not selected but that's okay because the destination sync mode is OVERWRITE.
catalog.getStreams().get(0).getConfig().destinationSyncMode(DestinationSyncMode.OVERWRITE).primaryKey(List.of(List.of(SECOND_FIELD_NAME)));
CatalogConverter.toProtocol(catalog);
CatalogConverter.toConfiguredProtocol(catalog);
});
}

@Test
void testConvertToProtocolFieldSelection() throws JsonValidationException {
final var catalog = ConnectionHelpers.generateApiCatalogWithTwoFields();
catalog.getStreams().get(0).getConfig().fieldSelectionEnabled(true).addSelectedFieldsItem(new SelectedFieldInfo().addFieldPathItem(FIELD_NAME));
assertEquals(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog(), CatalogConverter.toProtocol(catalog));
assertEquals(ConnectionHelpers.generateBasicConfiguredAirbyteCatalog(), CatalogConverter.toConfiguredProtocol(catalog));
}

}
Loading

0 comments on commit 0837781

Please sign in to comment.