chore: schema id serde (#8411)
* chore: use schema id for serialization

* test: add more tests and refactor

* chore: add tests and fix schema name

* fix: checkstyle

* test: add more test and fix test

* fix: test and property inheritance

* fix: checkstyle and comments

* fix: add ignore back

* fix: fix for optional

* fix: address comments

* test: more tests

* fix: checkstyle

* fix: use ConnectDataTranslator to deserialize schema id data
lihaosky authored Dec 4, 2021
1 parent bff539e commit 8a11ac2
Showing 56 changed files with 1,309 additions and 211 deletions.
@@ -31,6 +31,7 @@
import io.confluent.ksql.properties.with.CommonCreateConfigs;
import io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaAndId;
import io.confluent.ksql.schema.ksql.inference.TopicSchemaSupplier.SchemaResult;
import io.confluent.ksql.serde.Format;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.SerdeFeature;
@@ -213,28 +214,36 @@ private static boolean shouldInferSchema(
* 1. If schema id is provided, format must support schema inference
*/

final String formatProp = isKey ? CommonCreateConfigs.KEY_FORMAT_PROPERTY
: CommonCreateConfigs.VALUE_FORMAT_PROPERTY;
final String schemaIdName =
isKey ? CommonCreateConfigs.KEY_SCHEMA_ID : CommonCreateConfigs.VALUE_SCHEMA_ID;
final String formatPropMsg = String.format("%s should support schema inference when %s is "
+ "provided. Current format is %s.", formatProp, schemaIdName, formatInfo.getFormat());

final Format format;
try {
format = FormatFactory.of(formatInfo);
} catch (KsqlException e) {
if (e.getMessage().contains("does not support the following configs: [schemaId]")) {
throw new KsqlException(formatPropMsg);
}
throw e;
}
final boolean hasTableElements =
isKey ? hasKeyElements(statement) : hasValueElements(statement);
if (schemaId.isPresent()) {
if (!formatSupportsSchemaInference(formatInfo)) {
final String formatProp = isKey ? CommonCreateConfigs.KEY_FORMAT_PROPERTY
: CommonCreateConfigs.VALUE_FORMAT_PROPERTY;
final String schemaIdName =
isKey ? CommonCreateConfigs.KEY_SCHEMA_ID : CommonCreateConfigs.VALUE_SCHEMA_ID;
final String msg = String.format("%s should support schema inference when %s is provided. "
+ "Current format is %s.", formatProp, schemaIdName, formatInfo.getFormat());
throw new KsqlException(msg);
if (!formatSupportsSchemaInference(format)) {
throw new KsqlException(formatPropMsg);
}
if (hasTableElements) {
final String schemaIdName =
isKey ? CommonCreateConfigs.KEY_SCHEMA_ID : CommonCreateConfigs.VALUE_SCHEMA_ID;
final String msg = "Table elements and " + schemaIdName + " cannot both exist for create "
+ "statement.";
throw new KsqlException(msg);
}
return true;
}
return !hasTableElements && formatSupportsSchemaInference(formatInfo);
return !hasTableElements && formatSupportsSchemaInference(format);
}

private static boolean hasKeyElements(
@@ -253,8 +262,8 @@ private static boolean hasValueElements(
.anyMatch(e -> !e.isKey() && !e.isPrimaryKey() && !e.isHeaders());
}

private static boolean formatSupportsSchemaInference(final FormatInfo format) {
return FormatFactory.of(format).supportsFeature(SerdeFeature.SCHEMA_INFERENCE);
private static boolean formatSupportsSchemaInference(final Format format) {
return format.supportsFeature(SerdeFeature.SCHEMA_INFERENCE);
}

private static CreateSource addSchemaFields(
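The rendered hunk above interleaves removed and added lines, so here is the new shouldInferSchema body consolidated from the added lines for readability. This is a reading aid assembled from this page, not a verbatim copy of the file, and the comments are editorial:

final String formatProp = isKey ? CommonCreateConfigs.KEY_FORMAT_PROPERTY
    : CommonCreateConfigs.VALUE_FORMAT_PROPERTY;
final String schemaIdName =
    isKey ? CommonCreateConfigs.KEY_SCHEMA_ID : CommonCreateConfigs.VALUE_SCHEMA_ID;
final String formatPropMsg = String.format("%s should support schema inference when %s is "
    + "provided. Current format is %s.", formatProp, schemaIdName, formatInfo.getFormat());

// Resolve the format once up front. FormatFactory.of() rejects an explicit schemaId
// property on formats that cannot use it; translate that low-level failure into the
// friendlier message built above.
final Format format;
try {
  format = FormatFactory.of(formatInfo);
} catch (KsqlException e) {
  if (e.getMessage().contains("does not support the following configs: [schemaId]")) {
    throw new KsqlException(formatPropMsg);
  }
  throw e;
}

final boolean hasTableElements =
    isKey ? hasKeyElements(statement) : hasValueElements(statement);
if (schemaId.isPresent()) {
  // An explicit KEY_SCHEMA_ID/VALUE_SCHEMA_ID requires a schema-inference-capable
  // format and must not be combined with inline table elements.
  if (!formatSupportsSchemaInference(format)) {
    throw new KsqlException(formatPropMsg);
  }
  if (hasTableElements) {
    final String msg = "Table elements and " + schemaIdName + " cannot both exist for create "
        + "statement.";
    throw new KsqlException(msg);
  }
  return true;
}
return !hasTableElements && formatSupportsSchemaInference(format);

Note the matching signature change further down: formatSupportsSchemaInference now takes the already-resolved Format rather than a FormatInfo, which avoids a second FormatFactory.of() call.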
@@ -39,7 +39,9 @@ static SchemaRegistryClient createProxy(final SchemaRegistryClient delegate) {

return LimitedProxyBuilder.forClass(SchemaRegistryClient.class)
.swallow("register", anyParams(), 123)
.swallow("getId", anyParams(), 123)
.forward("getAllSubjects", methodParams(), delegate)
.forward("getSchemaById", methodParams(int.class), delegate)
.forward("getLatestSchemaMetadata", methodParams(String.class), delegate)
.forward("getSchemaBySubjectAndId", methodParams(String.class, int.class), delegate)
.forward("testCompatibility",
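Taken together, the two new registrations give the sandboxed (validation-mode) client write-free schema-id behavior: every getId overload is swallowed and answers with the canned id 123, while getSchemaById forwards to the real registry so statements referencing an existing schema id can still be validated. A minimal sketch of the effect, assuming a proxy obtained from the createProxy factory shown in the hunk header (realClient and parsedSchema are illustrative names; the behavior mirrors the sandbox tests further down this page):

// Hypothetical usage sketch, not code from this commit.
final SchemaRegistryClient sandbox = createProxy(realClient);
final int id = sandbox.getId("some-subject", parsedSchema); // swallowed: returns 123, no registry write
final ParsedSchema schema = sandbox.getSchemaById(1);       // forwarded: resolved by the real registry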
@@ -665,7 +665,8 @@ public void shouldThrowIfKeyFormatDoesNotSupportSchemaIdInference() {

// Then:
assertThat(e.getMessage(),
containsString("KAFKA does not support the following configs: [schemaId]"));
containsString("KEY_FORMAT should support schema inference when KEY_SCHEMA_ID is provided. "
+ "Current format is KAFKA."));
}

@Test
Expand All @@ -685,7 +686,8 @@ public void shouldThrowIfValueFormatDoesNotSupportSchemaIdInference() {

// Then:
assertThat(e.getMessage(),
containsString("DELIMITED does not support the following configs: [schemaId]"));
containsString("VALUE_FORMAT should support schema inference when VALUE_SCHEMA_ID is provided. "
+ "Current format is DELIMITED."));
}

@Test
@@ -17,6 +17,8 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
@@ -64,7 +66,11 @@ public static Collection<TestCase<SchemaRegistryClient>> getMethodsToTest() {
.ignore("testCompatibility", String.class, ParsedSchema.class)
.ignore("deleteSubject", String.class)
.ignore("getAllSubjects")
.ignore("getId", String.class, ParsedSchema.class)
.ignore("getId", String.class, ParsedSchema.class, boolean.class)
.ignore("getId", String.class, Schema.class)
.ignore("getVersion", String.class, ParsedSchema.class)
.ignore("getSchemaById", int.class)
.build();
}

@@ -94,6 +100,8 @@ public static class SupportedMethods {
@Mock
private AvroSchema schema;
@Mock
private ParsedSchema parsedSchema;
@Mock
private SchemaMetadata schemaMetadata;
private SchemaRegistryClient sandboxedClient;

@@ -173,5 +181,30 @@ public void shouldGetVersion() throws Exception {
// Then:
assertThat(version, is(6));
}

@Test
public void shouldGetSchemaById() throws Exception {
// Given:
when(delegate.getSchemaById(anyInt())).thenReturn(parsedSchema);

// When:
final ParsedSchema retSchema = sandboxedClient.getSchemaById(1);

// Then:
assertThat(retSchema, is(parsedSchema));
}

@Test
public void shouldGetId() throws Exception {
// When:
final int id = sandboxedClient.getId("some subject", schema);
final int id1 = sandboxedClient.getId("some subject", parsedSchema);
final int id2 = sandboxedClient.getId("some subject", parsedSchema, true);

// Then:
assertThat(id, is(123));
assertThat(id1, is(123));
assertThat(id2, is(123));
}
}
}
@@ -162,7 +162,7 @@ public static Optional<TestCasePlan> latestForTestCase(final TestCase testCase)

/**
* Create a TestCasePlan for all saved plans for a test case
* @param testCase the test case to load saved lans for
* @param testCase the test case to load saved plans for
* @return a list of the loaded plans.
*/
public static List<TestCasePlan> allForTestCase(final TestCase testCase) {
@@ -34,10 +34,7 @@
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : {
"schemaId" : "1"
}
"format" : "JSON_SR"
}
},
"orReplace" : false,
@@ -83,10 +80,7 @@
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : {
"schemaId" : "1"
}
"format" : "JSON_SR"
}
},
"topicName" : "OUTPUT"
@@ -159,6 +153,7 @@
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "true",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.query.pull.limit.clause.enabled" : "true",
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.router.thread.pool.size" : "50",
"ksql.query.push.v2.registry.installed" : "false",
@@ -1,6 +1,6 @@
{
"version" : "7.2.0",
"timestamp" : 1638217459891,
"timestamp" : 1638433521594,
"path" : "query-validation-tests/elements.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : {
@@ -21,10 +21,7 @@
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : {
"schemaId" : "1"
}
"format" : "JSON_SR"
}
}
},
@@ -114,10 +111,7 @@
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : {
"schemaId" : "1"
}
"format" : "JSON_SR"
},
"partitions" : 4,
"valueSchema" : {
@@ -1,7 +1,7 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (`c1` INTEGER) WITH (KAFKA_TOPIC='input', KEY_FORMAT='KAFKA', VALUE_AVRO_SCHEMA_FULL_NAME='ConnectDefault1', VALUE_FORMAT='PROTOBUF', VALUE_SCHEMA_ID=1);",
"statementText" : "CREATE STREAM INPUT (`c1` INTEGER) WITH (KAFKA_TOPIC='input', KEY_FORMAT='KAFKA', VALUE_FORMAT='PROTOBUF', VALUE_SCHEMA_FULL_NAME='ConnectDefault1', VALUE_SCHEMA_ID=1);",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
@@ -37,8 +37,7 @@
"valueFormat" : {
"format" : "PROTOBUF",
"properties" : {
"fullSchemaName" : "ConnectDefault1",
"schemaId" : "1"
"fullSchemaName" : "ConnectDefault1"
}
}
},
@@ -88,8 +87,7 @@
"valueFormat" : {
"format" : "PROTOBUF",
"properties" : {
"fullSchemaName" : "ConnectDefault1",
"schemaId" : "1"
"fullSchemaName" : "ConnectDefault1"
}
}
},
@@ -163,6 +161,7 @@
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "true",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.query.pull.limit.clause.enabled" : "true",
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.router.thread.pool.size" : "50",
"ksql.query.push.v2.registry.installed" : "false",
@@ -1,6 +1,6 @@
{
"version" : "7.2.0",
"timestamp" : 1638217459806,
"timestamp" : 1638433521495,
"path" : "query-validation-tests/elements.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : {
@@ -24,8 +24,7 @@
"valueFormat" : {
"format" : "PROTOBUF",
"properties" : {
"fullSchemaName" : "ConnectDefault1",
"schemaId" : "1"
"fullSchemaName" : "ConnectDefault1"
}
}
}
@@ -105,8 +104,7 @@
"valueFormat" : {
"format" : "PROTOBUF",
"properties" : {
"fullSchemaName" : "ConnectDefault1",
"schemaId" : "1"
"fullSchemaName" : "ConnectDefault1"
}
},
"partitions" : 4,
@@ -35,16 +35,10 @@
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "JSON_SR",
"properties" : {
"schemaId" : "1"
}
"format" : "JSON_SR"
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : {
"schemaId" : "2"
}
"format" : "JSON_SR"
},
"keyFeatures" : [ "UNWRAP_SINGLES" ]
},
@@ -93,16 +87,10 @@
},
"formats" : {
"keyFormat" : {
"format" : "JSON_SR",
"properties" : {
"schemaId" : "1"
}
"format" : "JSON_SR"
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : {
"schemaId" : "2"
}
"format" : "JSON_SR"
},
"keyFeatures" : [ "UNWRAP_SINGLES" ]
},
@@ -176,6 +164,7 @@
"ksql.streams.commit.interval.ms" : "2000",
"ksql.query.pull.table.scan.enabled" : "true",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.query.pull.limit.clause.enabled" : "true",
"ksql.streams.topology.optimization" : "all",
"ksql.query.pull.router.thread.pool.size" : "50",
"ksql.query.push.v2.registry.installed" : "false",
@@ -1,6 +1,6 @@
{
"version" : "7.2.0",
"timestamp" : 1638217460208,
"timestamp" : 1638433521844,
"path" : "query-validation-tests/elements.json",
"schemas" : {
"CSAS_OUTPUT_0.KsqlTopic.Source" : {
@@ -23,16 +23,10 @@
"schema" : "`ROWKEY` INTEGER KEY, `c1` BIGINT",
"keyFormat" : {
"format" : "JSON_SR",
"properties" : {
"schemaId" : "1"
},
"features" : [ "UNWRAP_SINGLES" ]
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : {
"schemaId" : "2"
}
"format" : "JSON_SR"
}
}
},
@@ -141,16 +135,10 @@
"name" : "OUTPUT",
"keyFormat" : {
"format" : "JSON_SR",
"properties" : {
"schemaId" : "1"
},
"features" : [ "UNWRAP_SINGLES" ]
},
"valueFormat" : {
"format" : "JSON_SR",
"properties" : {
"schemaId" : "2"
}
"format" : "JSON_SR"
},
"partitions" : 4,
"keySchema" : {