diff --git a/json-schema-converter/src/main/java/io/confluent/connect/json/JsonSchemaData.java b/json-schema-converter/src/main/java/io/confluent/connect/json/JsonSchemaData.java index 03877d160d7..dfbf730791a 100644 --- a/json-schema-converter/src/main/java/io/confluent/connect/json/JsonSchemaData.java +++ b/json-schema-converter/src/main/java/io/confluent/connect/json/JsonSchemaData.java @@ -196,7 +196,7 @@ public class JsonSchemaData { String fieldNamePrefix = generalizedSumTypeSupport ? GENERALIZED_TYPE_UNION_FIELD_PREFIX : JSON_TYPE_ONE_OF + ".field."; - int numMatchingProperties = -1; + int numMatchingProperties = 0; Field matchingField = null; for (Field field : schema.fields()) { Schema fieldSchema = field.schema(); @@ -268,7 +268,7 @@ private static boolean isInstanceOfSchemaTypeForSimpleSchema(Schema fieldSchema, private static int matchStructSchema(Schema fieldSchema, JsonNode value) { if (fieldSchema.type() != Schema.Type.STRUCT || !value.isObject()) { - return -1; + return 0; } Set schemaFields = fieldSchema.fields() .stream() @@ -280,7 +280,15 @@ private static int matchStructSchema(Schema fieldSchema, JsonNode value) { } Set intersectSet = new HashSet<>(schemaFields); intersectSet.retainAll(objectFields); - return intersectSet.size(); + + int childrenMatchFactor = 0; + for (String intersectedElement: intersectSet) { + Schema childSchema = fieldSchema.field(intersectedElement).schema(); + JsonNode childValue = value.get(intersectedElement); + childrenMatchFactor += matchStructSchema(childSchema, childValue); + } + + return intersectSet.size() + childrenMatchFactor; } // Convert values in Kafka Connect form into their logical types. These logical converters are diff --git a/json-schema-converter/src/test/java/io/confluent/connect/json/JsonSchemaDataTest.java b/json-schema-converter/src/test/java/io/confluent/connect/json/JsonSchemaDataTest.java index b9394a70db1..81897b949bd 100644 --- a/json-schema-converter/src/test/java/io/confluent/connect/json/JsonSchemaDataTest.java +++ b/json-schema-converter/src/test/java/io/confluent/connect/json/JsonSchemaDataTest.java @@ -1581,6 +1581,63 @@ public void testToConnectUnionDifferentStruct() { checkNonObjectConversion(connectSchema, expected, schema, obj); } + @Test + public void testToConnectUnionSecondNestedSchemas() { + StringSchema stringSchema = StringSchema.builder() + .unprocessedProperties(ImmutableMap.of("connect.index", 0)) + .build(); + ObjectSchema firstSchemaNested = ObjectSchema.builder() + .addPropertySchema("differentFieldNameA", stringSchema) + .build(); + ObjectSchema firstSchema = ObjectSchema.builder() + .addPropertySchema("commonFieldName", firstSchemaNested) + .unprocessedProperties(ImmutableMap.of("connect.index", 0)) + .build(); + ObjectSchema secondSchemaNested = ObjectSchema.builder() + .addPropertySchema("differentFieldNameB", stringSchema) + .build(); + ObjectSchema secondSchema = ObjectSchema.builder() + .addPropertySchema("commonFieldName", secondSchemaNested) + .unprocessedProperties(ImmutableMap.of("connect.index", 1)) + .build(); + CombinedSchema schema = CombinedSchema.oneOf(ImmutableList.of(firstSchema, secondSchema)) + .build(); + + Schema field0Nested = SchemaBuilder.struct().field("differentFieldNameA", Schema.STRING_SCHEMA).build(); + Schema field0 = SchemaBuilder.struct() + .field("commonFieldName", field0Nested) + .optional() + .build(); + Schema field1Nested = SchemaBuilder.struct().field("differentFieldNameB", Schema.STRING_SCHEMA).build(); + Schema field1 = SchemaBuilder.struct() + .field("commonFieldName", field1Nested) + .optional() + .build(); + Schema connectSchema = SchemaBuilder.struct().name(JSON_TYPE_ONE_OF) + .field(JSON_TYPE_ONE_OF + ".field.0", field0) + .field(JSON_TYPE_ONE_OF + ".field.1", field1) + .build(); + + ObjectNode firstObj = JsonNodeFactory.instance.objectNode() + .set("differentFieldNameA", TextNode.valueOf("sample string A")); + ObjectNode secondObj = JsonNodeFactory.instance.objectNode() + .set("differentFieldNameB", TextNode.valueOf("sample string B")); + Struct firstStruct = new Struct(field0Nested).put("differentFieldNameA", "sample string A"); + Struct secondStruct = new Struct(field1Nested).put("differentFieldNameB", "sample string B"); + + ObjectNode obj = JsonNodeFactory.instance.objectNode(). + set("commonFieldName", firstObj); + Struct struct = new Struct(field0).put("commonFieldName", firstStruct); + Struct expected = new Struct(connectSchema).put(JSON_TYPE_ONE_OF + ".field.0", struct); + checkNonObjectConversion(connectSchema, expected, schema, obj); + + obj = JsonNodeFactory.instance.objectNode(). + set("commonFieldName", secondObj); + struct = new Struct(field1).put("commonFieldName", secondStruct); + expected = new Struct(connectSchema).put(JSON_TYPE_ONE_OF + ".field.1", struct); + checkNonObjectConversion(connectSchema, expected, schema, obj); + } + @Test public void testToConnectMapOptionalValue() { testToConnectMapOptional("some value");