Skip to content

Commit

Permalink
DiscoverSchema endpoint calculates diff and breaking change (#18571)
Browse files Browse the repository at this point in the history
* update discover schema endpoint to calculate diff
  • Loading branch information
alovew authored Nov 2, 2022
1 parent 35ceb67 commit d26e5bc
Show file tree
Hide file tree
Showing 10 changed files with 414 additions and 37 deletions.
9 changes: 9 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2822,6 +2822,9 @@ components:
properties:
sourceId:
$ref: "#/components/schemas/SourceId"
connectionId:
type: string
format: uuid
disable_cache:
type: boolean
SourceUpdate:
Expand Down Expand Up @@ -2883,6 +2886,10 @@ components:
catalogId:
type: string
format: uuid
catalogDiff:
$ref: "#/components/schemas/CatalogDiff"
breakingChange:
type: boolean
SourceSearch:
type: object
properties:
Expand Down Expand Up @@ -3375,6 +3382,8 @@ components:
type: boolean
nonBreakingChangesPreference:
$ref: "#/components/schemas/NonBreakingChangesPreference"
breakingChange:
type: boolean
WebBackendConnectionUpdate:
type: object
description: Used to apply a patch-style update to a connection, which means that null properties remain unchanged
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory,
jobPersistence,
configs.getWorkerEnvironment(),
configs.getLogConfigs(),
eventRunner);
eventRunner,
connectionsHandler);

final DbMigrationHandler dbMigrationHandler = new DbMigrationHandler(configsDatabase, configsFlyway, jobsDatabase, jobsFlyway);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ public ConfigurationApi(final ConfigRepository configRepository,

final WorkspaceHelper workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);

connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
eventRunner);

schedulerHandler = new SchedulerHandler(
configRepository,
secretsRepositoryReader,
Expand All @@ -182,14 +188,10 @@ public ConfigurationApi(final ConfigRepository configRepository,
jobPersistence,
workerEnvironment,
logConfigs,
eventRunner);
eventRunner,
connectionsHandler);

