From af44d2e82df832a37b6e130fc4e788591d55ea07 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Mon, 25 Mar 2024 09:34:32 -0700 Subject: [PATCH] Merge pull request #3059 from MaciasMucias/master fix: nested schemas were not considered in matching the correct Json Union schema --- .../connect/json/JsonSchemaData.java | 14 ++++- .../connect/json/JsonSchemaDataTest.java | 57 +++++++++++++++++++ 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/json-schema-converter/src/main/java/io/confluent/connect/json/JsonSchemaData.java b/json-schema-converter/src/main/java/io/confluent/connect/json/JsonSchemaData.java index 85af5c5d930..d2a9b5dc674 100644 --- a/json-schema-converter/src/main/java/io/confluent/connect/json/JsonSchemaData.java +++ b/json-schema-converter/src/main/java/io/confluent/connect/json/JsonSchemaData.java @@ -183,7 +183,7 @@ public class JsonSchemaData { }); TO_CONNECT_CONVERTERS.put(Schema.Type.STRUCT, (schema, value) -> { if (schema.name() != null && schema.name().equals(JSON_TYPE_ONE_OF)) { - int numMatchingProperties = -1; + int numMatchingProperties = 0; Field matchingField = null; for (Field field : schema.fields()) { Schema fieldSchema = field.schema(); @@ -255,7 +255,7 @@ private static boolean isInstanceOfSchemaTypeForSimpleSchema(Schema fieldSchema, private static int matchStructSchema(Schema fieldSchema, JsonNode value) { if (fieldSchema.type() != Schema.Type.STRUCT || !value.isObject()) { - return -1; + return 0; } Set schemaFields = fieldSchema.fields() .stream() @@ -267,7 +267,15 @@ private static int matchStructSchema(Schema fieldSchema, JsonNode value) { } Set intersectSet = new HashSet<>(schemaFields); intersectSet.retainAll(objectFields); - return intersectSet.size(); + + int childrenMatchFactor = 0; + for (String intersectedElement: intersectSet) { + Schema childSchema = fieldSchema.field(intersectedElement).schema(); + JsonNode childValue = value.get(intersectedElement); + childrenMatchFactor += matchStructSchema(childSchema, childValue); + } + + return intersectSet.size() + childrenMatchFactor; } // Convert values in Kafka Connect form into their logical types. These logical converters are diff --git a/json-schema-converter/src/test/java/io/confluent/connect/json/JsonSchemaDataTest.java b/json-schema-converter/src/test/java/io/confluent/connect/json/JsonSchemaDataTest.java index 1cdee6669ce..5358a9e8ae8 100644 --- a/json-schema-converter/src/test/java/io/confluent/connect/json/JsonSchemaDataTest.java +++ b/json-schema-converter/src/test/java/io/confluent/connect/json/JsonSchemaDataTest.java @@ -1382,6 +1382,63 @@ public void testToConnectUnionDifferentStruct() { checkNonObjectConversion(connectSchema, expected, schema, obj); } + @Test + public void testToConnectUnionSecondNestedSchemas() { + StringSchema stringSchema = StringSchema.builder() + .unprocessedProperties(ImmutableMap.of("connect.index", 0)) + .build(); + ObjectSchema firstSchemaNested = ObjectSchema.builder() + .addPropertySchema("differentFieldNameA", stringSchema) + .build(); + ObjectSchema firstSchema = ObjectSchema.builder() + .addPropertySchema("commonFieldName", firstSchemaNested) + .unprocessedProperties(ImmutableMap.of("connect.index", 0)) + .build(); + ObjectSchema secondSchemaNested = ObjectSchema.builder() + .addPropertySchema("differentFieldNameB", stringSchema) + .build(); + ObjectSchema secondSchema = ObjectSchema.builder() + .addPropertySchema("commonFieldName", secondSchemaNested) + .unprocessedProperties(ImmutableMap.of("connect.index", 1)) + .build(); + CombinedSchema schema = CombinedSchema.oneOf(ImmutableList.of(firstSchema, secondSchema)) + .build(); + + Schema field0Nested = SchemaBuilder.struct().field("differentFieldNameA", Schema.STRING_SCHEMA).build(); + Schema field0 = SchemaBuilder.struct() + .field("commonFieldName", field0Nested) + .optional() + .build(); + Schema field1Nested = SchemaBuilder.struct().field("differentFieldNameB", Schema.STRING_SCHEMA).build(); + Schema field1 = SchemaBuilder.struct() + .field("commonFieldName", field1Nested) + .optional() + .build(); + Schema connectSchema = SchemaBuilder.struct().name(JSON_TYPE_ONE_OF) + .field(JSON_TYPE_ONE_OF + ".field.0", field0) + .field(JSON_TYPE_ONE_OF + ".field.1", field1) + .build(); + + ObjectNode firstObj = JsonNodeFactory.instance.objectNode() + .set("differentFieldNameA", TextNode.valueOf("sample string A")); + ObjectNode secondObj = JsonNodeFactory.instance.objectNode() + .set("differentFieldNameB", TextNode.valueOf("sample string B")); + Struct firstStruct = new Struct(field0Nested).put("differentFieldNameA", "sample string A"); + Struct secondStruct = new Struct(field1Nested).put("differentFieldNameB", "sample string B"); + + ObjectNode obj = JsonNodeFactory.instance.objectNode(). + set("commonFieldName", firstObj); + Struct struct = new Struct(field0).put("commonFieldName", firstStruct); + Struct expected = new Struct(connectSchema).put(JSON_TYPE_ONE_OF + ".field.0", struct); + checkNonObjectConversion(connectSchema, expected, schema, obj); + + obj = JsonNodeFactory.instance.objectNode(). + set("commonFieldName", secondObj); + struct = new Struct(field1).put("commonFieldName", secondStruct); + expected = new Struct(connectSchema).put(JSON_TYPE_ONE_OF + ".field.1", struct); + checkNonObjectConversion(connectSchema, expected, schema, obj); + } + @Test public void testToConnectMapOptionalValue() { testToConnectMapOptional("some value");