Skip to content

Commit

Permalink
Add on the fly catalog downgrades to v0 when needed
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Feb 1, 2023
1 parent 1745ddd commit 564a34a
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,102 @@ private static boolean hasV0DataType(final JsonNode schema) {
return false;
}

/**
 * Downgrades the stream schemas of a configured catalog from v1 to v0 data types, in place, but
 * only when the catalog is detected as containing v1 data types. A catalog that is not detected as
 * v1 (including a null catalog) is left untouched.
 *
 * @param configuredAirbyteCatalog catalog to migrate; mutated in place when a downgrade happens
 */
public static void downgradeSchemaIfNeeded(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
  final boolean needsDowngrade = containsV1DataTypes(configuredAirbyteCatalog);
  if (!needsDowngrade) {
    // Nothing looks like a v1 type declaration: keep the catalog as it is.
    return;
  }
  downgradeSchema(configuredAirbyteCatalog);
}

/**
 * Downgrades the stream schemas of a catalog from v1 to v0 data types, in place, but only when the
 * catalog is detected as containing v1 data types. A catalog that is not detected as v1 (including
 * a null catalog) is left untouched.
 *
 * @param airbyteCatalog catalog to migrate; mutated in place when a downgrade happens
 */
public static void downgradeSchemaIfNeeded(final AirbyteCatalog airbyteCatalog) {
  final boolean needsDowngrade = containsV1DataTypes(airbyteCatalog);
  if (!needsDowngrade) {
    // Nothing looks like a v1 type declaration: keep the catalog as it is.
    return;
  }
  downgradeSchema(airbyteCatalog);
}

/**
 * Unconditionally migrates, in place, the JSON schema of every stream of the configured catalog
 * from v1 to v0 by delegating to {@code SchemaMigrationV1.downgradeSchema}.
 *
 * @param configuredAirbyteCatalog catalog whose stream schemas are migrated; must not be null
 */
private static void downgradeSchema(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
  for (final var configuredStream : configuredAirbyteCatalog.getStreams()) {
    // The schema lives on the wrapped stream, not on the configured stream itself.
    final var streamSchema = configuredStream.getStream().getJsonSchema();
    SchemaMigrationV1.downgradeSchema(streamSchema);
  }
}

/**
 * Unconditionally migrates, in place, the JSON schema of every stream of the catalog from v1 to v0
 * by delegating to {@code SchemaMigrationV1.downgradeSchema}.
 *
 * @param airbyteCatalog catalog whose stream schemas are migrated; must not be null
 */
private static void downgradeSchema(final AirbyteCatalog airbyteCatalog) {
  for (final var catalogStream : airbyteCatalog.getStreams()) {
    final var streamSchema = catalogStream.getJsonSchema();
    SchemaMigrationV1.downgradeSchema(streamSchema);
  }
}

/**
 * Returns true if any stream of the configured catalog declares a v1 data type in its JSON schema.
 *
 * <p>
 * Every stream is considered, not only the first one: a catalog whose first stream carries no v1
 * type declaration can still hold v1 types in a later stream, and such a catalog must be reported
 * as v1 so that it gets downgraded. The scan stops at the first stream that matches.
 *
 * @param configuredAirbyteCatalog catalog to inspect, may be null
 * @return true when at least one stream schema contains a v1 data type; false for a null catalog,
 *         a null or empty stream list, or when no stream contains one
 */
private static boolean containsV1DataTypes(final ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
  if (configuredAirbyteCatalog == null || configuredAirbyteCatalog.getStreams() == null) {
    return false;
  }

  // Null entries are skipped here; a null wrapped stream or a missing schema is handled by
  // streamContainsV1DataTypes.
  return configuredAirbyteCatalog
      .getStreams()
      .stream()
      .anyMatch(configuredStream -> configuredStream != null
          && streamContainsV1DataTypes(configuredStream.getStream()));
}

