From 4b5feb08342403b5863f1045e1f97b52b3484744 Mon Sep 17 00:00:00 2001 From: Zara Lim Date: Wed, 13 Apr 2022 15:52:00 -0700 Subject: [PATCH] fix: include header columns when injecting schemas (#9023) * fix: include header columns when injecting schemas * unused import * update plans --- .../ksql/inference/DefaultSchemaInjector.java | 7 +- .../inference/DefaultSchemaInjectorTest.java | 121 ++++++++++ .../test/tools/ExpectedRecordComparator.java | 6 + .../7.3.0_1649879131547/plan.json | 215 ++++++++++++++++++ .../7.3.0_1649879131547/spec.json | 193 ++++++++++++++++ .../7.3.0_1649879131547/topology | 13 ++ .../query-validation-tests/headers.json | 24 ++ 7 files changed, 577 insertions(+), 2 deletions(-) create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/headers_-_Can_infer_key_and_value_schemas/7.3.0_1649879131547/plan.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/headers_-_Can_infer_key_and_value_schemas/7.3.0_1649879131547/spec.json create mode 100644 ksqldb-functional-tests/src/test/resources/historical_plans/headers_-_Can_infer_key_and_value_schemas/7.3.0_1649879131547/topology diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java index 35914ea4cec6..d86bf0d1ff44 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjector.java @@ -53,7 +53,6 @@ import io.confluent.ksql.util.ErrorMessageUtil; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; -import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -525,7 +524,11 @@ private static TableElements buildElements( final Optional keySchema, final Optional valueSchema ) { - final List elements = new ArrayList<>(); + final List elements = preparedStatement.getStatement() + .getElements() + .stream() + .filter(tableElement -> tableElement.getConstraints().isHeaders()) + .collect(Collectors.toList()); if (keySchema.isPresent()) { final ColumnConstraints constraints = getKeyConstraints(preparedStatement.getStatement()); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java index 337427aa30e2..d3390c4f2e60 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/schema/ksql/inference/DefaultSchemaInjectorTest.java @@ -105,8 +105,16 @@ public class DefaultSchemaInjectorTest { private static final ColumnConstraints PRIMARY_KEY_CONSTRAINT = new ColumnConstraints.Builder().primaryKey().build(); + private static final ColumnConstraints HEADER_CONSTRAINT = + new ColumnConstraints.Builder().header("header").build(); + private static final TableElements SOME_KEY_ELEMENTS_STREAM = TableElements.of( new TableElement(ColumnName.of("bob"), new Type(SqlTypes.STRING), KEY_CONSTRAINT)); + private static final TableElements HEADER_ELEMENTS = TableElements.of( + new TableElement(ColumnName.of("head"), new Type(SqlTypes.BYTES), HEADER_CONSTRAINT)); + private static final TableElements HEADER_AND_VALUE = TableElements.of( + new TableElement(ColumnName.of("head"), new Type(SqlTypes.BYTES), HEADER_CONSTRAINT), + new TableElement(ColumnName.of("bob"), new Type(SqlTypes.STRING))); private static final TableElements SOME_KEY_ELEMENTS_TABLE = TableElements.of( new TableElement(ColumnName.of("bob"), new Type(SqlTypes.STRING), PRIMARY_KEY_CONSTRAINT)); private static final TableElements SOME_VALUE_ELEMENTS = TableElements.of( @@ -570,6 +578,106 @@ public void shouldInjectKeyAndMaintainValueColumnsForCt() { )); } + @Test + public void shouldInjectKeyAndValuesForCs() { + // Given: + givenKeyAndValueInferenceSupported(); + when(cs.getElements()).thenReturn(HEADER_ELEMENTS); + + // When: + final ConfiguredStatement result = injector.inject(csStatement); + + // Then: + assertThat(result.getStatement().getElements(), + is(combineElements(HEADER_ELEMENTS, INFERRED_KSQL_KEY_SCHEMA_STREAM, INFERRED_KSQL_VALUE_SCHEMA))); + assertThat(result.getStatementText(), is( + "CREATE STREAM `cs` (" + + "`head` BYTES HEADER('header'), " + + "`key` STRING KEY, " + + "`intField` INTEGER, " + + "`bigIntField` BIGINT, " + + "`doubleField` DOUBLE, " + + "`stringField` STRING, " + + "`booleanField` BOOLEAN, " + + "`arrayField` ARRAY, " + + "`mapField` MAP, " + + "`structField` STRUCT<`s0` BIGINT>, " + + "`decimalField` DECIMAL(4, 2)) " + + "WITH (KAFKA_TOPIC='some-topic', KEY_FORMAT='protobuf', VALUE_FORMAT='avro');" + )); + } + + @Test + public void shouldInjectKeyAndValuesForCt() { + // Given: + givenKeyAndValueInferenceSupported(); + when(ct.getElements()).thenReturn(HEADER_ELEMENTS); + + // When: + final ConfiguredStatement result = injector.inject(ctStatement); + + // Then: + assertThat(result.getStatement().getElements(), + is(combineElements(HEADER_ELEMENTS, INFERRED_KSQL_KEY_SCHEMA_TABLE, INFERRED_KSQL_VALUE_SCHEMA))); + assertThat(result.getStatementText(), is( + "CREATE TABLE `ct` (" + + "`head` BYTES HEADER('header'), " + + "`key` STRING PRIMARY KEY, " + + "`intField` INTEGER, " + + "`bigIntField` BIGINT, " + + "`doubleField` DOUBLE, " + + "`stringField` STRING, " + + "`booleanField` BOOLEAN, " + + "`arrayField` ARRAY, " + + "`mapField` MAP, " + + "`structField` STRUCT<`s0` BIGINT>, " + + "`decimalField` DECIMAL(4, 2)) " + + "WITH (KAFKA_TOPIC='some-topic', KEY_FORMAT='protobuf', VALUE_FORMAT='avro');" + )); + } + + @Test + public void shouldInjectValuesAndMaintainKeysAndHeadersForCs() { + // Given: + givenKeyAndValueInferenceSupported(); + when(cs.getElements()).thenReturn(HEADER_AND_VALUE); + + // When: + final ConfiguredStatement result = injector.inject(csStatement); + + // Then: + assertThat(result.getStatement().getElements(), + is(combineElements(HEADER_ELEMENTS, INFERRED_KSQL_KEY_SCHEMA_STREAM, SOME_VALUE_ELEMENTS))); + assertThat(result.getStatementText(), is( + "CREATE STREAM `cs` (" + + "`head` BYTES HEADER('header'), " + + "`key` STRING KEY, " + + "`bob` STRING) " + + "WITH (KAFKA_TOPIC='some-topic', KEY_FORMAT='protobuf', VALUE_FORMAT='avro');" + )); + } + + @Test + public void shouldInjectValuesAndMaintainKeysAndHeadersForCt() { + // Given: + givenKeyAndValueInferenceSupported(); + when(ct.getElements()).thenReturn(HEADER_AND_VALUE); + + // When: + final ConfiguredStatement result = injector.inject(ctStatement); + + // Then: + assertThat(result.getStatement().getElements(), + is(combineElements(HEADER_ELEMENTS, INFERRED_KSQL_KEY_SCHEMA_TABLE, SOME_VALUE_ELEMENTS))); + assertThat(result.getStatementText(), is( + "CREATE TABLE `ct` (" + + "`head` BYTES HEADER('header'), " + + "`key` STRING PRIMARY KEY, " + + "`bob` STRING) " + + "WITH (KAFKA_TOPIC='some-topic', KEY_FORMAT='protobuf', VALUE_FORMAT='avro');" + )); + } + @Test public void shouldInjectKeyForCsas() { // Given: @@ -1224,4 +1332,17 @@ private static TableElements combineElements( .collect(Collectors.toList()) ); } + + private static TableElements combineElements( + final TableElements headerElems, + final TableElements keyElems, + final TableElements valueElems + ) { + Stream.of(headerElems.stream(), keyElems.stream(), valueElems.stream()).flatMap(s -> s); + return TableElements.of( + Stream.of(headerElems.stream(), keyElems.stream(), valueElems.stream()) + .flatMap(s -> s) + .collect(Collectors.toList()) + ); + } } \ No newline at end of file diff --git a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/ExpectedRecordComparator.java b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/ExpectedRecordComparator.java index 073a7a055ea8..d66cb0ea42fe 100644 --- a/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/ExpectedRecordComparator.java +++ b/ksqldb-functional-tests/src/main/java/io/confluent/ksql/test/tools/ExpectedRecordComparator.java @@ -24,7 +24,9 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.test.model.TestHeader; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Base64; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -181,6 +183,10 @@ private static boolean compareText(final Object actualValue, final JsonNode expe if (actualValue instanceof BigDecimal) { return new BigDecimal(expected.asText()).equals(actualValue); } + if (actualValue instanceof ByteBuffer) { + return expected.asText().equals( + Base64.getEncoder().encodeToString(((ByteBuffer) actualValue).array())); + } return false; } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/headers_-_Can_infer_key_and_value_schemas/7.3.0_1649879131547/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/headers_-_Can_infer_key_and_value_schemas/7.3.0_1649879131547/plan.json new file mode 100644 index 000000000000..eff5ace015dd --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/headers_-_Can_infer_key_and_value_schemas/7.3.0_1649879131547/plan.json @@ -0,0 +1,215 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM SOURCE1 (H ARRAY> HEADERS, ROWKEY STRING KEY, V INTEGER) WITH (KAFKA_TOPIC='stream-source', KEY_FORMAT='avro', VALUE_FORMAT='avro');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "SOURCE1", + "schema" : "`H` ARRAY> HEADERS, `ROWKEY` STRING KEY, `V` INTEGER", + "timestampColumn" : null, + "topicName" : "stream-source", + "formats" : { + "keyFormat" : { + "format" : "AVRO", + "properties" : { + "fullSchemaName" : "io.confluent.ksql.avro_schemas.Source1Key" + } + }, + "valueFormat" : { + "format" : "AVRO", + "properties" : { } + }, + "keyFeatures" : [ "UNWRAP_SINGLES" ] + }, + "windowInfo" : null, + "orReplace" : false, + "isSource" : false + }, + "queryPlan" : null + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM SOURCE1 SOURCE1\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `V` INTEGER, `H` ARRAY>", + "timestampColumn" : null, + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "AVRO", + "properties" : { + "fullSchemaName" : "io.confluent.ksql.avro_schemas.OutputKey" + } + }, + "valueFormat" : { + "format" : "AVRO", + "properties" : { } + }, + "keyFeatures" : [ "UNWRAP_SINGLES" ] + }, + "windowInfo" : null, + "orReplace" : false, + "isSource" : false + }, + "queryPlan" : { + "sources" : [ "SOURCE1" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "stream-source", + "formats" : { + "keyFormat" : { + "format" : "AVRO", + "properties" : { + "fullSchemaName" : "io.confluent.ksql.avro_schemas.Source1Key" + } + }, + "valueFormat" : { + "format" : "AVRO", + "properties" : { } + }, + "keyFeatures" : [ "UNWRAP_SINGLES" ] + }, + "timestampColumn" : null, + "sourceSchema" : "`H` ARRAY> HEADERS, `ROWKEY` STRING KEY, `V` INTEGER", + "pseudoColumnVersion" : 1 + }, + "keyColumnNames" : [ "ROWKEY" ], + "selectedKeys" : null, + "selectExpressions" : [ "V AS V", "H AS H" ] + }, + "formats" : { + "keyFormat" : { + "format" : "AVRO", + "properties" : { + "fullSchemaName" : "io.confluent.ksql.avro_schemas.OutputKey" + } + }, + "valueFormat" : { + "format" : "AVRO", + "properties" : { } + }, + "keyFeatures" : [ "UNWRAP_SINGLES" ] + }, + "topicName" : "OUTPUT", + "timestampColumn" : null + }, + "queryId" : "CSAS_OUTPUT_0", + "runtimeId" : null + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "metric.reporters" : "", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.connect.basic.auth.credentials.reload" : "false", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.stream.enabled" : "true", + "ksql.query.push.v2.interpreter.enabled" : "true", + "ksql.queryanonymizer.logs_enabled" : "true", + "ksql.variable.substitution.enable" : "true", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.query.push.v2.alos.enabled" : "true", + "ksql.query.push.v2.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.query.pull.range.scan.enabled" : "true", + "ksql.transient.query.cleanup.service.initial.delay.seconds" : "600", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.lambdas.enabled" : "true", + "ksql.source.table.materialization.enabled" : "true", + "ksql.query.pull.max.hourly.bandwidth.megabytes" : "2147483647", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.connect.basic.auth.credentials.source" : "NONE", + "ksql.schema.registry.url" : "schema_registry.url:0", + "ksql.properties.overrides.denylist" : "", + "ksql.service.id" : "some.ksql.service.id", + "ksql.query.push.v2.max.catchup.consumers" : "5", + "ksql.query.push.v2.enabled" : "false", + "ksql.transient.query.cleanup.service.enable" : "true", + "ksql.query.push.v2.metrics.enabled" : "true", + "ksql.rowpartition.rowoffset.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "true", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.endpoint.migrate.query" : "true", + "ksql.query.push.v2.registry.installed" : "false", + "ksql.streams.num.stream.threads" : "4", + "ksql.metrics.tags.custom" : "", + "ksql.query.push.v2.catchup.consumer.msg.window" : "50", + "ksql.runtime.feature.shared.enabled" : "false", + "ksql.udf.collect.metrics" : "false", + "ksql.new.query.planner.enabled" : "false", + "ksql.connect.request.headers.plugin" : null, + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.headers.columns.enabled" : "true", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.connect.request.timeout.ms" : "5000", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.query.cleanup.shutdown.timeout.ms" : "30000", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.queryanonymizer.cluster_namespace" : null, + "ksql.create.or.replace.enabled" : "true", + "ksql.shared.runtimes.count" : "2", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.transient.query.cleanup.service.period.seconds" : "600", + "ksql.suppress.enabled" : "false", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.connect.basic.auth.credentials.file" : "", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.query.push.v2.new.latest.delay.ms" : "5000", + "ksql.query.push.v2.latest.reset.age.ms" : "30000", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.query.pull.limit.clause.enabled" : "true", + "ksql.query.pull.router.thread.pool.size" : "50", + "ksql.query.push.v2.continuation.tokens.enabled" : "false", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.nested.error.set.null" : "true", + "ksql.query.pull.thread.pool.size" : "50", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/headers_-_Can_infer_key_and_value_schemas/7.3.0_1649879131547/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/headers_-_Can_infer_key_and_value_schemas/7.3.0_1649879131547/spec.json new file mode 100644 index 000000000000..0d2a4d43ba8b --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/headers_-_Can_infer_key_and_value_schemas/7.3.0_1649879131547/spec.json @@ -0,0 +1,193 @@ +{ + "version" : "7.3.0", + "timestamp" : 1649879131547, + "path" : "query-validation-tests/headers.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`H` ARRAY> HEADERS, `ROWKEY` STRING KEY, `V` INTEGER", + "keyFormat" : { + "format" : "AVRO", + "properties" : { + "fullSchemaName" : "io.confluent.ksql.avro_schemas.Source1Key" + }, + "features" : [ "UNWRAP_SINGLES" ] + }, + "valueFormat" : { + "format" : "AVRO" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ROWKEY` STRING KEY, `V` INTEGER, `H` ARRAY>", + "keyFormat" : { + "format" : "AVRO", + "properties" : { + "fullSchemaName" : "io.confluent.ksql.avro_schemas.OutputKey" + }, + "features" : [ "UNWRAP_SINGLES" ] + }, + "valueFormat" : { + "format" : "AVRO" + } + } + }, + "testCase" : { + "name" : "Can infer key and value schemas", + "inputs" : [ { + "topic" : "stream-source", + "key" : "k1", + "value" : { + "v" : 40000 + }, + "headers" : [ ] + }, { + "topic" : "stream-source", + "key" : "k2", + "value" : { + "v" : 40000 + }, + "headers" : [ { + "KEY" : "abc", + "VALUE" : "IQ==" + } ] + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "k1", + "value" : { + "V" : 40000, + "H" : [ ] + }, + "headers" : [ ] + }, { + "topic" : "OUTPUT", + "key" : "k2", + "value" : { + "V" : 40000, + "H" : [ { + "KEY" : "abc", + "VALUE" : "IQ==" + } ] + }, + "headers" : [ { + "KEY" : "abc", + "VALUE" : "IQ==" + } ] + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "stream-source", + "keySchema" : "string", + "valueSchema" : { + "type" : "record", + "name" : "most_recent_value_schema_at_SR", + "fields" : [ { + "name" : "v", + "type" : [ "null", "int" ] + } ] + }, + "keyFormat" : "AVRO", + "valueFormat" : "AVRO", + "replicas" : 1, + "numPartitions" : 1 + } ], + "statements" : [ "CREATE STREAM SOURCE1 (H ARRAY> HEADERS) WITH (kafka_topic='stream-source', value_format='avro', key_format='avro');", "CREATE STREAM OUTPUT AS SELECT * FROM SOURCE1;" ], + "post" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`ROWKEY` STRING KEY, `V` INTEGER, `H` ARRAY>", + "keyFormat" : { + "format" : "AVRO" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ "UNWRAP_SINGLES" ], + "valueFeatures" : [ ], + "isSource" : false + }, { + "name" : "SOURCE1", + "type" : "STREAM", + "schema" : "`H` ARRAY> HEADERS, `ROWKEY` STRING KEY, `V` INTEGER", + "keyFormat" : { + "format" : "AVRO" + }, + "valueFormat" : "AVRO", + "keyFeatures" : [ "UNWRAP_SINGLES" ], + "valueFeatures" : [ ], + "isSource" : false + } ], + "topics" : { + "topics" : [ { + "name" : "stream-source", + "keyFormat" : { + "format" : "AVRO", + "properties" : { + "fullSchemaName" : "io.confluent.ksql.avro_schemas.Source1Key" + }, + "features" : [ "UNWRAP_SINGLES" ] + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 1, + "keySchema" : "string", + "valueSchema" : { + "type" : "record", + "name" : "most_recent_value_schema_at_SR", + "fields" : [ { + "name" : "v", + "type" : [ "null", "int" ], + "default" : null + } ], + "connect.name" : "most_recent_value_schema_at_SR" + } + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "AVRO", + "properties" : { + "fullSchemaName" : "io.confluent.ksql.avro_schemas.OutputKey" + }, + "features" : [ "UNWRAP_SINGLES" ] + }, + "valueFormat" : { + "format" : "AVRO" + }, + "partitions" : 4, + "keySchema" : "string", + "valueSchema" : { + "type" : "record", + "name" : "KsqlDataSourceSchema", + "namespace" : "io.confluent.ksql.avro_schemas", + "fields" : [ { + "name" : "V", + "type" : [ "null", "int" ], + "default" : null + }, { + "name" : "H", + "type" : [ "null", { + "type" : "array", + "items" : [ "null", { + "type" : "record", + "name" : "KsqlDataSourceSchema_H", + "fields" : [ { + "name" : "KEY", + "type" : [ "null", "string" ], + "default" : null + }, { + "name" : "VALUE", + "type" : [ "null", "bytes" ], + "default" : null + } ] + } ] + } ], + "default" : null + } ] + } + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/headers_-_Can_infer_key_and_value_schemas/7.3.0_1649879131547/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/headers_-_Can_infer_key_and_value_schemas/7.3.0_1649879131547/topology new file mode 100644 index 000000000000..3a0bb1ceeefc --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/headers_-_Can_infer_key_and_value_schemas/7.3.0_1649879131547/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [stream-source]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/headers.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/headers.json index a754b89019ea..53d533490f94 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/headers.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/headers.json @@ -198,6 +198,30 @@ "type": "io.confluent.ksql.util.KsqlException", "message": "Invalid type for HEADER('key') column: expected BYTES, got INTEGER" } + }, + { + "name": "Can infer key and value schemas", + "statements": [ + "CREATE STREAM SOURCE1 (H ARRAY> HEADERS) WITH (kafka_topic='stream-source', value_format='avro', key_format='avro');", + "CREATE STREAM OUTPUT AS SELECT * FROM SOURCE1;" + ], + "topics": [ + { + "name": "stream-source", + "keySchema": {"type": "string"}, + "keyFormat": "AVRO", + "valueSchema": {"type": "record", "name": "most_recent_value_schema_at_SR", "fields": [{"name": "v", "type": ["null", "int"]}]}, + "valueFormat": "AVRO" + } + ], + "inputs": [ + {"topic": "stream-source", "key": "k1", "value": {"v": 40000}, "headers": []}, + {"topic": "stream-source", "key": "k2", "value": {"v": 40000}, "headers": [{"KEY": "abc", "VALUE": "IQ=="}]} + ], + "outputs": [ + {"topic": "OUTPUT", "key": "k1", "value": {"V": 40000, "H": []}, "headers": []}, + {"topic": "OUTPUT", "key": "k2", "value": {"V": 40000, "H": [{"KEY": "abc", "VALUE": "IQ=="}]}, "headers": [{"KEY": "abc", "VALUE": "IQ=="}]} + ] } ] } \ No newline at end of file