stateHandler = new StateHandler(statePersistence);
connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
trackingClient,
eventRunner);
sourceHandler = new SourceHandler(
configRepository,
secretsRepositoryReader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ private static void applyPatchToStandardSync(final StandardSync sync, final Conn
if (patch.getGeography() != null) {
sync.setGeography(ApiPojoConverters.toPersistenceGeography(patch.getGeography()));
}

if (patch.getBreakingChange() != null) {
sync.setBreakingChange(patch.getBreakingChange());
}
}

private void validateConnectionPatch(final WorkspaceHelper workspaceHelper, final StandardSync persistedSync, final ConnectionUpdate patch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
import com.google.common.hash.Hashing;
import io.airbyte.api.model.generated.AdvancedAuth;
import io.airbyte.api.model.generated.AuthSpecification;
import io.airbyte.api.model.generated.CatalogDiff;
import io.airbyte.api.model.generated.CheckConnectionRead;
import io.airbyte.api.model.generated.CheckConnectionRead.StatusEnum;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationCoreConfig;
import io.airbyte.api.model.generated.DestinationDefinitionIdWithWorkspaceId;
import io.airbyte.api.model.generated.DestinationDefinitionSpecificationRead;
import io.airbyte.api.model.generated.DestinationIdRequestBody;
import io.airbyte.api.model.generated.DestinationSyncMode;
import io.airbyte.api.model.generated.DestinationUpdate;
import io.airbyte.api.model.generated.FieldTransform;
import io.airbyte.api.model.generated.JobConfigType;
import io.airbyte.api.model.generated.JobIdRequestBody;
import io.airbyte.api.model.generated.JobInfoRead;
Expand All @@ -31,6 +34,8 @@
import io.airbyte.api.model.generated.SourceDiscoverSchemaRequestBody;
import io.airbyte.api.model.generated.SourceIdRequestBody;
import io.airbyte.api.model.generated.SourceUpdate;
import io.airbyte.api.model.generated.StreamTransform;
import io.airbyte.api.model.generated.StreamTransform.TransformTypeEnum;
import io.airbyte.api.model.generated.SynchronousJobRead;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.enums.Enums;
Expand Down Expand Up @@ -70,6 +75,7 @@
import java.util.ArrayList;
import java.util.Optional;
import java.util.UUID;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand All @@ -80,6 +86,7 @@ public class SchedulerHandler {
private static final ImmutableSet<ErrorCode> VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET =
ImmutableSet.of(ErrorCode.WORKFLOW_DELETED, ErrorCode.WORKFLOW_RUNNING);

private final ConnectionsHandler connectionsHandler;
private final ConfigRepository configRepository;
private final SecretsRepositoryWriter secretsRepositoryWriter;
private final SynchronousSchedulerClient synchronousSchedulerClient;
Expand All @@ -96,7 +103,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final EventRunner eventRunner) {
final EventRunner eventRunner,
final ConnectionsHandler connectionsHandler) {
this(
configRepository,
secretsRepositoryWriter,
Expand All @@ -105,7 +113,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
new JsonSchemaValidator(),
jobPersistence,
eventRunner,
new JobConverter(workerEnvironment, logConfigs));
new JobConverter(workerEnvironment, logConfigs),
connectionsHandler);
}

@VisibleForTesting
Expand All @@ -116,7 +125,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final JsonSchemaValidator jsonSchemaValidator,
final JobPersistence jobPersistence,
final EventRunner eventRunner,
final JobConverter jobConverter) {
final JobConverter jobConverter,
final ConnectionsHandler connectionsHandler) {
this.configRepository = configRepository;
this.secretsRepositoryWriter = secretsRepositoryWriter;
this.synchronousSchedulerClient = synchronousSchedulerClient;
Expand All @@ -125,6 +135,7 @@ public SchedulerHandler(final ConfigRepository configRepository,
this.jobPersistence = jobPersistence;
this.eventRunner = eventRunner;
this.jobConverter = jobConverter;
this.connectionsHandler = connectionsHandler;
}

public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody)
Expand Down Expand Up @@ -230,7 +241,13 @@ public SourceDiscoverSchemaRead discoverSchemaForSourceFromSourceId(final Source
if (currentCatalog.isEmpty() || bustActorCatalogCache) {
final SynchronousResponse<UUID> persistedCatalogId =
synchronousSchedulerClient.createDiscoverSchemaJob(source, imageName, connectorVersion, new Version(sourceDef.getProtocolVersion()));
return retrieveDiscoveredSchema(persistedCatalogId);
final SourceDiscoverSchemaRead discoveredSchema = retrieveDiscoveredSchema(persistedCatalogId);

if (discoverSchemaRequestBody.getConnectionId() != null) {
discoveredSchemaWithCatalogDiff(discoveredSchema, discoverSchemaRequestBody);
}

return discoveredSchema;
}
final AirbyteCatalog airbyteCatalog = Jsons.object(currentCatalog.get().getCatalog(), AirbyteCatalog.class);
final SynchronousJobRead emptyJob = new SynchronousJobRead()
Expand Down Expand Up @@ -339,6 +356,22 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE
return submitCancellationToWorker(jobIdRequestBody.getId());
}

/**
 * Enriches a freshly discovered schema with the catalog diff for the requested connection and
 * records on that connection whether the diff contains a breaking change.
 *
 * <p>
 * The diff baseline is the catalog returned by
 * {@code connectionsHandler.getConnectionAirbyteCatalog} for the connection; when that is absent,
 * the connection's current sync catalog is used instead. The resulting diff and breaking-change
 * flag are set on {@code discoveredSchema} (the return value of the fluent setters is discarded,
 * so the caller observes them through the same instance), and the flag is also written to the
 * connection through {@code connectionsHandler.updateConnection}.
 *
 * @param discoveredSchema discovery result to attach the diff and breaking-change flag to
 * @param discoverSchemaRequestBody request whose connection id selects the connection to diff
 *        against; the visible caller only invokes this when the connection id is non-null
 * @throws JsonValidationException propagated from the connections handler
 * @throws ConfigNotFoundException propagated from the connections handler
 * @throws IOException propagated from the connections handler
 */
