Skip to content

Commit

Permalink
Merge branch '7.2.x' into pr_merge_from_7_1_x_to_7_2_x
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Mar 26, 2024
2 parents 517c178 + b2ee170 commit ba34552
Show file tree
Hide file tree
Showing 87 changed files with 2,863 additions and 509 deletions.
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# See go/codeowners - automatically generated for confluentinc/schema-registry:
* @confluentinc/data-governance
2 changes: 1 addition & 1 deletion avro-converter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-parent</artifactId>
<version>7.1.12-0</version>
<version>7.2.10-0</version>
</parent>

<licenses>
Expand Down
7 changes: 6 additions & 1 deletion avro-data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-parent</artifactId>
<version>7.1.12-0</version>
<version>7.2.10-0</version>
</parent>

<licenses>
Expand Down Expand Up @@ -42,6 +42,11 @@
<artifactId>kafka-avro-serializer</artifactId>
<version>${io.confluent.schema-registry.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-converter</artifactId>
<version>${io.confluent.schema-registry.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
Expand Down
171 changes: 104 additions & 67 deletions avro-data/src/main/java/io/confluent/connect/avro/AvroData.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@

public class AvroDataConfig extends AbstractConfig {

public static final String GENERALIZED_SUM_TYPE_SUPPORT_CONFIG = "generalized.sum.type.support";
public static final boolean GENERALIZED_SUM_TYPE_SUPPORT_DEFAULT = false;
public static final String GENERALIZED_SUM_TYPE_SUPPORT_DOC =
"Toggle for enabling/disabling generalized sum type support: interoperability of enum/union "
+ "with other schema formats";

public static final String ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG = "enhanced.avro.schema.support";
public static final boolean ENHANCED_AVRO_SCHEMA_SUPPORT_DEFAULT = false;
public static final String ENHANCED_AVRO_SCHEMA_SUPPORT_DOC =
Expand Down Expand Up @@ -61,6 +67,11 @@ public class AvroDataConfig extends AbstractConfig {

public static ConfigDef baseConfigDef() {
return new ConfigDef()
.define(GENERALIZED_SUM_TYPE_SUPPORT_CONFIG,
ConfigDef.Type.BOOLEAN,
GENERALIZED_SUM_TYPE_SUPPORT_DEFAULT,
ConfigDef.Importance.MEDIUM,
GENERALIZED_SUM_TYPE_SUPPORT_DOC)
.define(ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG,
ConfigDef.Type.BOOLEAN,
ENHANCED_AVRO_SCHEMA_SUPPORT_DEFAULT,
Expand Down Expand Up @@ -88,6 +99,10 @@ public AvroDataConfig(Map<?, ?> props) {
super(baseConfigDef(), props);
}

/**
 * Reports whether generalized sum type support is enabled, i.e. whether
 * enum/union types should be represented in a format-neutral way for
 * interoperability with other schema formats.
 *
 * @return the configured value of {@code generalized.sum.type.support}
 */
public boolean isGeneralizedSumTypeSupport() {
  return getBoolean(GENERALIZED_SUM_TYPE_SUPPORT_CONFIG);
}

/**
 * Reports whether enhanced Avro schema support is enabled.
 *
 * @return the configured value of {@code enhanced.avro.schema.support}
 */
public boolean isEnhancedAvroSchemaSupport() {
  return getBoolean(ENHANCED_AVRO_SCHEMA_SUPPORT_CONFIG);
}
Expand Down
151 changes: 150 additions & 1 deletion avro-data/src/test/java/io/confluent/connect/avro/AvroDataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import java.util.LinkedHashMap;
import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
Expand Down Expand Up @@ -211,6 +212,37 @@ public void testFromConnectEnum() {
avroData);
}

// Verifies Connect -> Avro enum conversion when generalized sum type support
// is enabled: the Connect schema is a named STRING whose enum metadata lives
// in the generalized (format-neutral) parameter namespace instead of the
// Avro-specific one.
@Test
public void testFromConnectEnumWithGeneralizedSumTypeSupport() {
avroData = new AvroData(new AvroDataConfig.Builder()
.with(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 2)
.with(AvroDataConfig.GENERALIZED_SUM_TYPE_SUPPORT_CONFIG, true)
.build());
// Enums are just converted to strings, original enum is preserved in parameters
org.apache.avro.Schema avroSchema = org.apache.avro.SchemaBuilder.builder()
.enumeration("TestEnum")
.doc("some documentation")
.symbols("foo", "bar", "baz");
// Expected Avro-side props: the enum doc plus one entry per symbol, mapping
// each symbol to its ordinal as a string ("foo" -> "0", etc.).
Map<String, String> params = new LinkedHashMap<>();
params.put("io.confluent.connect.avro.enum.doc.TestEnum", "some documentation");
params.put("org.apache.kafka.connect.data.Enum", "TestEnum");
params.put("org.apache.kafka.connect.data.Enum.foo", "0");
params.put("org.apache.kafka.connect.data.Enum.bar", "1");
params.put("org.apache.kafka.connect.data.Enum.baz", "2");
avroSchema.addProp("connect.parameters", params);
avroSchema.addProp("connect.name", "TestEnum");
// Matching Connect schema: same doc and per-symbol ordinal parameters.
SchemaBuilder builder = SchemaBuilder.string().name("TestEnum");
builder.parameter(AVRO_ENUM_DOC_PREFIX_PROP + "TestEnum", "some documentation");
builder.parameter(GENERALIZED_TYPE_ENUM, "TestEnum");
int i = 0;
for(String enumSymbol : new String[]{"foo", "bar", "baz"}) {
builder.parameter(GENERALIZED_TYPE_ENUM+"."+enumSymbol, String.valueOf(i++));
}

// Round-trip the symbol "bar" through both directions of the conversion.
checkNonRecordConversion(avroSchema, new GenericData.EnumSymbol(avroSchema, "bar"),
builder.build(), "bar", avroData);
}

@Test
public void testFromConnectMapWithStringKey() {
final Schema schema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA);
Expand Down Expand Up @@ -314,7 +346,54 @@ public void testFromConnectFixedUnion() {
assertEquals(2,
genericData.resolveUnion(unionSchema, avroData.fromConnectData(union, unionSameOther)));
}


// Verifies Connect -> Avro union conversion when generalized sum type support
// is enabled: the union is modeled on the Connect side as a struct named
// "connect_union_0" with one optional field per branch
// ("connect_union_field_N"), and converting a struct with exactly one branch
// set yields the corresponding plain Avro value.
@Test
public void testFromConnectUnionWithGeneralizedSumTypeSupport() {
avroData = new AvroData(new AvroDataConfig.Builder()
.with(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 2)
.with(AvroDataConfig.GENERALIZED_SUM_TYPE_SUPPORT_CONFIG, true)
.build());
// Make sure we handle primitive types and named types properly by using a variety of types
org.apache.avro.Schema avroRecordSchema1 = org.apache.avro.SchemaBuilder.builder()
.record("Test1").fields().requiredInt("test").endRecord();
// Add connect name
avroRecordSchema1.addProp("connect.name", "Test1");
org.apache.avro.Schema avroRecordSchema2 = org.apache.avro.SchemaBuilder.builder()
.record("Test2").namespace("io.confluent").fields().requiredInt("test").endRecord();
// Add connect name
avroRecordSchema2.addProp("connect.name", "io.confluent.Test2");
// Avro union of two primitives and the two named records above.
org.apache.avro.Schema avroSchema = org.apache.avro.SchemaBuilder.builder().unionOf()
.intType().and()
.stringType().and()
.type(avroRecordSchema1).and()
.type(avroRecordSchema2)
.endUnion();

// Connect-side equivalents of the two record branches; optional because any
// single union branch may be absent.
Schema recordSchema1 = SchemaBuilder.struct().name("Test1")
.field("test", Schema.INT32_SCHEMA).optional().build();
Schema recordSchema2 = SchemaBuilder.struct().name("io.confluent.Test2")
.field("test", Schema.INT32_SCHEMA).optional().build();
// Generalized union representation: a struct tagged with the generalized
// Union parameter, one optional field per branch in declaration order.
Schema schema = SchemaBuilder.struct()
.name("connect_union_0")
.parameter("org.apache.kafka.connect.data.Union", "connect_union_0")
.field("connect_union_field_0", Schema.OPTIONAL_INT32_SCHEMA)
.field("connect_union_field_1", Schema.OPTIONAL_STRING_SCHEMA)
.field("connect_union_field_2", recordSchema1)
.field("connect_union_field_3", recordSchema2)
.build();
// Primitive branches unwrap to the raw value.
assertEquals(12,
avroData.fromConnectData(schema, new Struct(schema).put("connect_union_field_0", 12)));
assertEquals("teststring",
avroData.fromConnectData(schema, new Struct(schema).put("connect_union_field_1", "teststring")));

// Record branches unwrap to the corresponding GenericRecord.
Struct schema1Test = new Struct(schema).put("connect_union_field_2", new Struct(recordSchema1).put("test", 12));
GenericRecord record1Test = new GenericRecordBuilder(avroRecordSchema1).set("test", 12).build();
Struct schema2Test = new Struct(schema).put("connect_union_field_3", new Struct(recordSchema2).put("test", 12));
GenericRecord record2Test = new GenericRecordBuilder(avroRecordSchema2).set("test", 12).build();
assertEquals(record1Test, avroData.fromConnectData(schema, schema1Test));
assertEquals(record2Test, avroData.fromConnectData(schema, schema2Test));
}

@Test
public void testFromConnectWithInvalidName() {
AvroDataConfig avroDataConfig = new AvroDataConfig.Builder()
Expand Down Expand Up @@ -2123,6 +2202,51 @@ public void testToConnectUnionWithEnhanced() {
avroData.toConnectData(avroSchema, record2Test));
}

// Mirror of the fromConnect union test: verifies Avro -> Connect union
// conversion with generalized sum type support. Each Avro union value maps to
// a "connect_union_0" struct with only the matching branch field populated.
@Test
public void testToConnectUnionWithGeneralizedSumTypeSupport() {
avroData = new AvroData(new AvroDataConfig.Builder()
.with(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 2)
.with(AvroDataConfig.GENERALIZED_SUM_TYPE_SUPPORT_CONFIG, true)
.build());
// Make sure we handle primitive types and named types properly by using a variety of types
org.apache.avro.Schema avroRecordSchema1 = org.apache.avro.SchemaBuilder.builder()
.record("Test1").fields().requiredInt("test").endRecord();
org.apache.avro.Schema avroRecordSchema2 = org.apache.avro.SchemaBuilder.builder()
.record("Test2").namespace("io.confluent").fields().requiredInt("test").endRecord();
// Avro union of two primitives and the two named records above.
org.apache.avro.Schema avroSchema = org.apache.avro.SchemaBuilder.builder().unionOf()
.intType().and()
.stringType().and()
.type(avroRecordSchema1).and()
.type(avroRecordSchema2)
.endUnion();

// Expected Connect-side schemas for the two record branches.
Schema recordSchema1 = SchemaBuilder.struct().name("Test1")
.field("test", Schema.INT32_SCHEMA).optional().build();
Schema recordSchema2 = SchemaBuilder.struct().name("io.confluent.Test2")
.field("test", Schema.INT32_SCHEMA).optional().build();
// Expected generalized union struct: tagged with the generalized Union
// parameter, one optional field per branch in declaration order.
Schema schema = SchemaBuilder.struct()
.name("connect_union_0")
.parameter("org.apache.kafka.connect.data.Union", "connect_union_0")
.field("connect_union_field_0", Schema.OPTIONAL_INT32_SCHEMA)
.field("connect_union_field_1", Schema.OPTIONAL_STRING_SCHEMA)
.field("connect_union_field_2", recordSchema1)
.field("connect_union_field_3", recordSchema2)
.build();
// Primitive union values populate only their branch field.
assertEquals(new SchemaAndValue(schema, new Struct(schema).put("connect_union_field_0", 12)),
avroData.toConnectData(avroSchema, 12));
assertEquals(new SchemaAndValue(schema, new Struct(schema).put("connect_union_field_1", "teststring")),
avroData.toConnectData(avroSchema, "teststring"));

// Record union values populate the corresponding record branch field.
Struct schema1Test = new Struct(schema).put("connect_union_field_2", new Struct(recordSchema1).put("test", 12));
GenericRecord record1Test = new GenericRecordBuilder(avroRecordSchema1).set("test", 12).build();
Struct schema2Test = new Struct(schema).put("connect_union_field_3", new Struct(recordSchema2).put("test", 12));
GenericRecord record2Test = new GenericRecordBuilder(avroRecordSchema2).set("test", 12).build();
assertEquals(new SchemaAndValue(schema, schema1Test),
avroData.toConnectData(avroSchema, record1Test));
assertEquals(new SchemaAndValue(schema, schema2Test),
avroData.toConnectData(avroSchema, record2Test));
}

@Test(expected = DataException.class)
public void testToConnectUnionRecordConflict() {
// If the records have the same name but are in different namespaces, we don't support this
Expand Down Expand Up @@ -2221,6 +2345,31 @@ public void testToConnectEnumWithNoDoc() {
avroData.toConnectData(avroSchema, new GenericData.EnumSymbol(avroSchema, "bar")));
}

// Mirror of the fromConnect enum test: verifies Avro -> Connect enum
// conversion with generalized sum type support. Both a plain String symbol
// and a GenericData.EnumSymbol must convert to the same string-typed
// SchemaAndValue carrying the generalized enum parameters.
@Test
public void testToConnectEnumWithGeneralizedSumTypeSupport() {
avroData = new AvroData(new AvroDataConfig.Builder()
.with(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, 2)
.with(AvroDataConfig.GENERALIZED_SUM_TYPE_SUPPORT_CONFIG, true)
.build());
// Enums are just converted to strings, original enum is preserved in parameters
org.apache.avro.Schema avroSchema = org.apache.avro.SchemaBuilder.builder()
.enumeration("TestEnum")
.doc("some documentation")
.symbols("foo", "bar", "baz");
// Expected Connect schema: a named string with the enum doc and one
// ordinal-valued parameter per symbol in the generalized namespace.
SchemaBuilder builder = SchemaBuilder.string().name("TestEnum");
builder.parameter(AVRO_ENUM_DOC_PREFIX_PROP + "TestEnum", "some documentation");
builder.parameter(GENERALIZED_TYPE_ENUM, "TestEnum");
int i = 0;
for(String enumSymbol : new String[]{"foo", "bar", "baz"}) {
builder.parameter(GENERALIZED_TYPE_ENUM+"."+enumSymbol, String.valueOf(i++));
}

assertEquals(new SchemaAndValue(builder.build(), "bar"),
avroData.toConnectData(avroSchema, "bar"));
assertEquals(new SchemaAndValue(builder.build(), "bar"),
avroData.toConnectData(avroSchema, new GenericData.EnumSymbol(avroSchema, "bar")));
}

@Test
public void testToConnectOptionalPrimitiveWithConnectMetadata() {
Schema schema = SchemaBuilder.string().
Expand Down
2 changes: 1 addition & 1 deletion avro-serde/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-parent</artifactId>
<version>7.1.12-0</version>
<version>7.2.10-0</version>
</parent>

<licenses>
Expand Down
16 changes: 5 additions & 11 deletions avro-serializer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-parent</artifactId>
<version>7.1.12-0</version>
<version>7.2.10-0</version>
</parent>

<licenses>
Expand Down Expand Up @@ -60,16 +60,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>logredactor</artifactId>
</dependency>
</dependencies>

Expand Down
2 changes: 1 addition & 1 deletion benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-parent</artifactId>
<version>7.1.12-0</version>
<version>7.2.10-0</version>
</parent>

<artifactId>kafka-schema-registry-benchmark</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion client-console-scripts/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
<parent>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-parent</artifactId>
<version>7.1.12-0</version>
<version>7.2.10-0</version>
</parent>

<licenses>
Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-parent</artifactId>
<version>7.1.12-0</version>
<version>7.2.10-0</version>
</parent>

<licenses>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,30 @@ default void configure(Map<String, ?> configs) {
* @param isNew whether the schema is new
* @return an optional parsed schema
*/
Optional<ParsedSchema> parseSchema(String schemaString,
List<SchemaReference> references,
boolean isNew);
/**
 * Parses a string representing a schema, returning {@link Optional#empty()}
 * instead of throwing when the schema cannot be parsed.
 *
 * <p>Default implementation delegates to
 * {@code parseSchemaOrElseThrow(schemaString, references, isNew)}; the
 * exception is deliberately discarded here — callers that need the failure
 * cause should call {@code parseSchemaOrElseThrow} directly.
 *
 * @param schemaString the schema
 * @param references a list of schema references
 * @param isNew whether the schema is new
 * @return the parsed schema, or empty if parsing failed
 */
default Optional<ParsedSchema> parseSchema(String schemaString,
List<SchemaReference> references,
boolean isNew) {
try {
return Optional.of(parseSchemaOrElseThrow(schemaString, references, isNew));
} catch (Exception e) {
return Optional.empty();
}
}

/**
 * Convenience overload that treats the schema as not new
 * ({@code isNew = false}).
 *
 * @param schemaString the schema
 * @param references a list of schema references
 * @return the parsed schema, or empty if parsing failed
 */
default Optional<ParsedSchema> parseSchema(String schemaString,
List<SchemaReference> references) {
return parseSchema(schemaString, references, false);
}

/**
 * Parses a string representing a schema.
 *
 * @param schemaString the schema
 * @param references a list of schema references
 * @param isNew whether the schema is new
 * @return the parsed schema
 * @throws RuntimeException if the schema cannot be parsed (no checked
 *         exceptions are declared, so failures surface as unchecked; the
 *         exact type is implementation-specific)
 */
ParsedSchema parseSchemaOrElseThrow(String schemaString,
List<SchemaReference> references,
boolean isNew);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;

import io.confluent.kafka.schemaregistry.AbstractSchemaProvider;
import io.confluent.kafka.schemaregistry.ParsedSchema;
Expand Down Expand Up @@ -47,16 +46,15 @@ public String schemaType() {
}

@Override
public Optional<ParsedSchema> parseSchema(String schemaString,
List<SchemaReference> references,
boolean isNew) {
public ParsedSchema parseSchemaOrElseThrow(String schemaString,
List<SchemaReference> references,
boolean isNew) {
try {
return Optional.of(
new AvroSchema(schemaString, references, resolveReferences(references), null,
validateDefaults && isNew));
return new AvroSchema(schemaString, references, resolveReferences(references), null,
validateDefaults && isNew);
} catch (Exception e) {
log.error("Could not parse Avro schema", e);
return Optional.empty();
throw e;
}
}
}
Loading

0 comments on commit ba34552

Please sign in to comment.