/**
 * Returns true if any stream of the catalog declares a v1 data type in its JSON schema.
 *
 * <p>
 * Every stream is considered, not only the first one: a catalog whose first stream carries no v1
 * type declaration can still hold v1 types in a later stream, and such a catalog must be reported
 * as v1 so that it gets downgraded. The scan stops at the first stream that matches.
 *
 * @param airbyteCatalog catalog to inspect, may be null
 * @return true when at least one stream schema contains a v1 data type; false for a null catalog,
 *         a null or empty stream list, or when no stream contains one
 */
private static boolean containsV1DataTypes(final AirbyteCatalog airbyteCatalog) {
  if (airbyteCatalog == null || airbyteCatalog.getStreams() == null) {
    return false;
  }

  // streamContainsV1DataTypes already treats a null stream or a missing schema as "no v1 type".
  return airbyteCatalog
      .getStreams()
      .stream()
      .anyMatch(CatalogMigrationV1Helper::streamContainsV1DataTypes);
}

/**
 * Returns true if the JSON schema of the stream contains a v1 data type. A null stream, or a stream
 * without a JSON schema, is reported as not containing any.
 *
 * @param airbyteStream stream to inspect, may be null
 */
private static boolean streamContainsV1DataTypes(final AirbyteStream airbyteStream) {
  final boolean hasSchema = airbyteStream != null && airbyteStream.getJsonSchema() != null;
  return hasSchema && hasV1DataType(airbyteStream.getJsonSchema());
}

/**
 * Recursively searches the schema for a v1 data type node, i.e. a node accepted by
 * {@code SchemaMigrationV1.isPrimitiveReferenceTypeDeclaration}, and stops at the first one found.
 *
 * @param schema schema node to inspect
 * @return true if the node itself, or any subschema reachable from it, is a v1 data type
 */