private void discoveredSchemaWithCatalogDiff(SourceDiscoverSchemaRead discoveredSchema, SourceDiscoverSchemaRequestBody discoverSchemaRequestBody)
    throws JsonValidationException, ConfigNotFoundException, IOException {
  final UUID connectionId = discoverSchemaRequestBody.getConnectionId();

  // Catalog recorded for the connection, if any; looked up before the connection itself.
  final Optional<io.airbyte.api.model.generated.AirbyteCatalog> recordedCatalog =
      connectionsHandler.getConnectionAirbyteCatalog(connectionId);
  final io.airbyte.api.model.generated.@NotNull AirbyteCatalog configuredCatalog =
      connectionsHandler.getConnection(connectionId).getSyncCatalog();

  // Fall back to the connection's own sync catalog when no recorded catalog is available.
  final io.airbyte.api.model.generated.AirbyteCatalog baselineCatalog = recordedCatalog.orElse(configuredCatalog);
  final CatalogDiff catalogDiff = connectionsHandler.getDiff(
      baselineCatalog,
      discoveredSchema.getCatalog(),
      CatalogConverter.toProtocol(configuredCatalog));
  final boolean hasBreakingChange = containsBreakingChange(catalogDiff);

  // Persist the flag on the connection with a patch that carries only the id and the flag.
  final ConnectionUpdate breakingChangePatch = new ConnectionUpdate()
      .breakingChange(hasBreakingChange)
      .connectionId(connectionId);
  connectionsHandler.updateConnection(breakingChangePatch);

  discoveredSchema.catalogDiff(catalogDiff).breakingChange(hasBreakingChange);
}

