diff --git a/airbyte-commons-protocol/build.gradle b/airbyte-commons-protocol/build.gradle index 502c714ffd8e..cd5f7132b5fb 100644 --- a/airbyte-commons-protocol/build.gradle +++ b/airbyte-commons-protocol/build.gradle @@ -6,6 +6,7 @@ dependencies { testImplementation libs.bundles.micronaut.test implementation project(':airbyte-protocol:protocol-models') + implementation project(':airbyte-json-validation') } Task publishArtifactsTask = getPublishArtifactsTask("$rootProject.ext.version", project) diff --git a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigrationV1.java b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigrationV1.java index a6175765a8e9..2caad78ac7a7 100644 --- a/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigrationV1.java +++ b/airbyte-commons-protocol/src/main/java/io/airbyte/commons/protocol/migrations/AirbyteMessageMigrationV1.java @@ -4,6 +4,8 @@ package io.airbyte.commons.protocol.migrations; +import static io.airbyte.protocol.models.JsonSchemaReferenceTypes.REF_KEY; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -11,23 +13,59 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.version.AirbyteProtocolVersion; import io.airbyte.commons.version.Version; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.JsonSchemaReferenceTypes; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.validation.json.JsonSchemaValidator; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.StreamSupport; public class AirbyteMessageMigrationV1 implements AirbyteMessageMigration { + private final ConfiguredAirbyteCatalog catalog; + private final JsonSchemaValidator validator; + + public AirbyteMessageMigrationV1(ConfiguredAirbyteCatalog catalog, JsonSchemaValidator validator) { + this.catalog = catalog; + this.validator = validator; + } + @Override public io.airbyte.protocol.models.v0.AirbyteMessage downgrade(AirbyteMessage oldMessage) { - // TODO implement downgrade - return null; + io.airbyte.protocol.models.v0.AirbyteMessage newMessage = Jsons.object( + Jsons.jsonNode(oldMessage), + io.airbyte.protocol.models.v0.AirbyteMessage.class); + if (oldMessage.getType() == Type.CATALOG) { + for (io.airbyte.protocol.models.v0.AirbyteStream stream : newMessage.getCatalog().getStreams()) { + JsonNode schema = stream.getJsonSchema(); + downgradeSchema(schema); + } + } else if (oldMessage.getType() == Type.RECORD) { + io.airbyte.protocol.models.v0.AirbyteRecordMessage record = newMessage.getRecord(); + Optional maybeStream = catalog.getStreams().stream() + .filter(stream -> Objects.equals(stream.getStream().getName(), record.getStream()) + && Objects.equals(stream.getStream().getNamespace(), record.getNamespace())) + .findFirst(); + // If this record doesn't belong to any configured stream, then there's no point downgrading it + // So only do the downgrade if we can find its stream + if (maybeStream.isPresent()) { + JsonNode schema = maybeStream.get().getStream().getJsonSchema(); + JsonNode oldData = record.getData(); + DowngradedNode downgradedNode = downgradeNode(oldData, schema); + record.setData(downgradedNode.node); + } + } + return newMessage; } @Override @@ -60,6 +98,16 @@ private void upgradeSchema(JsonNode schema) { schema); } + /** + * Perform the {$ref: foo} -> {type: foo} downgrade. Modifies the schema in-place. + */ + private void downgradeSchema(JsonNode schema) { + mutateSchemas( + this::isPrimitiveReferenceTypeDeclaration, + this::downgradeTypeDeclaration, + schema); + } + /** * Detects any schema that looks like a primitive type declaration, e.g.: { "type": "string" } or { * "type": ["string", "object"] } @@ -77,11 +125,34 @@ private boolean isPrimitiveTypeDeclaration(JsonNode schema) { } } + /** + * Detects any schema that looks like a reference type declaration, e.g.: { "$ref": + * "WellKnownTypes.json...." } or { "oneOf": [{"$ref": "..."}, {"type": "object"}] } + */ + private boolean isPrimitiveReferenceTypeDeclaration(JsonNode schema) { + if (!schema.isObject()) { + // Non-object schemas (i.e. true/false) never need to be modified + return false; + } else if (schema.hasNonNull("$ref") && schema.get("$ref").asText().startsWith("WellKnownTypes.json")) { + // If this schema has a $ref, then we need to convert it back to type/airbyte_type/format + return true; + } else if (schema.hasNonNull("oneOf")) { + // If this is a oneOf with at least one primitive $ref option, then we should consider converting it + // back + List subschemas = getSubschemas(schema, "oneOf"); + return subschemas.stream().anyMatch( + subschema -> subschema.hasNonNull("$ref") + && subschema.get("$ref").asText().startsWith("WellKnownTypes.json")); + } else { + return false; + } + } + /** * Modifies the schema in-place to upgrade from the old-style type declaration to the new-style $ref * declaration. Assumes that the schema is an ObjectNode containing a primitive declaration, i.e. * either something like: {"type": "string"} or: {"type": ["string", "object"]} - * + *

* In the latter case, the schema may contain subschemas. This method mutually recurses with * {@link #mutateSchemas(Function, Consumer, JsonNode)} to upgrade those subschemas. * @@ -154,6 +225,98 @@ private void upgradeTypeDeclaration(JsonNode schema) { } } + /** + * Modifies the schema in-place to downgrade from the new-style $ref declaration to the old-style + * type declaration. Assumes that the schema is an ObjectNode containing a primitive declaration, + * i.e. either something like: {"$ref": "WellKnownTypes..."} or: {"oneOf": [{"$ref": + * "WellKnownTypes..."}, ...]} + *

+ * In the latter case, the schema may contain subschemas. This method mutually recurses with + * {@link #mutateSchemas(Function, Consumer, JsonNode)} to downgrade those subschemas. + * + * @param schema An ObjectNode representing a primitive type declaration + */ + private void downgradeTypeDeclaration(JsonNode schema) { + if (schema.hasNonNull("$ref")) { + // If this is a direct type declaration, then we can just replace it with the old-style declaration + String referenceType = schema.get("$ref").asText(); + ((ObjectNode) schema).removeAll(); + ((ObjectNode) schema).setAll(JsonSchemaReferenceTypes.REFERENCE_TYPE_TO_OLD_TYPE.get(referenceType)); + } else if (schema.hasNonNull("oneOf")) { + // If this is a oneOf, then we need to check whether we can recombine it into a single type + // declaration. + // This means we must do three things: + // 1. Downgrade each subschema + // 2. Build a new `type` array, containing the `type` of each subschema + // 3. Combine all the fields in each subschema (properties, items, etc) + // If any two subschemas have the same `type`, or the same field, then we can't combine them, but we + // should still downgrade them. + // See V0ToV1MigrationTest.CatalogDowngradeTest#testDowngradeMultiTypeFields for some examples. + + // We'll build up a node containing the combined subschemas. + ObjectNode replacement = (ObjectNode) Jsons.emptyObject(); + // As part of this, we need to build up a list of `type` entries. For ease of access, we'll keep it + // in a List. + List types = new ArrayList<>(); + + boolean canRecombineSubschemas = true; + for (JsonNode subschemaNode : schema.get("oneOf")) { + // No matter what - we always need to downgrade the subschema node. + downgradeSchema(subschemaNode); + + if (subschemaNode instanceof ObjectNode subschema) { + // If this subschema is an object, then we can attempt to combine it with the other subschemas. + + // First, update our list of types. + JsonNode subschemaType = subschema.get("type"); + if (subschemaType != null) { + if (types.contains(subschemaType.asText())) { + // If another subschema has the same type, then we can't combine them. + canRecombineSubschemas = false; + } else { + types.add(subschemaType.asText()); + } + } + + // Then, update the combined schema with this subschema's fields. + if (canRecombineSubschemas) { + Iterator> fields = subschema.fields(); + while (fields.hasNext()) { + Entry field = fields.next(); + if ("type".equals(field.getKey())) { + // We're handling the `type` field outside this loop, so ignore it here. + continue; + } + if (replacement.has(field.getKey())) { + // A previous subschema is already using this field, so we should stop trying to combine them. + canRecombineSubschemas = false; + break; + } else { + replacement.set(field.getKey(), field.getValue()); + } + } + } + } else { + // If this subschema is a boolean, then the oneOf is doing something funky, and we shouldn't attempt + // to + // combine it into a single type entry + canRecombineSubschemas = false; + } + } + + if (canRecombineSubschemas) { + // Update our replacement node with the full list of types + ArrayNode typeNode = Jsons.arrayNode(); + types.forEach(typeNode::add); + replacement.set("type", typeNode); + + // And commit our changes to the actual schema node + ((ObjectNode) schema).removeAll(); + ((ObjectNode) schema).setAll(replacement); + } + } + } + private static void copyKey(ObjectNode source, ObjectNode target, String key) { if (source.hasNonNull(key)) { target.set(key, source.get(key)); @@ -200,7 +363,7 @@ yield switch (schemaNode.get("format").asText()) { * Generic utility method that recurses through all type declarations in the schema. For each type * declaration that are accepted by matcher, mutate them using transformer. For all other type * declarations, recurse into their subschemas (if any). - * + *

* Note that this modifies the schema in-place. Callers who need a copy of the old schema should * save schema.deepCopy() before calling this method. * @@ -268,12 +431,12 @@ private static void mutateSchemas(Function matcher, Consumer< /** * If schema contains key, then grab the subschema(s) at schema[key] and add them to the subschemas * list. - * + *

* For example: schema = {"items": [{"type": "string}]} key = "items" -> add {"type": "string"} to * subschemas - * + *

* schema = {"items": {"type": "string"}} key = "items" -> add {"type": "string"} to subschemas - * + *

* schema = {"additionalProperties": true} key = "additionalProperties" -> add nothing to subschemas * (technically `true` is a valid JsonSchema, but we don't want to modify it) */ @@ -290,6 +453,12 @@ private static void findSubschemas(List subschemas, JsonNode schema, S } } + private static List getSubschemas(JsonNode schema, String key) { + List subschemas = new ArrayList<>(); + findSubschemas(subschemas, schema, key); + return subschemas; + } + /** * Returns a copy of oldData, with numeric values converted to strings. String and boolean values * are returned as-is for convenience, i.e. this is not a true deep copy. @@ -326,6 +495,253 @@ private static JsonNode upgradeRecord(JsonNode oldData) { } } + /** + * Quick and dirty tuple. Used internally by {@link #downgradeNode(JsonNode, JsonNode)}; callers + * probably only actually need the node. + * + * matchedSchema is useful for downgrading using a oneOf schema, where we need to recognize the + * correct subschema. + * + * @param node Our attempt at downgrading the node, under the given schema + * @param matchedSchema Whether the original node actually matched the schema + */ + private record DowngradedNode(JsonNode node, boolean matchedSchema) {} + + /** + * We need the schema to recognize which fields are integers, since it would be wrong to just assume + * any numerical string should be parsed out. + * + * Works on a best-effort basis. If the schema doesn't match the data, we'll do our best to + * downgrade anything that we can definitively say is a number. Should _not_ throw an exception if + * bad things happen (e.g. we try to parse a non-numerical string as a number). + */ + private DowngradedNode downgradeNode(JsonNode data, JsonNode schema) { + // If this is a oneOf node that looks like an upgraded v0 message, then we need to handle each oneOf + // case. + if (!schema.hasNonNull(REF_KEY) && !schema.hasNonNull("type") && schema.hasNonNull("oneOf")) { + return downgradeOneofNode(data, schema); + } + // Otherwise, we need to do something specific to whatever the data is. + if (data.isTextual()) { + return downgradeTextualNode(data, schema); + } else if (data.isObject()) { + return downgradeObjectNode(data, schema); + } else if (data.isArray()) { + return downgradeArrayNode(data, schema); + } else { + // A primitive, non-text node never needs to be modified: + // Protocol v1 didn't change how booleans work + // And protocol v0 expects raw numbers anyway + // So we just check whether the schema is correct and return the node as-is. + return new DowngradedNode(data, validator.validate(schema, data).isEmpty()); + } + } + + /** + * Attempt to downgrade using each oneOf option in sequence. Returns the result from downgrading + * using the first subschema that matches the data, or if none match, then the result of using the + * first subschema. + */ + private DowngradedNode downgradeOneofNode(JsonNode data, JsonNode schema) { + JsonNode schemaOptions = schema.get("oneOf"); + if (schemaOptions.size() == 0) { + // If the oneOf has no options, then don't do anything interesting. + return new DowngradedNode(data, validator.validate(schema, data).isEmpty()); + } + + // Attempt to downgrade the node against each oneOf schema. + // Return the first schema that matches the data, or the first schema if none matched successfully. + DowngradedNode downgradedNode = null; + for (JsonNode maybeSchema : schemaOptions) { + DowngradedNode maybeDowngradedNode = downgradeNode(data, maybeSchema); + if (downgradedNode == null) { + // If this is the first subschema, then just take it + downgradedNode = maybeDowngradedNode; + } else if (!downgradedNode.matchedSchema() && maybeDowngradedNode.matchedSchema()) { + // Otherwise - if we've found a matching schema, then return immediately + downgradedNode = maybeDowngradedNode; + break; + } + } + // None of the schemas matched, so just return whatever we found first + return downgradedNode; + } + + /** + * Downgrade a textual node. This could either be a string/date/timestamp/etc, in which case we need + * to do nothing. Or it could be a number/integer, in which case we should convert it to a JSON + * numerical node. + * + * If the data doesn't match the schema, then just return it without modification. + */ + private DowngradedNode downgradeTextualNode(JsonNode data, JsonNode schema) { + JsonNode refNode = schema.get(REF_KEY); + if (refNode != null) { + // If this is a valid v1 schema, then we _must_ have a $ref schema. + String refType = refNode.asText(); + if (JsonSchemaReferenceTypes.NUMBER_REFERENCE.equals(refType) + || JsonSchemaReferenceTypes.INTEGER_REFERENCE.equals(refType)) { + // We could do this as a try-catch, but this migration will run on every RecordMessage + // so it does need to be reasonably performant. + // Instead, we use a regex to check for numeric literals. + // Note that this does _not_ allow infinity/nan, even though protocol v1 _does_ allow them. + // This is because JSON numeric literals don't allow those values. + if (data.asText().matches("-?\\d+(\\.\\d+)?")) { + // If this string is a numeric literal, convert it to a numeric node. + return new DowngradedNode(Jsons.deserialize(data.asText()), true); + } else { + // Otherwise, just leave the node unchanged. + return new DowngradedNode(data, false); + } + } else { + // This is a non-numeric string (so could be a date/timestamp/etc) + // Run it through the validator, but don't modify the data. + return new DowngradedNode(data, validator.validate(schema, data).isEmpty()); + } + } else { + // Otherwise - the schema is invalid. + return new DowngradedNode(data, false); + } + } + + /** + * If data is an object, then we need to recursively downgrade all of its fields. + */ + private DowngradedNode downgradeObjectNode(JsonNode data, JsonNode schema) { + boolean isObjectSchema; + // First, check whether the schema is supposed to be an object at all. + if (schema.hasNonNull(REF_KEY)) { + // If the schema uses a reference type, then it's not an object schema. + isObjectSchema = false; + } else if (schema.hasNonNull("type")) { + // If the schema declares {type: object} or {type: [..., object, ...]} + // Then this is an object schema + JsonNode typeNode = schema.get("type"); + if (typeNode.isArray()) { + isObjectSchema = false; + for (JsonNode typeItem : typeNode) { + if ("object".equals(typeItem.asText())) { + isObjectSchema = true; + } + } + } else { + isObjectSchema = "object".equals(typeNode.asText()); + } + } else { + // If the schema doesn't declare a type at all (which is bad practice, but let's handle it anyway) + // Then check for a properties entry, and assume that this is an object if it's present + isObjectSchema = schema.hasNonNull("properties"); + } + + if (!isObjectSchema) { + // If it's not supposed to be an object, then we can't do anything here. + // Return the data without modification. + return new DowngradedNode(data, false); + } else { + // If the schema _is_ for an object, then recurse into each field + ObjectNode downgradedData = (ObjectNode) Jsons.emptyObject(); + JsonNode propertiesNode = schema.get("properties"); + + Iterator> dataFields = data.fields(); + boolean matchedSchema = true; + while (dataFields.hasNext()) { + Entry field = dataFields.next(); + String key = field.getKey(); + JsonNode value = field.getValue(); + if (propertiesNode != null && propertiesNode.hasNonNull(key)) { + // If we have a schema for this property, downgrade the value + JsonNode subschema = propertiesNode.get(key); + DowngradedNode downgradedNode = downgradeNode(value, subschema); + downgradedData.set(key, downgradedNode.node); + if (!downgradedNode.matchedSchema) { + matchedSchema = false; + } + } else { + // Else it's an additional property - we _could_ check additionalProperties, + // but that's annoying. We don't actually respect that in destinations/normalization anyway. + downgradedData.set(key, value); + } + } + + return new DowngradedNode(downgradedData, matchedSchema); + } + } + + /** + * Much like objects, arrays must be recursively downgraded. + */ + private DowngradedNode downgradeArrayNode(JsonNode data, JsonNode schema) { + // Similar to objects, we first check whether this is even supposed to be an array. + boolean isArraySchema; + if (schema.hasNonNull(REF_KEY)) { + // If the schema uses a reference type, then it's not an array schema. + isArraySchema = false; + } else if (schema.hasNonNull("type")) { + // If the schema declares {type: array} or {type: [..., array, ...]} + // Then this is an array schema + JsonNode typeNode = schema.get("type"); + if (typeNode.isArray()) { + isArraySchema = false; + for (JsonNode typeItem : typeNode) { + if ("array".equals(typeItem.asText())) { + isArraySchema = true; + } + } + } else { + isArraySchema = "array".equals(typeNode.asText()); + } + } else { + // If the schema doesn't declare a type at all (which is bad practice, but let's handle it anyway) + // Then check for an items entry, and assume that this is an array if it's present + isArraySchema = schema.hasNonNull("items"); + } + + if (!isArraySchema) { + return new DowngradedNode(data, false); + } else { + ArrayNode downgradedItems = Jsons.arrayNode(); + JsonNode itemsNode = schema.get("items"); + if (itemsNode == null) { + // We _could_ check additionalItems, but much like the additionalProperties comment for objects: + // it's a lot of work for no payoff + return new DowngradedNode(data, true); + } else if (itemsNode.isArray()) { + // In the case of {items: [schema1, schema2, ...]} + // We need to check schema1 against the first element of the array, + // schema2 against the second element, etc. + boolean allSchemasMatched = true; + for (int i = 0; i < data.size(); i++) { + JsonNode element = data.get(i); + if (itemsNode.size() > i) { + // If we have a schema for this element, then try downgrading the element + DowngradedNode downgradedElement = downgradeNode(element, itemsNode.get(i)); + if (!downgradedElement.matchedSchema()) { + allSchemasMatched = false; + } + downgradedItems.add(downgradedElement.node()); + } + } + // If there were more elements in `data` than there were schemas in `itemsNode`, + // then just blindly add the rest of those elements. + for (int i = itemsNode.size(); i < data.size(); i++) { + downgradedItems.add(data.get(i)); + } + return new DowngradedNode(downgradedItems, allSchemasMatched); + } else { + // IN the case of {items: schema}, we just check every array element against that schema. + boolean matchedSchema = true; + for (JsonNode item : data) { + DowngradedNode downgradedNode = downgradeNode(item, itemsNode); + downgradedItems.add(downgradedNode.node); + if (!downgradedNode.matchedSchema) { + matchedSchema = false; + } + } + return new DowngradedNode(downgradedItems, matchedSchema); + } + } + } + @Override public Version getPreviousVersion() { return AirbyteProtocolVersion.V0; diff --git a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/migrations/V0ToV1MigrationTest.java b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/migrations/V0ToV1MigrationTest.java index c16a20ff5b64..3a21d9f4277b 100644 --- a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/migrations/V0ToV1MigrationTest.java +++ b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/migrations/V0ToV1MigrationTest.java @@ -8,7 +8,16 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; +import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.validation.json.JsonSchemaValidator; +import java.net.URI; +import java.net.URISyntaxException; import java.util.List; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; @@ -16,11 +25,16 @@ public class V0ToV1MigrationTest { + JsonSchemaValidator validator; private AirbyteMessageMigrationV1 migration; @BeforeEach - public void setup() { - migration = new AirbyteMessageMigrationV1(); + public void setup() throws URISyntaxException { + // TODO this should probably just get generated as part of the airbyte-protocol build, and + // airbyte-workers / airbyte-commons-protocol would reference it directly + URI parentUri = MoreResources.readResourceAsFile("WellKnownTypes.json").getAbsoluteFile().toURI(); + validator = new JsonSchemaValidator(parentUri); + migration = new AirbyteMessageMigrationV1(null, validator); } @Test @@ -36,253 +50,264 @@ public class CatalogUpgradeTest { public void testBasicUpgrade() { // This isn't actually a valid stream schema (since it's not an object) // but this test case is mostly about preserving the message structure, so it's not super relevant - JsonNode oldSchema = Jsons.deserialize(""" - { - "type": "string" - } - """); + JsonNode oldSchema = Jsons.deserialize( + """ + { + "type": "string" + } + """); AirbyteMessage upgradedMessage = migration.upgrade(createCatalogMessage(oldSchema)); - AirbyteMessage expectedMessage = Jsons.deserialize(""" - { - "type": "CATALOG", - "catalog": { - "streams": [ - { - "json_schema": { - "$ref": "WellKnownTypes.json#/definitions/String" - } - } - ] - } - } - """, + AirbyteMessage expectedMessage = Jsons.deserialize( + """ + { + "type": "CATALOG", + "catalog": { + "streams": [ + { + "json_schema": { + "$ref": "WellKnownTypes.json#/definitions/String" + } + } + ] + } + } + """, AirbyteMessage.class); assertEquals(expectedMessage, upgradedMessage); } - @Test - public void testUpgradeAllPrimitives() { - JsonNode oldSchema = Jsons.deserialize(""" - { - "type": "object", - "properties": { - "example_string": { - "type": "string" - }, - "example_number": { - "type": "number" - }, - "example_integer": { - "type": "integer" - }, - "example_airbyte_integer": { - "type": "number", - "airbyte_type": "integer" - }, - "example_boolean": { - "type": "boolean" - }, - "example_timestamptz": { - "type": "string", - "format": "date-time", - "airbyte_type": "timestamp_with_timezone" - }, - "example_timestamptz_implicit": { - "type": "string", - "format": "date-time" - }, - "example_timestamp_without_tz": { - "type": "string", - "format": "date-time", - "airbyte_type": "timestamp_without_timezone" - }, - "example_timez": { - "type": "string", - "format": "time", - "airbyte_type": "time_with_timezone" - }, - "example_timetz_implicit": { - "type": "string", - "format": "time" - }, - "example_time_without_tz": { - "type": "string", - "format": "time", - "airbyte_type": "time_without_timezone" - }, - "example_date": { - "type": "string", - "format": "date" - }, - "example_binary": { - "type": "string", - "contentEncoding": "base64" - } - } - } - """); + /** + * Utility method to upgrade the oldSchema, and assert that the result is equal to expectedSchema + * + * @param oldSchemaString The schema to be upgraded + * @param expectedSchemaString The expected schema after upgrading + */ + private void doTest(String oldSchemaString, String expectedSchemaString) { + JsonNode oldSchema = Jsons.deserialize(oldSchemaString); AirbyteMessage upgradedMessage = migration.upgrade(createCatalogMessage(oldSchema)); - JsonNode expectedSchema = Jsons.deserialize(""" - { - "type": "object", - "properties": { - "example_string": { - "$ref": "WellKnownTypes.json#/definitions/String" - }, - "example_number": { - "$ref": "WellKnownTypes.json#/definitions/Number" - }, - "example_integer": { - "$ref": "WellKnownTypes.json#/definitions/Integer" - }, - "example_airbyte_integer": { - "$ref": "WellKnownTypes.json#/definitions/Integer" - }, - "example_boolean": { - "$ref": "WellKnownTypes.json#/definitions/Boolean" - }, - "example_timestamptz": { - "$ref": "WellKnownTypes.json#/definitions/TimestampWithTimezone" - }, - "example_timestamptz_implicit": { - "$ref": "WellKnownTypes.json#/definitions/TimestampWithTimezone" - }, - "example_timestamp_without_tz": { - "$ref": "WellKnownTypes.json#/definitions/TimestampWithoutTimezone" - }, - "example_timez": { - "$ref": "WellKnownTypes.json#/definitions/TimeWithTimezone" - }, - "example_timetz_implicit": { - "$ref": "WellKnownTypes.json#/definitions/TimeWithTimezone" - }, - "example_time_without_tz": { - "$ref": "WellKnownTypes.json#/definitions/TimeWithoutTimezone" - }, - "example_date": { - "$ref": "WellKnownTypes.json#/definitions/Date" - }, - "example_binary": { - "$ref": "WellKnownTypes.json#/definitions/BinaryData" - } - } - } - """); + JsonNode expectedSchema = Jsons.deserialize(expectedSchemaString); assertEquals(expectedSchema, upgradedMessage.getCatalog().getStreams().get(0).getJsonSchema()); } @Test - public void testUpgradeNestedFields() { - JsonNode oldSchema = Jsons.deserialize(""" - { - "type": "object", - "properties": { - "basic_array": { - "items": {"type": "string"} - }, - "tuple_array": { - "items": [ - {"type": "string"}, - {"type": "integer"} - ], - "additionalItems": {"type": "string"}, - "contains": {"type": "integer"} - }, - "nested_object": { - "properties": { - "id": {"type": "integer"}, - "nested_oneof": { - "oneOf": [ - {"type": "string"}, - {"type": "integer"} - ] - }, - "nested_anyof": { - "anyOf": [ - {"type": "string"}, - {"type": "integer"} - ] - }, - "nested_allof": { - "allOf": [ - {"type": "string"}, - {"type": "integer"} - ] - }, - "nested_not": { - "not": [ - {"type": "string"}, - {"type": "integer"} - ] - } - }, - "patternProperties": { - "integer_.*": {"type": "integer"} - }, - "additionalProperties": {"type": "string"} - } - } - } - """); - - AirbyteMessage upgradedMessage = migration.upgrade(createCatalogMessage(oldSchema)); + public void testUpgradeAllPrimitives() { + doTest( + """ + { + "type": "object", + "properties": { + "example_string": { + "type": "string" + }, + "example_number": { + "type": "number" + }, + "example_integer": { + "type": "integer" + }, + "example_airbyte_integer": { + "type": "number", + "airbyte_type": "integer" + }, + "example_boolean": { + "type": "boolean" + }, + "example_timestamptz": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "example_timestamptz_implicit": { + "type": "string", + "format": "date-time" + }, + "example_timestamp_without_tz": { + "type": "string", + "format": "date-time", + "airbyte_type": "timestamp_without_timezone" + }, + "example_timez": { + "type": "string", + "format": "time", + "airbyte_type": "time_with_timezone" + }, + "example_timetz_implicit": { + "type": "string", + "format": "time" + }, + "example_time_without_tz": { + "type": "string", + "format": "time", + "airbyte_type": "time_without_timezone" + }, + "example_date": { + "type": "string", + "format": "date" + }, + "example_binary": { + "type": "string", + "contentEncoding": "base64" + } + } + } + """, + """ + { + "type": "object", + "properties": { + "example_string": { + "$ref": "WellKnownTypes.json#/definitions/String" + }, + "example_number": { + "$ref": "WellKnownTypes.json#/definitions/Number" + }, + "example_integer": { + "$ref": "WellKnownTypes.json#/definitions/Integer" + }, + "example_airbyte_integer": { + "$ref": "WellKnownTypes.json#/definitions/Integer" + }, + "example_boolean": { + "$ref": "WellKnownTypes.json#/definitions/Boolean" + }, + "example_timestamptz": { + "$ref": "WellKnownTypes.json#/definitions/TimestampWithTimezone" + }, + "example_timestamptz_implicit": { + "$ref": "WellKnownTypes.json#/definitions/TimestampWithTimezone" + }, + "example_timestamp_without_tz": { + "$ref": "WellKnownTypes.json#/definitions/TimestampWithoutTimezone" + }, + "example_timez": { + "$ref": "WellKnownTypes.json#/definitions/TimeWithTimezone" + }, + "example_timetz_implicit": { + "$ref": "WellKnownTypes.json#/definitions/TimeWithTimezone" + }, + "example_time_without_tz": { + "$ref": "WellKnownTypes.json#/definitions/TimeWithoutTimezone" + }, + "example_date": { + "$ref": "WellKnownTypes.json#/definitions/Date" + }, + "example_binary": { + "$ref": "WellKnownTypes.json#/definitions/BinaryData" + } + } + } + """); + } - JsonNode expectedSchema = Jsons.deserialize(""" - { - "type": "object", - "properties": { - "basic_array": { - "items": {"$ref": "WellKnownTypes.json#/definitions/String"} - }, - "tuple_array": { - "items": [ - {"$ref": "WellKnownTypes.json#/definitions/String"}, - {"$ref": "WellKnownTypes.json#/definitions/Integer"} - ], - "additionalItems": {"$ref": "WellKnownTypes.json#/definitions/String"}, - "contains": {"$ref": "WellKnownTypes.json#/definitions/Integer"} - }, - "nested_object": { - "properties": { - "id": {"$ref": "WellKnownTypes.json#/definitions/Integer"}, - "nested_oneof": { - "oneOf": [ - {"$ref": "WellKnownTypes.json#/definitions/String"}, - {"$ref": "WellKnownTypes.json#/definitions/Integer"} - ] - }, - "nested_anyof": { - "anyOf": [ - {"$ref": "WellKnownTypes.json#/definitions/String"}, - {"$ref": "WellKnownTypes.json#/definitions/Integer"} - ] - }, - "nested_allof": { - "allOf": [ - {"$ref": "WellKnownTypes.json#/definitions/String"}, - {"$ref": "WellKnownTypes.json#/definitions/Integer"} - ] - }, - "nested_not": { - "not": [ - {"$ref": "WellKnownTypes.json#/definitions/String"}, - {"$ref": "WellKnownTypes.json#/definitions/Integer"} - ] - } - }, - "patternProperties": { - "integer_.*": {"$ref": "WellKnownTypes.json#/definitions/Integer"} - }, - "additionalProperties": {"$ref": "WellKnownTypes.json#/definitions/String"} - } - } - } - """); - assertEquals(expectedSchema, upgradedMessage.getCatalog().getStreams().get(0).getJsonSchema()); + @Test + public void testUpgradeNestedFields() { + doTest( + """ + { + "type": "object", + "properties": { + "basic_array": { + "items": {"type": "string"} + }, + "tuple_array": { + "items": [ + {"type": "string"}, + {"type": "integer"} + ], + "additionalItems": {"type": "string"}, + "contains": {"type": "integer"} + }, + "nested_object": { + "properties": { + "id": {"type": "integer"}, + "nested_oneof": { + "oneOf": [ + {"type": "string"}, + {"type": "integer"} + ] + }, + "nested_anyof": { + "anyOf": [ + {"type": "string"}, + {"type": "integer"} + ] + }, + "nested_allof": { + "allOf": [ + {"type": "string"}, + {"type": "integer"} + ] + }, + "nested_not": { + "not": [ + {"type": "string"}, + {"type": "integer"} + ] + } + }, + "patternProperties": { + "integer_.*": {"type": "integer"} + }, + "additionalProperties": {"type": "string"} + } + } + } + """, + """ + { + "type": "object", + "properties": { + "basic_array": { + "items": {"$ref": "WellKnownTypes.json#/definitions/String"} + }, + "tuple_array": { + "items": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ], + "additionalItems": {"$ref": "WellKnownTypes.json#/definitions/String"}, + "contains": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + "nested_object": { + "properties": { + "id": {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + "nested_oneof": { + "oneOf": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ] + }, + "nested_anyof": { + "anyOf": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ] + }, + "nested_allof": { + "allOf": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ] + }, + "nested_not": { + "not": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ] + } + }, + "patternProperties": { + "integer_.*": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + "additionalProperties": {"$ref": "WellKnownTypes.json#/definitions/String"} + } + } + } + """); } @Test @@ -325,7 +350,7 @@ public void testUpgradeBooleanSchemas() { } } """; - assertUpgradeIsNoop(schemaString); + doTest(schemaString, schemaString); } @Test @@ -368,7 +393,7 @@ public void testUpgradeEmptySchema() { } } """; - assertUpgradeIsNoop(schemaString); + doTest(schemaString, schemaString); } @Test @@ -387,7 +412,7 @@ public void testUpgradeLiteralSchema() { } } """; - assertUpgradeIsNoop(schemaString); + doTest(schemaString, schemaString); } @Test @@ -397,132 +422,133 @@ public void testUpgradeMalformedSchemas() { // i.e. it will disregard the option for a boolean. // Generating this sort of schema is just wrong; sources shouldn't do this to begin with. But let's // verify that we behave mostly correctly here. - JsonNode oldSchema = Jsons.deserialize(""" - { - "type": "object", - "properties": { - "bad_timestamptz": { - "type": ["boolean", "string"], - "format": "date-time", - "airbyte_type": "timestamp_with_timezone" - }, - "bad_integer": { - "type": "string", - "format": "date-time", - "airbyte_type": "integer" - } - } - } - """); - - AirbyteMessage upgradedMessage = migration.upgrade(createCatalogMessage(oldSchema)); - - JsonNode expectedSchema = Jsons.deserialize(""" - { - "type": "object", - "properties": { - "bad_timestamptz": {"$ref": "WellKnownTypes.json#/definitions/TimestampWithTimezone"}, - "bad_integer": {"$ref": "WellKnownTypes.json#/definitions/Integer"} - } - } - """); - assertEquals(expectedSchema, upgradedMessage.getCatalog().getStreams().get(0).getJsonSchema()); + doTest( + """ + { + "type": "object", + "properties": { + "bad_timestamptz": { + "type": ["boolean", "string"], + "format": "date-time", + "airbyte_type": "timestamp_with_timezone" + }, + "bad_integer": { + "type": "string", + "format": "date-time", + "airbyte_type": "integer" + } + } + } + """, + """ + { + "type": "object", + "properties": { + "bad_timestamptz": {"$ref": "WellKnownTypes.json#/definitions/TimestampWithTimezone"}, + "bad_integer": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + } + } + """); } @Test public void testUpgradeMultiTypeFields() { - JsonNode oldSchema = Jsons.deserialize(""" - { - "type": "object", - "properties": { - "multityped_field": { - "type": ["string", "object", "array"], - "properties": { - "id": {"type": "string"} - }, - "patternProperties": { - "integer_.*": {"type": "integer"} - }, - "additionalProperties": {"type": "string"}, - "items": {"type": "string"}, - "additionalItems": {"type": "string"}, - "contains": {"type": "string"} - }, - "nullable_multityped_field": { - "type": ["null", "string", "array", "object"], - "items": [{"type": "string"}, {"type": "integer"}], - "properties": { - "id": {"type": "integer"} - } - }, - "multityped_date_field": { - "type": ["string", "integer"], - "format": "date" - }, - "sneaky_singletype_field": { - "type": ["string", "null"], - "format": "date-time" - } - } - } - """); - - AirbyteMessage upgradedMessage = migration.upgrade(createCatalogMessage(oldSchema)); + doTest( + """ + { + "type": "object", + "properties": { + "multityped_field": { + "type": ["string", "object", "array"], + "properties": { + "id": {"type": "string"} + }, + "patternProperties": { + "integer_.*": {"type": "integer"} + }, + "additionalProperties": {"type": "string"}, + "items": {"type": "string"}, + "additionalItems": {"type": "string"}, + "contains": {"type": "string"} + }, + "nullable_multityped_field": { + "type": ["null", "string", "array", "object"], + "items": [{"type": "string"}, {"type": "integer"}], + "properties": { + "id": {"type": "integer"} + } + }, + "multityped_date_field": { + "type": ["string", "integer"], + "format": "date" + }, + "sneaky_singletype_field": { + "type": ["string", "null"], + "format": "date-time" + } + } + } + """, + """ + { + "type": "object", + "properties": { + "multityped_field": { + "oneOf": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + { + "type": "object", + "properties": { + "id": {"$ref": "WellKnownTypes.json#/definitions/String"} + }, + "patternProperties": { + "integer_.*": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + "additionalProperties": {"$ref": "WellKnownTypes.json#/definitions/String"} + }, + { + "type": "array", + "items": {"$ref": "WellKnownTypes.json#/definitions/String"}, + "additionalItems": {"$ref": "WellKnownTypes.json#/definitions/String"}, + "contains": {"$ref": "WellKnownTypes.json#/definitions/String"} + } + ] + }, + "nullable_multityped_field": { + "oneOf": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + { + "type": "array", + "items": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ] + }, + { + "type": "object", + "properties": { + "id": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + } + } + ] + }, + "multityped_date_field": { + "oneOf": [ + {"$ref": "WellKnownTypes.json#/definitions/Date"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ] + }, + "sneaky_singletype_field": {"$ref": "WellKnownTypes.json#/definitions/TimestampWithTimezone"} + } + } + """); + } - JsonNode expectedSchema = Jsons.deserialize(""" - { - "type": "object", - "properties": { - "multityped_field": { - "oneOf": [ - {"$ref": "WellKnownTypes.json#/definitions/String"}, - { - "type": "object", - "properties": { - "id": {"$ref": "WellKnownTypes.json#/definitions/String"} - }, - "patternProperties": { - "integer_.*": {"$ref": "WellKnownTypes.json#/definitions/Integer"} - }, - "additionalProperties": {"$ref": "WellKnownTypes.json#/definitions/String"} - }, - { - "type": "array", - "items": {"$ref": "WellKnownTypes.json#/definitions/String"}, - "additionalItems": {"$ref": "WellKnownTypes.json#/definitions/String"}, - "contains": {"$ref": "WellKnownTypes.json#/definitions/String"} - } - ] - }, - "nullable_multityped_field": { - "oneOf": [ - {"$ref": "WellKnownTypes.json#/definitions/String"}, - { - "type": "array", - "items": [ - {"$ref": "WellKnownTypes.json#/definitions/String"}, - {"$ref": "WellKnownTypes.json#/definitions/Integer"} - ] - }, - { - "type": "object", - "properties": { - "id": {"$ref": "WellKnownTypes.json#/definitions/Integer"} - } - } - ] - }, - "multityped_date_field": { - "oneOf": [ - {"$ref": "WellKnownTypes.json#/definitions/Date"}, - {"$ref": "WellKnownTypes.json#/definitions/Integer"} - ] - }, - "sneaky_singletype_field": {"$ref": "WellKnownTypes.json#/definitions/TimestampWithTimezone"} - } - } - """); - assertEquals(expectedSchema, upgradedMessage.getCatalog().getStreams().get(0).getJsonSchema()); + private io.airbyte.protocol.models.v0.AirbyteMessage createCatalogMessage(JsonNode schema) { + return new io.airbyte.protocol.models.v0.AirbyteMessage().withType(io.airbyte.protocol.models.v0.AirbyteMessage.Type.CATALOG) + .withCatalog( + new io.airbyte.protocol.models.v0.AirbyteCatalog().withStreams(List.of(new io.airbyte.protocol.models.v0.AirbyteStream().withJsonSchema( + schema)))); } } @@ -532,113 +558,1034 @@ public class RecordUpgradeTest { @Test public void testBasicUpgrade() { - JsonNode oldData = Jsons.deserialize(""" - { - "id": 42 - } - """); + JsonNode oldData = Jsons.deserialize( + """ + { + "id": 42 + } + """); AirbyteMessage upgradedMessage = migration.upgrade(createRecordMessage(oldData)); - AirbyteMessage expectedMessage = Jsons.deserialize(""" - { - "type": "RECORD", - "record": { - "data": { - "id": "42" - } - } - } - """, + AirbyteMessage expectedMessage = Jsons.deserialize( + """ + { + "type": "RECORD", + "record": { + "data": { + "id": "42" + } + } + } + """, AirbyteMessage.class); assertEquals(expectedMessage, upgradedMessage); } - @Test - public void testNestedUpgrade() { - JsonNode oldData = Jsons.deserialize(""" - { - "int": 42, - "float": 42.0, - "float2": 42.2, - "sub_object": { - "sub_int": 42, - "sub_float": 42.0, - "sub_float2": 42.2 - }, - "sub_array": [42, 42.0, 42.2] - } - """); + /** + * Utility method to upgrade the oldData, and assert that the result is equal to expectedData + * + * @param oldDataString The data of the record to be upgraded + * @param expectedDataString The expected data after upgrading + */ + private void doTest(String oldDataString, String expectedDataString) { + JsonNode oldData = Jsons.deserialize(oldDataString); AirbyteMessage upgradedMessage = migration.upgrade(createRecordMessage(oldData)); - JsonNode expectedData = Jsons.deserialize(""" - { - "int": "42", - "float": "42.0", - "float2": "42.2", - "sub_object": { - "sub_int": "42", - "sub_float": "42.0", - "sub_float2": "42.2" - }, - "sub_array": ["42", "42.0", "42.2"] - } - """); + JsonNode expectedData = Jsons.deserialize(expectedDataString); assertEquals(expectedData, upgradedMessage.getRecord().getData()); } @Test - public void testNonUpgradableValues() { - JsonNode oldData = Jsons.deserialize(""" - { - "boolean": true, - "string": "arst", - "sub_object": { - "boolean": true, - "string": "arst" - }, - "sub_array": [true, "arst"] - } - """); + public void testNestedUpgrade() { + doTest( + """ + { + "int": 42, + "float": 42.0, + "float2": 42.2, + "sub_object": { + "sub_int": 42, + "sub_float": 42.0, + "sub_float2": 42.2 + }, + "sub_array": [42, 42.0, 42.2] + } + """, + """ + { + "int": "42", + "float": "42.0", + "float2": "42.2", + "sub_object": { + "sub_int": "42", + "sub_float": "42.0", + "sub_float2": "42.2" + }, + "sub_array": ["42", "42.0", "42.2"] + } + """); + } - AirbyteMessage upgradedMessage = migration.upgrade(createRecordMessage(oldData)); + @Test + public void testNonUpgradableValues() { + doTest( + """ + { + "boolean": true, + "string": "arst", + "sub_object": { + "boolean": true, + "string": "arst" + }, + "sub_array": [true, "arst"] + } + """, + """ + { + "boolean": true, + "string": "arst", + "sub_object": { + "boolean": true, + "string": "arst" + }, + "sub_array": [true, "arst"] + } + """); + } - JsonNode expectedData = Jsons.deserialize(""" - { - "boolean": true, - "string": "arst", - "sub_object": { - "boolean": true, - "string": "arst" - }, - "sub_array": [true, "arst"] - } - """); - assertEquals(expectedData, upgradedMessage.getRecord().getData()); + private io.airbyte.protocol.models.v0.AirbyteMessage createRecordMessage(JsonNode data) { + return new io.airbyte.protocol.models.v0.AirbyteMessage().withType(io.airbyte.protocol.models.v0.AirbyteMessage.Type.RECORD) + .withRecord(new io.airbyte.protocol.models.v0.AirbyteRecordMessage().withData(data)); } } - private io.airbyte.protocol.models.v0.AirbyteMessage createCatalogMessage(JsonNode schema) { - return new io.airbyte.protocol.models.v0.AirbyteMessage().withType(io.airbyte.protocol.models.v0.AirbyteMessage.Type.CATALOG) - .withCatalog( - new io.airbyte.protocol.models.v0.AirbyteCatalog().withStreams(List.of(new io.airbyte.protocol.models.v0.AirbyteStream().withJsonSchema( - schema)))); - } + @Nested + public class CatalogDowngradeTest { + + @Test + public void testBasicDowngrade() { + // This isn't actually a valid stream schema (since it's not an object) + // but this test case is mostly about preserving the message structure, so it's not super relevant + JsonNode newSchema = Jsons.deserialize( + """ + { + "$ref": "WellKnownTypes.json#/definitions/String" + } + """); + + io.airbyte.protocol.models.v0.AirbyteMessage downgradedMessage = migration.downgrade(createCatalogMessage(newSchema)); + + io.airbyte.protocol.models.v0.AirbyteMessage expectedMessage = Jsons.deserialize( + """ + { + "type": "CATALOG", + "catalog": { + "streams": [ + { + "json_schema": { + "type": "string" + } + } + ] + } + } + """, + io.airbyte.protocol.models.v0.AirbyteMessage.class); + assertEquals(expectedMessage, downgradedMessage); + } + + /** + * Utility method to downgrade the oldSchema, and assert that the result is equal to expectedSchema + * + * @param oldSchemaString The schema to be downgraded + * @param expectedSchemaString The expected schema after downgrading + */ + private void doTest(String oldSchemaString, String expectedSchemaString) { + JsonNode oldSchema = Jsons.deserialize(oldSchemaString); + + io.airbyte.protocol.models.v0.AirbyteMessage downgradedMessage = migration.downgrade(createCatalogMessage(oldSchema)); + + JsonNode expectedSchema = Jsons.deserialize(expectedSchemaString); + assertEquals(expectedSchema, downgradedMessage.getCatalog().getStreams().get(0).getJsonSchema()); + } + + @Test + public void testDowngradeAllPrimitives() { + doTest( + """ + { + "type": "object", + "properties": { + "example_string": { + "$ref": "WellKnownTypes.json#/definitions/String" + }, + "example_number": { + "$ref": "WellKnownTypes.json#/definitions/Number" + }, + "example_integer": { + "$ref": "WellKnownTypes.json#/definitions/Integer" + }, + "example_boolean": { + "$ref": "WellKnownTypes.json#/definitions/Boolean" + }, + "example_timestamptz": { + "$ref": "WellKnownTypes.json#/definitions/TimestampWithTimezone" + }, + "example_timestamp_without_tz": { + "$ref": "WellKnownTypes.json#/definitions/TimestampWithoutTimezone" + }, + "example_timez": { + "$ref": "WellKnownTypes.json#/definitions/TimeWithTimezone" + }, + "example_time_without_tz": { + "$ref": "WellKnownTypes.json#/definitions/TimeWithoutTimezone" + }, + "example_date": { + "$ref": "WellKnownTypes.json#/definitions/Date" + }, + "example_binary": { + "$ref": "WellKnownTypes.json#/definitions/BinaryData" + } + } + } + """, + """ + { + "type": "object", + "properties": { + "example_string": { + "type": "string" + }, + "example_number": { + "type": "number" + }, + "example_integer": { + "type": "integer" + }, + "example_boolean": { + "type": "boolean" + }, + "example_timestamptz": { + "type": "string", + "airbyte_type": "timestamp_with_timezone", + "format": "date-time" + }, + "example_timestamp_without_tz": { + "type": "string", + "airbyte_type": "timestamp_without_timezone", + "format": "date-time" + }, + "example_timez": { + "type": "string", + "airbyte_type": "time_with_timezone", + "format": "time" + }, + "example_time_without_tz": { + "type": "string", + "airbyte_type": "time_without_timezone", + "format": "time" + }, + "example_date": { + "type": "string", + "format": "date" + }, + "example_binary": { + "type": "string", + "contentEncoding": "base64" + } + } + } + """); + } + + @Test + public void testDowngradeNestedFields() { + doTest( + """ + { + "type": "object", + "properties": { + "basic_array": { + "items": {"$ref": "WellKnownTypes.json#/definitions/String"} + }, + "tuple_array": { + "items": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ], + "additionalItems": {"$ref": "WellKnownTypes.json#/definitions/String"}, + "contains": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + "nested_object": { + "properties": { + "id": {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + "nested_oneof": { + "oneOf": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/TimestampWithTimezone"} + ] + }, + "nested_anyof": { + "anyOf": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ] + }, + "nested_allof": { + "allOf": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ] + }, + "nested_not": { + "not": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ] + } + }, + "patternProperties": { + "integer_.*": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + "additionalProperties": {"$ref": "WellKnownTypes.json#/definitions/String"} + } + } + } + """, + """ + { + "type": "object", + "properties": { + "basic_array": { + "items": {"type": "string"} + }, + "tuple_array": { + "items": [ + {"type": "string"}, + {"type": "integer"} + ], + "additionalItems": {"type": "string"}, + "contains": {"type": "integer"} + }, + "nested_object": { + "properties": { + "id": {"type": "integer"}, + "nested_oneof": { + "oneOf": [ + {"type": "string"}, + {"type": "string", "format": "date-time", "airbyte_type": "timestamp_with_timezone"} + ] + }, + "nested_anyof": { + "anyOf": [ + {"type": "string"}, + {"type": "integer"} + ] + }, + "nested_allof": { + "allOf": [ + {"type": "string"}, + {"type": "integer"} + ] + }, + "nested_not": { + "not": [ + {"type": "string"}, + {"type": "integer"} + ] + } + }, + "patternProperties": { + "integer_.*": {"type": "integer"} + }, + "additionalProperties": {"type": "string"} + } + } + } + """); + } + + @Test + public void testDowngradeBooleanSchemas() { + // Most of these should never happen in reality, but let's handle them just in case + // The only ones that we're _really_ expecting are additionalItems and additionalProperties + String schemaString = """ + { + "type": "object", + "properties": { + "basic_array": { + "items": true + }, + "tuple_array": { + "items": [true], + "additionalItems": true, + "contains": true + }, + "nested_object": { + "properties": { + "id": true, + "nested_oneof": { + "oneOf": [true] + }, + "nested_anyof": { + "anyOf": [true] + }, + "nested_allof": { + "allOf": [true] + }, + "nested_not": { + "not": [true] + } + }, + "patternProperties": { + "integer_.*": true + }, + "additionalProperties": true + } + } + } + """; + doTest(schemaString, schemaString); + } + + @Test + public void testDowngradeEmptySchema() { + // Sources shouldn't do this, but we should have handling for it anyway, since it's not currently + // enforced by SATs + String schemaString = """ + { + "type": "object", + "properties": { + "basic_array": { + "items": {} + }, + "tuple_array": { + "items": [{}], + "additionalItems": {}, + "contains": {} + }, + "nested_object": { + "properties": { + "id": {}, + "nested_oneof": { + "oneOf": [{}] + }, + "nested_anyof": { + "anyOf": [{}] + }, + "nested_allof": { + "allOf": [{}] + }, + "nested_not": { + "not": [{}] + } + }, + "patternProperties": { + "integer_.*": {} + }, + "additionalProperties": {} + } + } + } + """; + doTest(schemaString, schemaString); + } + + @Test + public void testDowngradeLiteralSchema() { + // Verify that we do _not_ recurse into places we shouldn't + String schemaString = """ + { + "type": "object", + "properties": { + "example_schema": { + "type": "object", + "default": {"$ref": "WellKnownTypes.json#/definitions/String"}, + "enum": [{"$ref": "WellKnownTypes.json#/definitions/String"}], + "const": {"$ref": "WellKnownTypes.json#/definitions/String"} + } + } + } + """; + doTest(schemaString, schemaString); + } - private void assertUpgradeIsNoop(String schemaString) { - JsonNode oldSchema = Jsons.deserialize(schemaString); + @Test + public void testDowngradeMultiTypeFields() { + doTest( + """ + { + "type": "object", + "properties": { + "multityped_field": { + "oneOf": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + { + "type": "object", + "properties": { + "id": {"$ref": "WellKnownTypes.json#/definitions/String"} + }, + "patternProperties": { + "integer_.*": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + "additionalProperties": {"$ref": "WellKnownTypes.json#/definitions/String"} + }, + { + "type": "array", + "items": {"$ref": "WellKnownTypes.json#/definitions/String"}, + "additionalItems": {"$ref": "WellKnownTypes.json#/definitions/String"}, + "contains": {"$ref": "WellKnownTypes.json#/definitions/String"} + } + ] + }, + "multityped_date_field": { + "oneOf": [ + {"$ref": "WellKnownTypes.json#/definitions/Date"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ] + }, + "boolean_field": { + "oneOf": [ + true, + {"$ref": "WellKnownTypes.json#/definitions/String"}, + false + ] + }, + "conflicting_field": { + "oneOf": [ + {"type": "object", "properties": {"id": {"$ref": "WellKnownTypes.json#/definitions/String"}}}, + {"type": "object", "properties": {"name": {"$ref": "WellKnownTypes.json#/definitions/String"}}}, + {"$ref": "WellKnownTypes.json#/definitions/String"} + ] + }, + "conflicting_primitives": { + "oneOf": [ + {"$ref": "WellKnownTypes.json#/definitions/TimestampWithoutTimezone"}, + {"$ref": "WellKnownTypes.json#/definitions/TimestampWithTimezone"} + ] + } + } + } + """, + """ + { + "type": "object", + "properties": { + "multityped_field": { + "type": ["string", "object", "array"], + "properties": { + "id": {"type": "string"} + }, + "patternProperties": { + "integer_.*": {"type": "integer"} + }, + "additionalProperties": {"type": "string"}, + "items": {"type": "string"}, + "additionalItems": {"type": "string"}, + "contains": {"type": "string"} + }, + "multityped_date_field": { + "type": ["string", "integer"], + "format": "date" + }, + "boolean_field": { + "oneOf": [ + true, + {"type": "string"}, + false + ] + }, + "conflicting_field": { + "oneOf": [ + {"type": "object", "properties": {"id": {"type": "string"}}}, + {"type": "object", "properties": {"name": {"type": "string"}}}, + {"type": "string"} + ] + }, + "conflicting_primitives": { + "oneOf": [ + {"type": "string", "format": "date-time", "airbyte_type": "timestamp_without_timezone"}, + {"type": "string", "format": "date-time", "airbyte_type": "timestamp_with_timezone"} + ] + } + } + } + """); + } - AirbyteMessage upgradedMessage = migration.upgrade(createCatalogMessage(oldSchema)); + @Test + public void testDowngradeWeirdSchemas() { + // old_style_schema isn't actually valid (i.e. v1 schemas should always be using $ref) + // but we should check that it behaves well anyway + doTest( + """ + { + "type": "object", + "properties": { + "old_style_schema": {"type": "string"} + } + } + """, + """ + { + "type": "object", + "properties": { + "old_style_schema": {"type": "string"} + } + } + """); + } + + private AirbyteMessage createCatalogMessage(JsonNode schema) { + return new AirbyteMessage().withType(AirbyteMessage.Type.CATALOG) + .withCatalog( + new AirbyteCatalog().withStreams(List.of(new AirbyteStream().withJsonSchema( + schema)))); + } - JsonNode expectedSchema = Jsons.deserialize(schemaString); - assertEquals(expectedSchema, upgradedMessage.getCatalog().getStreams().get(0).getJsonSchema()); } - private io.airbyte.protocol.models.v0.AirbyteMessage createRecordMessage(JsonNode data) { - return new io.airbyte.protocol.models.v0.AirbyteMessage().withType(io.airbyte.protocol.models.v0.AirbyteMessage.Type.RECORD) - .withRecord(new io.airbyte.protocol.models.v0.AirbyteRecordMessage().withData(data)); + @Nested + public class RecordDowngradeTest { + + private static final String STREAM_NAME = "foo_stream"; + private static final String NAMESPACE_NAME = "foo_namespace"; + + @Test + public void testBasicDowngrade() { + ConfiguredAirbyteCatalog catalog = createConfiguredAirbyteCatalog( + """ + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + """); + JsonNode oldData = Jsons.deserialize( + """ + "42" + """); + + io.airbyte.protocol.models.v0.AirbyteMessage downgradedMessage = new AirbyteMessageMigrationV1(catalog, validator) + .downgrade(createRecordMessage(oldData)); + + io.airbyte.protocol.models.v0.AirbyteMessage expectedMessage = Jsons.deserialize( + """ + { + "type": "RECORD", + "record": { + "stream": "foo_stream", + "namespace": "foo_namespace", + "data": 42 + } + } + """, + io.airbyte.protocol.models.v0.AirbyteMessage.class); + assertEquals(expectedMessage, downgradedMessage); + } + + /** + * Utility method to use the given catalog to downgrade the oldData, and assert that the result is + * equal to expectedDataString + * + * @param schemaString The JSON schema of the record + * @param oldDataString The data of the record to be downgraded + * @param expectedDataString The expected data after downgrading + */ + private void doTest(String schemaString, String oldDataString, String expectedDataString) { + ConfiguredAirbyteCatalog catalog = createConfiguredAirbyteCatalog(schemaString); + JsonNode oldData = Jsons.deserialize(oldDataString); + + io.airbyte.protocol.models.v0.AirbyteMessage downgradedMessage = new AirbyteMessageMigrationV1(catalog, validator) + .downgrade(createRecordMessage(oldData)); + + JsonNode expectedDowngradedRecord = Jsons.deserialize(expectedDataString); + assertEquals(expectedDowngradedRecord, downgradedMessage.getRecord().getData()); + } + + @Test + public void testNestedDowngrade() { + doTest( + """ + { + "type": "object", + "properties": { + "int": {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + "num": {"$ref": "WellKnownTypes.json#/definitions/Number"}, + "binary": {"$ref": "WellKnownTypes.json#/definitions/BinaryData"}, + "bool": {"$ref": "WellKnownTypes.json#/definitions/Boolean"}, + "object": { + "type": "object", + "properties": { + "int": {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + "arr": { + "type": "array", + "items": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + } + } + }, + "array": { + "type": "array", + "items": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + "array_multitype": { + "type": "array", + "items": [{"$ref": "WellKnownTypes.json#/definitions/Integer"}, {"$ref": "WellKnownTypes.json#/definitions/String"}] + }, + "oneof": { + "type": "array", + "items": { + "oneOf": [ + {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + {"$ref": "WellKnownTypes.json#/definitions/Boolean"} + ] + } + } + } + } + """, + """ + { + "int": "42", + "num": "43.2", + "string": "42", + "bool": true, + "object": { + "int": "42" + }, + "array": ["42"], + "array_multitype": ["42", "42"], + "oneof": ["42", true], + "additionalProperty": "42" + } + """, + """ + { + "int": 42, + "num": 43.2, + "string": "42", + "bool": true, + "object": { + "int": 42 + }, + "array": [42], + "array_multitype": [42, "42"], + "oneof": [42, true], + "additionalProperty": "42" + } + """); + } + + @Test + public void testWeirdDowngrade() { + doTest( + """ + { + "type": "object", + "properties": { + "raw_int": {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + "raw_num": {"$ref": "WellKnownTypes.json#/definitions/Number"}, + "bad_int": {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + "typeless_object": { + "properties": { + "foo": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + } + }, + "typeless_array": { + "items": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + "arr_obj_union1": { + "type": ["array", "object"], + "items": { + "type": "object", + "properties": { + "id": {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + "name": {"$ref": "WellKnownTypes.json#/definitions/String"} + } + }, + "properties": { + "id": {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + "name": {"$ref": "WellKnownTypes.json#/definitions/String"} + } + }, + "arr_obj_union2": { + "type": ["array", "object"], + "items": { + "type": "object", + "properties": { + "id": {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + "name": {"$ref": "WellKnownTypes.json#/definitions/String"} + } + }, + "properties": { + "id": {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + "name": {"$ref": "WellKnownTypes.json#/definitions/String"} + } + }, + "empty_oneof": { + "oneOf": [] + } + } + } + """, + """ + { + "raw_int": 42, + "raw_num": 43.2, + "bad_int": "foo", + "typeless_object": { + "foo": "42" + }, + "typeless_array": ["42"], + "arr_obj_union1": [{"id": "42", "name": "arst"}, {"id": "43", "name": "qwfp"}], + "arr_obj_union2": {"id": "42", "name": "arst"}, + "empty_oneof": "42" + } + """, + """ + { + "raw_int": 42, + "raw_num": 43.2, + "bad_int": "foo", + "typeless_object": { + "foo": 42 + }, + "typeless_array": [42], + "arr_obj_union1": [{"id": 42, "name": "arst"}, {"id": 43, "name": "qwfp"}], + "arr_obj_union2": {"id": 42, "name": "arst"}, + "empty_oneof": "42" + } + """); + } + + @Test + public void testEmptySchema() { + doTest( + """ + { + "type": "object", + "properties": { + "empty_schema_primitive": {}, + "empty_schema_array": {}, + "empty_schema_object": {}, + "implicit_array": { + "items": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + "implicit_object": { + "properties": { + "foo": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + } + } + } + } + """, + """ + { + "empty_schema_primitive": "42", + "empty_schema_array": ["42", false], + "empty_schema_object": {"foo": "42"}, + "implicit_array": ["42"], + "implicit_object": {"foo": "42"} + } + """, + """ + { + "empty_schema_primitive": "42", + "empty_schema_array": ["42", false], + "empty_schema_object": {"foo": "42"}, + "implicit_array": [42], + "implicit_object": {"foo": 42} + } + """); + } + + @Test + public void testBacktracking() { + // These test cases verify that we correctly choose the most-correct oneOf option. + doTest( + """ + { + "type": "object", + "properties": { + "valid_option": { + "oneOf": [ + {"$ref": "WellKnownTypes.json#/definitions/Boolean"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + {"$ref": "WellKnownTypes.json#/definitions/String"} + ] + }, + "all_invalid": { + "oneOf": [ + { + "type": "array", + "items": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + { + "type": "array", + "items": {"$ref": "WellKnownTypes.json#/definitions/Boolean"} + } + ] + }, + "nested_oneof": { + "oneOf": [ + { + "type": "array", + "items": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + { + "type": "array", + "items": { + "type": "object", + "properties": { + "foo": { + "oneOf": [ + {"$ref": "WellKnownTypes.json#/definitions/Boolean"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ] + } + } + } + } + ] + }, + "mismatched_primitive": { + "oneOf": [ + { + "type": "object", + "properties": { + "foo": {"type": "object"}, + "bar": {"$ref": "WellKnownTypes.json#/definitions/String"} + } + }, + { + "type": "object", + "properties": { + "foo": {"$ref": "WellKnownTypes.json#/definitions/Boolean"}, + "bar": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + } + } + ] + }, + "mismatched_text": { + "oneOf": [ + { + "type": "object", + "properties": { + "foo": {"type": "object"}, + "bar": {"$ref": "WellKnownTypes.json#/definitions/String"} + } + }, + { + "type": "object", + "properties": { + "foo": {"$ref": "WellKnownTypes.json#/definitions/String"}, + "bar": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + } + } + ] + }, + "mismatch_array": { + "oneOf": [ + { + "type": "array", + "items": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + { + "type": "array", + "items": [ + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/String"}, + {"$ref": "WellKnownTypes.json#/definitions/Integer"} + ] + } + ] + } + } + } + """, + """ + { + "valid_option": "42", + "all_invalid": ["42", "arst"], + "nested_oneof": [{"foo": "42"}], + "mismatched_primitive": { + "foo": true, + "bar": "42" + }, + "mismatched_text": { + "foo": "bar", + "bar": "42" + }, + "mismatch_array": ["arst", "41", "42"] + } + """, + """ + { + "valid_option": 42, + "all_invalid": [42, "arst"], + "nested_oneof": [{"foo": 42}], + "mismatched_primitive": { + "foo": true, + "bar": 42 + }, + "mismatched_text": { + "foo": "bar", + "bar": 42 + }, + "mismatch_array": ["arst", "41", 42] + } + """); + } + + @Test + public void testIncorrectSchema() { + doTest( + """ + { + "type": "object", + "properties": { + "bad_int": {"$ref": "WellKnownTypes.json#/definitions/Integer"}, + "bad_int_array": { + "type": "array", + "items": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + }, + "bad_int_obj": { + "type": "object", + "properties": { + "foo": {"$ref": "WellKnownTypes.json#/definitions/Integer"} + } + } + } + } + """, + """ + { + "bad_int": "arst", + "bad_int_array": ["arst"], + "bad_int_obj": {"foo": "arst"} + } + """, + """ + { + "bad_int": "arst", + "bad_int_array": ["arst"], + "bad_int_obj": {"foo": "arst"} + } + """); + } + + private ConfiguredAirbyteCatalog createConfiguredAirbyteCatalog(String schema) { + return new ConfiguredAirbyteCatalog() + .withStreams(List.of(new ConfiguredAirbyteStream().withStream(new io.airbyte.protocol.models.AirbyteStream() + .withName(STREAM_NAME) + .withNamespace(NAMESPACE_NAME) + .withJsonSchema(Jsons.deserialize(schema))))); + } + + private AirbyteMessage createRecordMessage(JsonNode data) { + return new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(STREAM_NAME).withNamespace(NAMESPACE_NAME).withData(data)); + } + } } diff --git a/airbyte-commons-protocol/src/test/resources/WellKnownTypes.json b/airbyte-commons-protocol/src/test/resources/WellKnownTypes.json new file mode 100644 index 000000000000..95d2ff9e26fa --- /dev/null +++ b/airbyte-commons-protocol/src/test/resources/WellKnownTypes.json @@ -0,0 +1,65 @@ +{ + "definitions": { + "String": { + "type": "string", + "description": "Arbitrary text" + }, + "BinaryData": { + "type": "string", + "description": "Arbitrary binary data. Represented as base64-encoded strings in the JSON transport. In the future, if we support other transports, may be encoded differently.\n", + "pattern": "^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$" + }, + "Date": { + "type": "string", + "pattern": "^\\d{4}-\\d{2}-\\d{2}( BC)?$", + "description": "RFC 3339\u00a75.6's full-date format, extended with BC era support" + }, + "TimestampWithTimezone": { + "type": "string", + "pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?(Z|[+\\-]\\d{1,2}:\\d{2})( BC)?$", + "description": "An instant in time. Frequently simply referred to as just a timestamp, or timestamptz. Uses RFC 3339\u00a75.6's date-time format, requiring a \"T\" separator, and extended with BC era support. Note that we do _not_ accept Unix epochs here.\n" + }, + "TimestampWithoutTimezone": { + "type": "string", + "pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?( BC)?$", + "description": "Also known as a localdatetime, or just datetime. Under RFC 3339\u00a75.6, this would be represented as `full-date \"T\" partial-time`, extended with BC era support.\n" + }, + "TimeWithTimezone": { + "type": "string", + "pattern": "^\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?(Z|[+\\-]\\d{1,2}:\\d{2})$", + "description": "An RFC 3339\u00a75.6 full-time" + }, + "TimeWithoutTimezone": { + "type": "string", + "pattern": "^\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?$", + "description": "An RFC 3339\u00a75.6 partial-time" + }, + "Number": { + "type": "string", + "oneOf": [ + { + "pattern": "-?(0|[0-9]\\d*)(\\.\\d+)?" + }, + { + "enum": ["Infinity", "-Infinity", "NaN"] + } + ], + "description": "Note the mix of regex validation for normal numbers, and enum validation for special values." + }, + "Integer": { + "type": "string", + "oneOf": [ + { + "pattern": "-?(0|[0-9]\\d*)" + }, + { + "enum": ["Infinity", "-Infinity", "NaN"] + } + ] + }, + "Boolean": { + "type": "boolean", + "description": "Note the direct usage of a primitive boolean rather than string. Unlike Numbers and Integers, we don't expect unusual values here." + } + } +} diff --git a/airbyte-json-validation/src/main/java/io/airbyte/validation/json/JsonSchemaValidator.java b/airbyte-json-validation/src/main/java/io/airbyte/validation/json/JsonSchemaValidator.java index 5c9507bee977..b38c987829ff 100644 --- a/airbyte-json-validation/src/main/java/io/airbyte/validation/json/JsonSchemaValidator.java +++ b/airbyte-json-validation/src/main/java/io/airbyte/validation/json/JsonSchemaValidator.java @@ -58,7 +58,7 @@ public JsonSchemaValidator() { * @param baseUri The base URI for schema resolution */ @VisibleForTesting - protected JsonSchemaValidator(URI baseUri) { + public JsonSchemaValidator(URI baseUri) { this.jsonSchemaFactory = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V7); this.baseUri = baseUri; } diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaReferenceTypes.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaReferenceTypes.java index 01658d114856..b913f62beb1e 100644 --- a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaReferenceTypes.java +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaReferenceTypes.java @@ -4,8 +4,10 @@ package io.airbyte.protocol.models; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airbyte.commons.json.Jsons; import java.util.Map; import java.util.Set; @@ -17,16 +19,19 @@ public class JsonSchemaReferenceTypes { "integer", "boolean"); - public static final String STRING_REFERENCE = "WellKnownTypes.json#/definitions/String"; - public static final String BINARY_DATA_REFERENCE = "WellKnownTypes.json#/definitions/BinaryData"; - public static final String NUMBER_REFERENCE = "WellKnownTypes.json#/definitions/Number"; - public static final String INTEGER_REFERENCE = "WellKnownTypes.json#/definitions/Integer"; - public static final String BOOLEAN_REFERENCE = "WellKnownTypes.json#/definitions/Boolean"; - public static final String DATE_REFERENCE = "WellKnownTypes.json#/definitions/Date"; - public static final String TIMESTAMP_WITH_TIMEZONE_REFERENCE = "WellKnownTypes.json#/definitions/TimestampWithTimezone"; - public static final String TIMESTAMP_WITHOUT_TIMEZONE_REFERENCE = "WellKnownTypes.json#/definitions/TimestampWithoutTimezone"; - public static final String TIME_WITH_TIMEZONE_REFERENCE = "WellKnownTypes.json#/definitions/TimeWithTimezone"; - public static final String TIME_WITHOUT_TIMEZONE_REFERENCE = "WellKnownTypes.json#/definitions/TimeWithoutTimezone"; + public static final String REF_KEY = "$ref"; + + public static final String WELL_KNOWN_TYPES_FILENAME = "WellKnownTypes.json"; + public static final String STRING_REFERENCE = WELL_KNOWN_TYPES_FILENAME + "#/definitions/String"; + public static final String BINARY_DATA_REFERENCE = WELL_KNOWN_TYPES_FILENAME + "#/definitions/BinaryData"; + public static final String NUMBER_REFERENCE = WELL_KNOWN_TYPES_FILENAME + "#/definitions/Number"; + public static final String INTEGER_REFERENCE = WELL_KNOWN_TYPES_FILENAME + "#/definitions/Integer"; + public static final String BOOLEAN_REFERENCE = WELL_KNOWN_TYPES_FILENAME + "#/definitions/Boolean"; + public static final String DATE_REFERENCE = WELL_KNOWN_TYPES_FILENAME + "#/definitions/Date"; + public static final String TIMESTAMP_WITH_TIMEZONE_REFERENCE = WELL_KNOWN_TYPES_FILENAME + "#/definitions/TimestampWithTimezone"; + public static final String TIMESTAMP_WITHOUT_TIMEZONE_REFERENCE = WELL_KNOWN_TYPES_FILENAME + "#/definitions/TimestampWithoutTimezone"; + public static final String TIME_WITH_TIMEZONE_REFERENCE = WELL_KNOWN_TYPES_FILENAME + "#/definitions/TimeWithTimezone"; + public static final String TIME_WITHOUT_TIMEZONE_REFERENCE = WELL_KNOWN_TYPES_FILENAME + "#/definitions/TimeWithoutTimezone"; /** * This is primarily useful for migrating from protocol v0 to v1. It provides a mapping from the old @@ -44,4 +49,47 @@ public class JsonSchemaReferenceTypes { "boolean", BOOLEAN_REFERENCE, "date", DATE_REFERENCE); + public static final Map REFERENCE_TYPE_TO_OLD_TYPE = ImmutableMap.of( + TIMESTAMP_WITH_TIMEZONE_REFERENCE, + (ObjectNode) Jsons.deserialize( + """ + {"type": "string", "airbyte_type": "timestamp_with_timezone", "format": "date-time"} + """), + TIMESTAMP_WITHOUT_TIMEZONE_REFERENCE, (ObjectNode) Jsons.deserialize( + """ + {"type": "string", "airbyte_type": "timestamp_without_timezone", "format": "date-time"} + """), + TIME_WITH_TIMEZONE_REFERENCE, (ObjectNode) Jsons.deserialize( + """ + {"type": "string", "airbyte_type": "time_with_timezone", "format": "time"} + """), + TIME_WITHOUT_TIMEZONE_REFERENCE, (ObjectNode) Jsons.deserialize( + """ + {"type": "string", "airbyte_type": "time_without_timezone", "format": "time"} + """), + DATE_REFERENCE, (ObjectNode) Jsons.deserialize( + """ + {"type": "string", "format": "date"} + """), + INTEGER_REFERENCE, (ObjectNode) Jsons.deserialize( + """ + {"type": "integer"} + """), + NUMBER_REFERENCE, (ObjectNode) Jsons.deserialize( + """ + {"type": "number"} + """), + BOOLEAN_REFERENCE, (ObjectNode) Jsons.deserialize( + """ + {"type": "boolean"} + """), + STRING_REFERENCE, (ObjectNode) Jsons.deserialize( + """ + {"type": "string"} + """), + BINARY_DATA_REFERENCE, (ObjectNode) Jsons.deserialize( + """ + {"type": "string", "contentEncoding": "base64"} + """)); + }