private static boolean hasV1DataType(final JsonNode schema) {
  boolean found = SchemaMigrationV1.isPrimitiveReferenceTypeDeclaration(schema);
  if (!found) {
    // The node itself is not a v1 declaration: look into its subschemas.
    for (final JsonNode child : SchemaMigrations.findSubschemas(schema)) {
      if (hasV1DataType(child)) {
        found = true;
        break;
      }
    }
  }
  return found;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ static boolean isPrimitiveTypeDeclaration(final JsonNode schema) {
* Detects any schema that looks like a reference type declaration, e.g.: { "$ref":
* "WellKnownTypes.json...." } or { "oneOf": [{"$ref": "..."}, {"type": "object"}] }
*/
private static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) {
static boolean isPrimitiveReferenceTypeDeclaration(final JsonNode schema) {
if (!schema.isObject()) {
// Non-object schemas (i.e. true/false) never need to be modified
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ public static StandardSync buildStandardSync(final Record record, final List<UUI
/**
 * Deserializes a persisted JSON string into a {@link ConfiguredAirbyteCatalog} and applies the
 * on-the-fly data types migration helpers to it before returning it.
 *
 * @param configuredAirbyteCatalogString serialized configured catalog
 * @return the deserialized catalog, after the migration helper calls below
 */
private static ConfiguredAirbyteCatalog parseConfiguredAirbyteCatalog(final String configuredAirbyteCatalogString) {
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = Jsons.deserialize(configuredAirbyteCatalogString, ConfiguredAirbyteCatalog.class);
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
// NOTE(review): the active upgrade call below and its commented-out copy after the TODO look like
// the removed and added sides of one change shown together; confirm which one the file really
// contains. If only the downgrade remains, the "v0->v1" wording above is stale.
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(configuredAirbyteCatalog);
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(configuredAirbyteCatalog);
// Downgrades the catalog in place to v0 data types when v1 data types are detected.
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(configuredAirbyteCatalog);
return configuredAirbyteCatalog;
}

Expand Down Expand Up @@ -249,7 +251,9 @@ public static ActorCatalogWithUpdatedAt buildActorCatalogWithUpdatedAt(final Rec
/**
 * Deserializes a persisted JSON string into an {@link AirbyteCatalog} and applies the on-the-fly
 * data types migration helpers to it before returning it.
 *
 * @param airbyteCatalogString serialized catalog
 * @return the deserialized catalog, after the migration helper calls below
 */
public static AirbyteCatalog parseAirbyteCatalog(final String airbyteCatalogString) {
final AirbyteCatalog airbyteCatalog = Jsons.deserialize(airbyteCatalogString, AirbyteCatalog.class);
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
// NOTE(review): the active upgrade call below and its commented-out copy after the TODO look like
// the removed and added sides of one change shown together; confirm which one the file really
// contains. If only the downgrade remains, the "v0->v1" wording above is stale.
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(airbyteCatalog);
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(airbyteCatalog);
// Downgrades the catalog in place to v0 data types when v1 data types are detected.
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(airbyteCatalog);
return airbyteCatalog;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -931,9 +931,13 @@ private static JobConfig parseJobConfigFromString(final String jobConfigString)
final JobConfig jobConfig = Jsons.deserialize(jobConfigString, JobConfig.class);
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
if (jobConfig.getConfigType() == ConfigType.SYNC && jobConfig.getSync() != null) {
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog());
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog());
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobConfig.getSync().getConfiguredAirbyteCatalog());
} else if (jobConfig.getConfigType() == ConfigType.RESET_CONNECTION && jobConfig.getResetConnection() != null) {
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog());
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog());
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobConfig.getResetConnection().getConfiguredAirbyteCatalog());
}
return jobConfig;
}
Expand All @@ -960,9 +964,13 @@ private static JobOutput parseJobOutputFromString(final String jobOutputString)
final JobOutput jobOutput = Jsons.deserialize(jobOutputString, JobOutput.class);
// On-the-fly migration of persisted data types related objects (protocol v0->v1)
if (jobOutput.getOutputType() == OutputType.DISCOVER_CATALOG && jobOutput.getDiscoverCatalog() != null) {
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog());
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog());
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getDiscoverCatalog().getCatalog());
} else if (jobOutput.getOutputType() == OutputType.SYNC && jobOutput.getSync() != null) {
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog());
// TODO feature flag this for data types rollout
// CatalogMigrationV1Helper.upgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog());
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(jobOutput.getSync().getOutputCatalog());
}
return jobOutput;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,19 @@ public NormalizationSummary normalize(final JobRunConfig jobRunConfig,

// Check the version of normalization
// We require at least version 0.3.0 to support data types v1. Using an older version would lead to
// all columns being typed as JSONB. We should fail before coercing the types into an unexpected
// form.
// all columns being typed as JSONB. If normalization is using an older version, fallback to using
// v0 data types.
if (!normalizationSupportsV1DataTypes(destinationLauncherConfig)) {
throw new IllegalStateException("Normalization is too old, a version >=\"0.3.0\" is required but got \""
+ destinationLauncherConfig.getNormalizationDockerImage() + "\" instead");
}
CatalogMigrationV1Helper.downgradeSchemaIfNeeded(fullInput.getCatalog());
} else {

// This should only be useful for syncs that started before the release that contained v1 migration.
// However, we lack the effective way to detect those syncs so this code should remain until we
// phase v0 out.
// Performance impact should be low considering the nature of the check compared to the time to run
// normalization.
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(fullInput.getCatalog());
// This should only be useful for syncs that started before the release that contained v1 migration.
// However, we lack the effective way to detect those syncs so this code should remain until we
// phase v0 out.
// Performance impact should be low considering the nature of the check compared to the time to run
// normalization.
CatalogMigrationV1Helper.upgradeSchemaIfNeeded(fullInput.getCatalog());
}

final Supplier<NormalizationInput> inputSupplier = () -> {
airbyteConfigValidator.ensureAsRuntime(ConfigSchema.NORMALIZATION_INPUT, Jsons.jsonNode(fullInput));
Expand Down Expand Up @@ -173,8 +173,8 @@ static boolean normalizationSupportsV1DataTypes(final IntegrationLauncherConfig
return normalizationVersion.greaterThanOrEqualTo(MINIMAL_VERSION_FOR_DATATYPES_V1);
} catch (final IllegalArgumentException e) {
// IllegalArgument here means that the version isn't in a semver format.
// The current behavior is to assume it supports v1 data types for dev purposes.
return true;
// The current behavior is to assume it supports v0 data types for dev purposes.
return false;
}
}

Expand Down

0 comments on commit 564a34a

Please sign in to comment.