private CheckConnectionRead reportConnectionStatus(final SynchronousResponse<StandardCheckConnectionOutput> response) {
final CheckConnectionRead checkConnectionRead = new CheckConnectionRead()
.jobInfo(jobConverter.getSynchronousJobRead(response));
Expand Down Expand Up @@ -409,4 +442,19 @@ private JobInfoRead readJobFromResult(final ManualOperationResult manualOperatio
return jobConverter.getJobInfoRead(job);
}

/**
 * Reports whether a catalog diff contains at least one breaking field change.
 *
 * <p>
 * Only stream transforms of type {@code UPDATE_STREAM} are inspected; every other transform type
 * is skipped. A stream counts as breaking when any of its field transforms has its breaking flag
 * set to {@code true}.
 *
 * @param diff catalog diff whose stream transforms are scanned
 * @return {@code true} if any inspected field transform is flagged as breaking, {@code false}
 *         otherwise (including when the diff has no transforms)
 */
private boolean containsBreakingChange(final CatalogDiff diff) {
  for (final StreamTransform streamTransform : diff.getTransforms()) {
    final boolean isStreamUpdate = streamTransform.getTransformType() == TransformTypeEnum.UPDATE_STREAM;
    // An update transform without a field list has nothing that could be breaking; skipping it
    // also avoids dereferencing a null list.
    if (!isStreamUpdate || streamTransform.getUpdateStream() == null) {
      continue;
    }

    // Compare against Boolean.TRUE instead of unboxing the getter's result: an unset (null)
    // breaking flag is then treated as "not breaking" rather than raising a NullPointerException.
    final boolean anyBreakingFieldTransforms = streamTransform.getUpdateStream().stream()
        .anyMatch(fieldTransform -> Boolean.TRUE.equals(fieldTransform.getBreaking()));
    if (anyBreakingFieldTransforms) {
      return true;
    }
  }

  return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
*/
final Optional<SourceDiscoverSchemaRead> refreshedCatalog;
if (MoreBooleans.isTruthy(webBackendConnectionRequestBody.getWithRefreshedCatalog())) {
refreshedCatalog = getRefreshedSchema(connection.getSourceId());
refreshedCatalog = getRefreshedSchema(connection.getSourceId(), connection.getConnectionId());
} else {
refreshedCatalog = Optional.empty();
}
Expand All @@ -351,9 +351,8 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
* but was present at time of configuration will appear in the diff as an added stream which is
* confusing. We need to figure out why source_catalog_id is not always populated in the db.
*/
diff = connectionsHandler.getDiff(catalogUsedToMakeConfiguredCatalog.orElse(configuredCatalog), refreshedCatalog.get().getCatalog(),
CatalogConverter.toProtocol(configuredCatalog));

diff = refreshedCatalog.get().getCatalogDiff();
connection.setBreakingChange(refreshedCatalog.get().getBreakingChange());
} else if (catalogUsedToMakeConfiguredCatalog.isPresent()) {
// reconstructs a full picture of the full schema at the time the catalog was configured.
syncCatalog = updateSchemaWithDiscovery(configuredCatalog, catalogUsedToMakeConfiguredCatalog.get());
Expand All @@ -370,12 +369,14 @@ public WebBackendConnectionRead webBackendGetConnection(final WebBackendConnecti
return buildWebBackendConnectionRead(connection, currentSourceCatalogId).catalogDiff(diff);
}

private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceId)
private Optional<SourceDiscoverSchemaRead> getRefreshedSchema(final UUID sourceId, final UUID connectionId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final SourceDiscoverSchemaRequestBody discoverSchemaReadReq = new SourceDiscoverSchemaRequestBody()
.sourceId(sourceId)
.disableCache(true);
return Optional.ofNullable(schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq));
.disableCache(true)
.connectionId(connectionId);
SourceDiscoverSchemaRead schemaRead = schedulerHandler.discoverSchemaForSourceFromSourceId(discoverSchemaReadReq);
return Optional.ofNullable(schemaRead);
}

/**
Expand Down Expand Up @@ -480,10 +481,7 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
connectionRead = connectionsHandler.updateConnection(connectionPatch);

// detect if any streams need to be reset based on the patch and initial catalog, if so, reset them
// and fetch
// an up-to-date connectionRead
connectionRead = resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, connectionRead);

resetStreamsIfNeeded(webBackendConnectionPatch, oldConfiguredCatalog, connectionRead);
/*
* This catalog represents the full catalog that was used to create the configured catalog. It will
* have all streams that were present at the time. It will have no configuration set.
Expand All @@ -504,9 +502,9 @@ public WebBackendConnectionRead webBackendUpdateConnection(final WebBackendConne
* Given a fully updated connection, check for a diff between the old catalog and the updated
* catalog to see if any streams need to be reset.
*/
private ConnectionRead resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendConnectionPatch,
final ConfiguredAirbyteCatalog oldConfiguredCatalog,
final ConnectionRead updatedConnectionRead)
private void resetStreamsIfNeeded(final WebBackendConnectionUpdate webBackendConnectionPatch,
final ConfiguredAirbyteCatalog oldConfiguredCatalog,
final ConnectionRead updatedConnectionRead)
throws IOException, JsonValidationException, ConfigNotFoundException {

final UUID connectionId = webBackendConnectionPatch.getConnectionId();
Expand Down Expand Up @@ -535,13 +533,8 @@ private ConnectionRead resetStreamsIfNeeded(final WebBackendConnectionUpdate web
eventRunner.resetConnection(
connectionId,
streamsToReset, true);

// return updated connectionRead after reset
return connectionsHandler.getConnection(connectionId);
}
}
// if no reset was necessary, return the connectionRead without changes
return updatedConnectionRead;
}

private List<UUID> createOperations(final WebBackendConnectionCreate webBackendConnectionCreate)
Expand Down
Loading

0 comments on commit d26e5bc

Please sign in to comment.