diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java index 44d6cadf4530..78e59ad91488 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/PlanNode.java @@ -17,7 +17,9 @@ import static java.util.Objects.requireNonNull; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import com.google.common.collect.Streams; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.Expression; @@ -35,6 +37,7 @@ import io.confluent.ksql.util.GrammaticalJoiner; import io.confluent.ksql.util.KsqlException; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -192,10 +195,13 @@ static Stream orderColumns( final LogicalSchema schema ) { // When doing a `select *` key columns should be at the front of the column list - // but are added at the back during processing for performance reasons. + // but are added at the back during processing for performance reasons. Furthermore, + // the keys should be selected in the same order as they appear in the source. // Switch them around here: - final Stream keys = columns.stream() - .filter(c -> schema.isKeyColumn(c.name())); + final ImmutableMap columnsByName = Maps.uniqueIndex(columns, Column::name); + final Stream keys = schema.key().stream() + .map(key -> columnsByName.get(key.name())) + .filter(Objects::nonNull); final Stream windowBounds = columns.stream() .filter(c -> SystemColumns.isWindowBound(c.name())); diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/SelectionUtil.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/SelectionUtil.java index 091987c805c8..04b61c65af7a 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/SelectionUtil.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/SelectionUtil.java @@ -15,7 +15,6 @@ package io.confluent.ksql.planner.plan; -import com.google.common.collect.ImmutableMap; import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp; @@ -27,11 +26,15 @@ import io.confluent.ksql.parser.tree.SelectItem; import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.schema.ksql.Column; +import io.confluent.ksql.schema.ksql.Column.Namespace; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.schema.ksql.LogicalSchema.Builder; import io.confluent.ksql.schema.ksql.types.SqlType; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -41,6 +44,39 @@ public final class SelectionUtil { private SelectionUtil() { } + /* + * The algorithm behind this method feels unnecessarily complicated and is begging + * for someone to come along and improve it, but until that time here is + * a description of what's going on. + * + * Essentially, we need to build a logical schema that mirrors the physical + * schema until https://github.com/confluentinc/ksql/issues/6374 is addressed. + * That means that the keys must be ordered in the same way as the parent schema + * (e.g. if the source schema was K1 INT KEY, K2 INT KEY and the projection is + * SELECT K2, K1 this method will produce an output schema that is K1, K2 + * despite the way that the keys were ordered in the projection) - see + * https://github.com/confluentinc/ksql/pull/7477 for context on the bug. + * + * But we cannot simply select all the keys and then the values, we must maintain + * the interleaving of key and values because transient queries return all columns + * to the user as "value columns". If someone issues a SELECT VALUE, * FROM FOO + * it is expected that VALUE shows up _before_ the key fields. This means we need to + * reorder the key columns within the list of projections without affecting the + * relative order the keys/values. + * + * To spice things up even further, there's the possibility that the same key is + * aliased multiple times (SELECT K1 AS X, K2 AS Y FROM ...), which is not supported + * but is verified later when building the final projection - so we maintain it here. + * + * Now on to the algorithm itself: we make two passes through the list of projections. + * The first pass builds a mapping from source key to all the projections for that key. + * We will use this mapping to sort the keys in the second pass. This mapping is two + * dimensional to address the possibility of the same key with multiple aliases. + * + * The second pass goes through the list of projections again and builds the logical schema, + * but this time if we encounter a projection that references a key column, we instead take + * it from the list we built in the first pass (in order defined by the parent schema). + */ public static LogicalSchema buildProjectionSchema( final LogicalSchema parentSchema, final List projection, @@ -51,24 +87,46 @@ public static LogicalSchema buildProjectionSchema( functionRegistry ); - final Builder builder = LogicalSchema.builder(); - - final ImmutableMap.Builder keys = ImmutableMap.builder(); + // keyExpressions[i] represents the expressions found in projection + // that are associated with parentSchema's key at index i + final List> keyExpressions = new ArrayList<>(parentSchema.key().size()); + for (int i = 0; i < parentSchema.key().size(); i++) { + keyExpressions.add(new ArrayList<>()); + } + // first pass to construct keyExpressions, keyExpressionMembership + // is just a convenience data structure so that we don't have to do + // the isKey check in the second iteration below + final Set keyExpressionMembership = new HashSet<>(); for (final SelectExpression select : projection) { final Expression expression = select.getExpression(); + if (expression instanceof ColumnReferenceExp) { + final ColumnName name = ((ColumnReferenceExp) expression).getColumnName(); + parentSchema.findColumn(name) + .filter(c -> c.namespace() == Namespace.KEY) + .ifPresent(c -> { + keyExpressions.get(c.index()).add(select); + keyExpressionMembership.add(select); + }); + } + } - final SqlType expressionType = expressionTypeManager - .getExpressionSqlType(expression); - - final boolean keyColumn = expression instanceof ColumnReferenceExp - && parentSchema.isKeyColumn(((ColumnReferenceExp) expression).getColumnName()); - - if (keyColumn) { - builder.keyColumn(select.getAlias(), expressionType); - keys.put(select.getAlias(), expressionType); + // second pass, which iterates the projections but ignores any key expressions, + // instead taking them from the ordered keyExpressions list + final Builder builder = LogicalSchema.builder(); + int currKeyIdx = 0; + for (final SelectExpression select : projection) { + if (keyExpressionMembership.contains(select)) { + while (keyExpressions.get(currKeyIdx).isEmpty()) { + currKeyIdx++; + } + final SelectExpression keyExp = keyExpressions.get(currKeyIdx).remove(0); + final SqlType type = expressionTypeManager.getExpressionSqlType(keyExp.getExpression()); + builder.keyColumn(keyExp.getAlias(), type); } else { - builder.valueColumn(select.getAlias(), expressionType); + final Expression expression = select.getExpression(); + final SqlType type = expressionTypeManager.getExpressionSqlType(expression); + builder.valueColumn(select.getAlias(), type); } } diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value/6.2.0_1620261642747/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value/6.2.0_1620261642747/plan.json new file mode 100644 index 000000000000..fe242d30d9d7 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value/6.2.0_1620261642747/plan.json @@ -0,0 +1,157 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (NAME STRING KEY, ID INTEGER, AGE INTEGER) WITH (FORMAT='JSON', KAFKA_TOPIC='input');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER", + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "keyFeatures" : [ "UNWRAP_SINGLES" ] + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nPARTITION BY INPUT.ID, INPUT.NAME\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` INTEGER KEY, `NAME` STRING KEY, `AGE` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSelectKeyV2", + "properties" : { + "queryContext" : "PartitionBy" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "keyFeatures" : [ "UNWRAP_SINGLES" ] + }, + "sourceSchema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER" + }, + "keyExpression" : [ "ID", "NAME" ] + }, + "keyColumnNames" : [ "ID", "NAME" ], + "selectExpressions" : [ "AGE AS AGE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value/6.2.0_1620261642747/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value/6.2.0_1620261642747/spec.json new file mode 100644 index 000000000000..009d85f1d8d0 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value/6.2.0_1620261642747/spec.json @@ -0,0 +1,156 @@ +{ + "version" : "6.2.0", + "timestamp" : 1620261642747, + "path" : "query-validation-tests/partition-by.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON", + "features" : [ "UNWRAP_SINGLES" ] + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` INTEGER KEY, `NAME` STRING KEY, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "multiple columns - select * - some key some value", + "inputs" : [ { + "topic" : "input", + "key" : "bob", + "value" : { + "ID" : 10, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : "bob", + "value" : { + "ID" : null, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : null, + "value" : { + "ID" : 10, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : "bob", + "value" : null + }, { + "topic" : "input", + "key" : null, + "value" : null + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : { + "ID" : 10, + "NAME" : "bob" + }, + "value" : { + "AGE" : 30 + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "NAME" : "bob" + }, + "value" : { + "AGE" : 30 + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : 10, + "NAME" : null + }, + "value" : { + "AGE" : 30 + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "NAME" : "bob" + }, + "value" : null + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "NAME" : null + }, + "value" : null + } ], + "topics" : [ { + "name" : "input", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (NAME STRING KEY, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", "CREATE STREAM OUTPUT AS SELECT * FROM input PARTITION BY id, name;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ "UNWRAP_SINGLES" ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `NAME` STRING KEY, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input", + "keyFormat" : { + "format" : "JSON", + "features" : [ "UNWRAP_SINGLES" ] + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value/6.2.0_1620261642747/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value/6.2.0_1620261642747/topology new file mode 100644 index 000000000000..a8325d21f26b --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value/6.2.0_1620261642747/topology @@ -0,0 +1,16 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> PartitionBy-SelectKey + <-- KSTREAM-SOURCE-0000000000 + Processor: PartitionBy-SelectKey (stores: []) + --> Project + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000004 + <-- PartitionBy-SelectKey + Sink: KSTREAM-SINK-0000000004 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value_-_key_first/6.2.0_1620261642829/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value_-_key_first/6.2.0_1620261642829/plan.json new file mode 100644 index 000000000000..604d79183bec --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value_-_key_first/6.2.0_1620261642829/plan.json @@ -0,0 +1,157 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (NAME STRING KEY, ID INTEGER, AGE INTEGER) WITH (FORMAT='JSON', KAFKA_TOPIC='input');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER", + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "keyFeatures" : [ "UNWRAP_SINGLES" ] + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nPARTITION BY INPUT.NAME, INPUT.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`NAME` STRING KEY, `ID` INTEGER KEY, `AGE` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSelectKeyV2", + "properties" : { + "queryContext" : "PartitionBy" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "keyFeatures" : [ "UNWRAP_SINGLES" ] + }, + "sourceSchema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER" + }, + "keyExpression" : [ "NAME", "ID" ] + }, + "keyColumnNames" : [ "NAME", "ID" ], + "selectExpressions" : [ "AGE AS AGE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value_-_key_first/6.2.0_1620261642829/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value_-_key_first/6.2.0_1620261642829/spec.json new file mode 100644 index 000000000000..d2bcac1d8e1b --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value_-_key_first/6.2.0_1620261642829/spec.json @@ -0,0 +1,156 @@ +{ + "version" : "6.2.0", + "timestamp" : 1620261642829, + "path" : "query-validation-tests/partition-by.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON", + "features" : [ "UNWRAP_SINGLES" ] + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`NAME` STRING KEY, `ID` INTEGER KEY, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "multiple columns - select * - some key some value - key first", + "inputs" : [ { + "topic" : "input", + "key" : "bob", + "value" : { + "ID" : 10, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : "bob", + "value" : { + "ID" : null, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : null, + "value" : { + "ID" : 10, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : "bob", + "value" : null + }, { + "topic" : "input", + "key" : null, + "value" : null + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : { + "ID" : 10, + "NAME" : "bob" + }, + "value" : { + "AGE" : 30 + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "NAME" : "bob" + }, + "value" : { + "AGE" : 30 + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : 10, + "NAME" : null + }, + "value" : { + "AGE" : 30 + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "NAME" : "bob" + }, + "value" : null + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "NAME" : null + }, + "value" : null + } ], + "topics" : [ { + "name" : "input", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (NAME STRING KEY, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", "CREATE STREAM OUTPUT AS SELECT * FROM input PARTITION BY name, id;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ "UNWRAP_SINGLES" ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`NAME` STRING KEY, `ID` INTEGER KEY, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input", + "keyFormat" : { + "format" : "JSON", + "features" : [ "UNWRAP_SINGLES" ] + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value_-_key_first/6.2.0_1620261642829/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value_-_key_first/6.2.0_1620261642829/topology new file mode 100644 index 000000000000..a8325d21f26b --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select___-_some_key_some_value_-_key_first/6.2.0_1620261642829/topology @@ -0,0 +1,16 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> PartitionBy-SelectKey + <-- KSTREAM-SOURCE-0000000000 + Processor: PartitionBy-SelectKey (stores: []) + --> Project + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000004 + <-- PartitionBy-SelectKey + Sink: KSTREAM-SINK-0000000004 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_/6.2.0_1620257307634/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_/6.2.0_1620257307634/plan.json new file mode 100644 index 000000000000..0035305614b1 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_/6.2.0_1620257307634/plan.json @@ -0,0 +1,155 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (NAME STRING, ID INTEGER, AGE INTEGER) WITH (FORMAT='JSON', KAFKA_TOPIC='input');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`NAME` STRING, `ID` INTEGER, `AGE` INTEGER", + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.AGE AGE,\n INPUT.ID ID,\n INPUT.NAME NAME\nFROM INPUT INPUT\nPARTITION BY INPUT.AGE, INPUT.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`AGE` INTEGER KEY, `ID` INTEGER KEY, `NAME` STRING", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSelectKeyV2", + "properties" : { + "queryContext" : "PartitionBy" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`NAME` STRING, `ID` INTEGER, `AGE` INTEGER" + }, + "keyExpression" : [ "AGE", "ID" ] + }, + "keyColumnNames" : [ "AGE", "ID" ], + "selectExpressions" : [ "NAME AS NAME" ] + }, + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_/6.2.0_1620257307634/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_/6.2.0_1620257307634/spec.json new file mode 100644 index 000000000000..595d2819f792 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_/6.2.0_1620257307634/spec.json @@ -0,0 +1,146 @@ +{ + "version" : "6.2.0", + "timestamp" : 1620257307634, + "path" : "query-validation-tests/partition-by.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`NAME` STRING, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`AGE` INTEGER KEY, `ID` INTEGER KEY, `NAME` STRING", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "multiple columns - select explicit - reorder ", + "inputs" : [ { + "topic" : "input", + "key" : null, + "value" : { + "NAME" : "bob", + "ID" : 10, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : null, + "value" : { + "NAME" : "bob", + "ID" : null, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : null, + "value" : { + "NAME" : "bob", + "ID" : null, + "AGE" : null + } + }, { + "topic" : "input", + "key" : null, + "value" : null + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : { + "ID" : 10, + "AGE" : 30 + }, + "value" : { + "NAME" : "bob" + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "AGE" : 30 + }, + "value" : { + "NAME" : "bob" + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "AGE" : null + }, + "value" : { + "NAME" : "bob" + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "AGE" : null + }, + "value" : null + } ], + "topics" : [ { + "name" : "input", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (NAME STRING, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", "CREATE STREAM OUTPUT AS select AGE, ID, NAME from INPUT partition by AGE, ID;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`NAME` STRING, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`AGE` INTEGER KEY, `ID` INTEGER KEY, `NAME` STRING", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_/6.2.0_1620257307634/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_/6.2.0_1620257307634/topology new file mode 100644 index 000000000000..a8325d21f26b --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_/6.2.0_1620257307634/topology @@ -0,0 +1,16 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> PartitionBy-SelectKey + <-- KSTREAM-SOURCE-0000000000 + Processor: PartitionBy-SelectKey (stores: []) + --> Project + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000004 + <-- PartitionBy-SelectKey + Sink: KSTREAM-SINK-0000000004 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_partition_by_but_keep_same_order_in_project/6.2.0_1620257307655/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_partition_by_but_keep_same_order_in_project/6.2.0_1620257307655/plan.json new file mode 100644 index 000000000000..037aa60a3276 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_partition_by_but_keep_same_order_in_project/6.2.0_1620257307655/plan.json @@ -0,0 +1,155 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (NAME STRING, ID INTEGER, AGE INTEGER) WITH (FORMAT='JSON', KAFKA_TOPIC='input');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`NAME` STRING, `ID` INTEGER, `AGE` INTEGER", + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n INPUT.AGE AGE,\n INPUT.NAME NAME\nFROM INPUT INPUT\nPARTITION BY INPUT.AGE, INPUT.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`AGE` INTEGER KEY, `ID` INTEGER KEY, `NAME` STRING", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSelectKeyV2", + "properties" : { + "queryContext" : "PartitionBy" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`NAME` STRING, `ID` INTEGER, `AGE` INTEGER" + }, + "keyExpression" : [ "AGE", "ID" ] + }, + "keyColumnNames" : [ "AGE", "ID" ], + "selectExpressions" : [ "NAME AS NAME" ] + }, + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_partition_by_but_keep_same_order_in_project/6.2.0_1620257307655/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_partition_by_but_keep_same_order_in_project/6.2.0_1620257307655/spec.json new file mode 100644 index 000000000000..95f944384177 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_partition_by_but_keep_same_order_in_project/6.2.0_1620257307655/spec.json @@ -0,0 +1,146 @@ +{ + "version" : "6.2.0", + "timestamp" : 1620257307655, + "path" : "query-validation-tests/partition-by.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`NAME` STRING, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`AGE` INTEGER KEY, `ID` INTEGER KEY, `NAME` STRING", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "multiple columns - select explicit - reorder partition by but keep same order in project", + "inputs" : [ { + "topic" : "input", + "key" : null, + "value" : { + "NAME" : "bob", + "ID" : 10, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : null, + "value" : { + "NAME" : "bob", + "ID" : null, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : null, + "value" : { + "NAME" : "bob", + "ID" : null, + "AGE" : null + } + }, { + "topic" : "input", + "key" : null, + "value" : null + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : { + "ID" : 10, + "AGE" : 30 + }, + "value" : { + "NAME" : "bob" + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "AGE" : 30 + }, + "value" : { + "NAME" : "bob" + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "AGE" : null + }, + "value" : { + "NAME" : "bob" + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "AGE" : null + }, + "value" : null + } ], + "topics" : [ { + "name" : "input", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (NAME STRING, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", "CREATE STREAM OUTPUT AS select ID, AGE, NAME from INPUT partition by AGE, ID;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`NAME` STRING, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`AGE` INTEGER KEY, `ID` INTEGER KEY, `NAME` STRING", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_partition_by_but_keep_same_order_in_project/6.2.0_1620257307655/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_partition_by_but_keep_same_order_in_project/6.2.0_1620257307655/topology new file mode 100644 index 000000000000..a8325d21f26b --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_explicit_-_reorder_partition_by_but_keep_same_order_in_project/6.2.0_1620257307655/topology @@ -0,0 +1,16 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> PartitionBy-SelectKey + <-- KSTREAM-SOURCE-0000000000 + Processor: PartitionBy-SelectKey (stores: []) + --> Project + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000004 + <-- PartitionBy-SelectKey + Sink: KSTREAM-SINK-0000000004 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_star_-_reorder_columns/6.2.0_1620257307582/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_star_-_reorder_columns/6.2.0_1620257307582/plan.json new file mode 100644 index 000000000000..d4841b001bfc --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_star_-_reorder_columns/6.2.0_1620257307582/plan.json @@ -0,0 +1,155 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (NAME STRING, ID INTEGER, AGE INTEGER) WITH (FORMAT='JSON', KAFKA_TOPIC='input');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`NAME` STRING, `ID` INTEGER, `AGE` INTEGER", + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INPUT INPUT\nPARTITION BY INPUT.AGE, INPUT.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`AGE` INTEGER KEY, `ID` INTEGER KEY, `NAME` STRING", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSelectKeyV2", + "properties" : { + "queryContext" : "PartitionBy" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`NAME` STRING, `ID` INTEGER, `AGE` INTEGER" + }, + "keyExpression" : [ "AGE", "ID" ] + }, + "keyColumnNames" : [ "AGE", "ID" ], + "selectExpressions" : [ "NAME AS NAME" ] + }, + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_star_-_reorder_columns/6.2.0_1620257307582/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_star_-_reorder_columns/6.2.0_1620257307582/spec.json new file mode 100644 index 000000000000..dd4bbd009d68 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_star_-_reorder_columns/6.2.0_1620257307582/spec.json @@ -0,0 +1,146 @@ +{ + "version" : "6.2.0", + "timestamp" : 1620257307582, + "path" : "query-validation-tests/partition-by.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`NAME` STRING, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`AGE` INTEGER KEY, `ID` INTEGER KEY, `NAME` STRING", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "multiple columns - select star - reorder columns", + "inputs" : [ { + "topic" : "input", + "key" : null, + "value" : { + "NAME" : "bob", + "ID" : 10, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : null, + "value" : { + "NAME" : "bob", + "ID" : null, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : null, + "value" : { + "NAME" : "bob", + "ID" : null, + "AGE" : null + } + }, { + "topic" : "input", + "key" : null, + "value" : null + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : { + "ID" : 10, + "AGE" : 30 + }, + "value" : { + "NAME" : "bob" + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "AGE" : 30 + }, + "value" : { + "NAME" : "bob" + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "AGE" : null + }, + "value" : { + "NAME" : "bob" + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "AGE" : null + }, + "value" : null + } ], + "topics" : [ { + "name" : "input", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (NAME STRING, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", "CREATE STREAM OUTPUT AS select * from INPUT partition by AGE, ID;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`NAME` STRING, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`AGE` INTEGER KEY, `ID` INTEGER KEY, `NAME` STRING", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_star_-_reorder_columns/6.2.0_1620257307582/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_star_-_reorder_columns/6.2.0_1620257307582/topology new file mode 100644 index 000000000000..a8325d21f26b --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_select_star_-_reorder_columns/6.2.0_1620257307582/topology @@ -0,0 +1,16 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> PartitionBy-SelectKey + <-- KSTREAM-SOURCE-0000000000 + Processor: PartitionBy-SelectKey (stores: []) + --> Project + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000004 + <-- PartitionBy-SelectKey + Sink: KSTREAM-SINK-0000000004 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_key_first/6.2.0_1620261429513/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_key_first/6.2.0_1620261429513/plan.json new file mode 100644 index 000000000000..b5532b40bb8c --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_key_first/6.2.0_1620261429513/plan.json @@ -0,0 +1,157 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (NAME STRING KEY, ID INTEGER, AGE INTEGER) WITH (FORMAT='JSON', KAFKA_TOPIC='input');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER", + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "keyFeatures" : [ "UNWRAP_SINGLES" ] + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.ID ID,\n INPUT.AGE AGE,\n INPUT.NAME NAME\nFROM INPUT INPUT\nPARTITION BY INPUT.NAME, INPUT.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`NAME` STRING KEY, `ID` INTEGER KEY, `AGE` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSelectKeyV2", + "properties" : { + "queryContext" : "PartitionBy" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "keyFeatures" : [ "UNWRAP_SINGLES" ] + }, + "sourceSchema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER" + }, + "keyExpression" : [ "NAME", "ID" ] + }, + "keyColumnNames" : [ "NAME", "ID" ], + "selectExpressions" : [ "AGE AS AGE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_key_first/6.2.0_1620261429513/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_key_first/6.2.0_1620261429513/spec.json new file mode 100644 index 000000000000..9f6efbf87b41 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_key_first/6.2.0_1620261429513/spec.json @@ -0,0 +1,156 @@ +{ + "version" : "6.2.0", + "timestamp" : 1620261429513, + "path" : "query-validation-tests/partition-by.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON", + "features" : [ "UNWRAP_SINGLES" ] + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`NAME` STRING KEY, `ID` INTEGER KEY, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "multiple columns - some key some value - key first", + "inputs" : [ { + "topic" : "input", + "key" : "bob", + "value" : { + "ID" : 10, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : "bob", + "value" : { + "ID" : null, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : null, + "value" : { + "ID" : 10, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : "bob", + "value" : null + }, { + "topic" : "input", + "key" : null, + "value" : null + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : { + "ID" : 10, + "NAME" : "bob" + }, + "value" : { + "AGE" : 30 + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "NAME" : "bob" + }, + "value" : { + "AGE" : 30 + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : 10, + "NAME" : null + }, + "value" : { + "AGE" : 30 + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "NAME" : "bob" + }, + "value" : null + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "NAME" : null + }, + "value" : null + } ], + "topics" : [ { + "name" : "input", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (NAME STRING KEY, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", "CREATE STREAM OUTPUT AS select ID, AGE, NAME from INPUT partition by NAME, ID;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ "UNWRAP_SINGLES" ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`NAME` STRING KEY, `ID` INTEGER KEY, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input", + "keyFormat" : { + "format" : "JSON", + "features" : [ "UNWRAP_SINGLES" ] + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_key_first/6.2.0_1620261429513/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_key_first/6.2.0_1620261429513/topology new file mode 100644 index 000000000000..a8325d21f26b --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_key_first/6.2.0_1620261429513/topology @@ -0,0 +1,16 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> PartitionBy-SelectKey + <-- KSTREAM-SOURCE-0000000000 + Processor: PartitionBy-SelectKey (stores: []) + --> Project + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000004 + <-- PartitionBy-SelectKey + Sink: KSTREAM-SINK-0000000004 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_properly_ordered_in_selection/6.2.0_1620261797927/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_properly_ordered_in_selection/6.2.0_1620261797927/plan.json new file mode 100644 index 000000000000..ee18f044f7c6 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_properly_ordered_in_selection/6.2.0_1620261797927/plan.json @@ -0,0 +1,157 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (NAME STRING KEY, ID INTEGER, AGE INTEGER) WITH (FORMAT='JSON', KAFKA_TOPIC='input');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER", + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "keyFeatures" : [ "UNWRAP_SINGLES" ] + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.NAME NAME,\n INPUT.AGE AGE,\n INPUT.ID ID\nFROM INPUT INPUT\nPARTITION BY INPUT.NAME, INPUT.ID\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`NAME` STRING KEY, `ID` INTEGER KEY, `AGE` INTEGER", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSelectKeyV2", + "properties" : { + "queryContext" : "PartitionBy" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "input", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "keyFeatures" : [ "UNWRAP_SINGLES" ] + }, + "sourceSchema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER" + }, + "keyExpression" : [ "NAME", "ID" ] + }, + "keyColumnNames" : [ "NAME", "ID" ], + "selectExpressions" : [ "AGE AS AGE" ] + }, + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_properly_ordered_in_selection/6.2.0_1620261797927/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_properly_ordered_in_selection/6.2.0_1620261797927/spec.json new file mode 100644 index 000000000000..fb34336a258a --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_properly_ordered_in_selection/6.2.0_1620261797927/spec.json @@ -0,0 +1,156 @@ +{ + "version" : "6.2.0", + "timestamp" : 1620261797927, + "path" : "query-validation-tests/partition-by.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON", + "features" : [ "UNWRAP_SINGLES" ] + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`NAME` STRING KEY, `ID` INTEGER KEY, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "multiple columns - some key some value - properly ordered in selection", + "inputs" : [ { + "topic" : "input", + "key" : "bob", + "value" : { + "ID" : 10, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : "bob", + "value" : { + "ID" : null, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : null, + "value" : { + "ID" : 10, + "AGE" : 30 + } + }, { + "topic" : "input", + "key" : "bob", + "value" : null + }, { + "topic" : "input", + "key" : null, + "value" : null + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : { + "ID" : 10, + "NAME" : "bob" + }, + "value" : { + "AGE" : 30 + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "NAME" : "bob" + }, + "value" : { + "AGE" : 30 + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : 10, + "NAME" : null + }, + "value" : { + "AGE" : 30 + } + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "NAME" : "bob" + }, + "value" : null + }, { + "topic" : "OUTPUT", + "key" : { + "ID" : null, + "NAME" : null + }, + "value" : null + } ], + "topics" : [ { + "name" : "input", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (NAME STRING KEY, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", "CREATE STREAM OUTPUT AS SELECT name, age, id FROM input PARTITION BY name, id;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`NAME` STRING KEY, `ID` INTEGER, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ "UNWRAP_SINGLES" ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`NAME` STRING KEY, `ID` INTEGER KEY, `AGE` INTEGER", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "input", + "keyFormat" : { + "format" : "JSON", + "features" : [ "UNWRAP_SINGLES" ] + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_properly_ordered_in_selection/6.2.0_1620261797927/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_properly_ordered_in_selection/6.2.0_1620261797927/topology new file mode 100644 index 000000000000..a8325d21f26b --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/partition-by_-_multiple_columns_-_some_key_some_value_-_properly_ordered_in_selection/6.2.0_1620261797927/topology @@ -0,0 +1,16 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [input]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> PartitionBy-SelectKey + <-- KSTREAM-SOURCE-0000000000 + Processor: PartitionBy-SelectKey (stores: []) + --> Project + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000004 + <-- PartitionBy-SelectKey + Sink: KSTREAM-SINK-0000000004 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/select_-_multi-key_columns_reordered/6.2.0_1620257309645/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/select_-_multi-key_columns_reordered/6.2.0_1620257309645/plan.json new file mode 100644 index 000000000000..996d7f3cb1f7 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/select_-_multi-key_columns_reordered/6.2.0_1620257309645/plan.json @@ -0,0 +1,148 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM INPUT (ID INTEGER KEY, AGE INTEGER KEY, NAME STRING) WITH (FORMAT='JSON', KAFKA_TOPIC='test_topic');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "INPUT", + "schema" : "`ID` INTEGER KEY, `AGE` INTEGER KEY, `NAME` STRING", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM OUTPUT AS SELECT\n INPUT.AGE AGE,\n INPUT.ID ID,\n INPUT.NAME NAME\nFROM INPUT INPUT\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "OUTPUT", + "schema" : "`ID` INTEGER KEY, `AGE` INTEGER KEY, `NAME` STRING", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "orReplace" : false + }, + "queryPlan" : { + "sources" : [ "INPUT" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "streamSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Project" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "sourceSchema" : "`ID` INTEGER KEY, `AGE` INTEGER KEY, `NAME` STRING" + }, + "keyColumnNames" : [ "ID", "AGE" ], + "selectExpressions" : [ "NAME AS NAME" ] + }, + "formats" : { + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CSAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "metric.reporters" : "", + "ksql.transient.prefix" : "transient_", + "ksql.query.status.running.threshold.seconds" : "300", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.persistence.default.format.key" : "KAFKA", + "ksql.query.persistent.max.bytes.buffering.total" : "-1", + "ksql.query.error.max.queue.size" : "10", + "ksql.variable.substitution.enable" : "true", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.query.pull.max.qps" : "2147483647", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.query.pull.metrics.enabled" : "true", + "ksql.create.or.replace.enabled" : "true", + "ksql.metrics.extension" : null, + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.cast.strings.preserve.nulls" : "true", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.pull.queries.enable" : "true", + "ksql.lambdas.enabled" : "true", + "ksql.suppress.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.query.persistent.active.limit" : "2147483647", + "ksql.persistence.wrap.single.values" : null, + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.query.retry.backoff.initial.ms" : "15000", + "ksql.query.transient.max.bytes.buffering.total" : "-1", + "ksql.schema.registry.url" : "", + "ksql.properties.overrides.denylist" : "", + "ksql.query.pull.max.concurrent.requests" : "2147483647", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.query.pull.interpreter.enabled" : "true", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.query.pull.table.scan.enabled" : "false", + "ksql.streams.auto.commit.interval.ms" : "0", + "ksql.streams.topology.optimization" : "all", + "ksql.query.retry.backoff.max.ms" : "900000", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.metrics.tags.custom" : "", + "ksql.persistence.default.format.value" : null, + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.udf.collect.metrics" : "false", + "ksql.query.pull.thread.pool.size" : "100", + "ksql.persistent.prefix" : "query_", + "ksql.metastore.backup.location" : "", + "ksql.error.classifier.regex" : "", + "ksql.suppress.buffer.size.bytes" : "-1" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/select_-_multi-key_columns_reordered/6.2.0_1620257309645/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/select_-_multi-key_columns_reordered/6.2.0_1620257309645/spec.json new file mode 100644 index 000000000000..9620a12ddf45 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/select_-_multi-key_columns_reordered/6.2.0_1620257309645/spec.json @@ -0,0 +1,102 @@ +{ + "version" : "6.2.0", + "timestamp" : 1620257309645, + "path" : "query-validation-tests/select.json", + "schemas" : { + "CSAS_OUTPUT_0.KsqlTopic.Source" : { + "schema" : "`ID` INTEGER KEY, `AGE` INTEGER KEY, `NAME` STRING", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + }, + "CSAS_OUTPUT_0.OUTPUT" : { + "schema" : "`ID` INTEGER KEY, `AGE` INTEGER KEY, `NAME` STRING", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + } + } + }, + "testCase" : { + "name" : "multi-key columns reordered", + "inputs" : [ { + "topic" : "test_topic", + "key" : { + "id" : 1, + "age" : 20 + }, + "value" : { + "name" : "a" + } + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : { + "ID" : 1, + "AGE" : 20 + }, + "value" : { + "NAME" : "a" + } + } ], + "topics" : [ { + "name" : "OUTPUT", + "replicas" : 1, + "numPartitions" : 4 + }, { + "name" : "test_topic", + "replicas" : 1, + "numPartitions" : 4 + } ], + "statements" : [ "CREATE STREAM INPUT (id int KEY, age int KEY, name STRING) WITH (kafka_topic='test_topic', format='JSON');", "CREATE STREAM OUTPUT AS SELECT age, id, NAME FROM INPUT;" ], + "post" : { + "sources" : [ { + "name" : "INPUT", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `AGE` INTEGER KEY, `NAME` STRING", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + }, { + "name" : "OUTPUT", + "type" : "STREAM", + "schema" : "`ID` INTEGER KEY, `AGE` INTEGER KEY, `NAME` STRING", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : "JSON", + "keyFeatures" : [ ], + "valueFeatures" : [ ] + } ], + "topics" : { + "topics" : [ { + "name" : "test_topic", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + }, { + "name" : "OUTPUT", + "keyFormat" : { + "format" : "JSON" + }, + "valueFormat" : { + "format" : "JSON" + }, + "partitions" : 4 + } ] + } + } + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/select_-_multi-key_columns_reordered/6.2.0_1620257309645/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/select_-_multi-key_columns_reordered/6.2.0_1620257309645/topology new file mode 100644 index 000000000000..441a8f282644 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/select_-_multi-key_columns_reordered/6.2.0_1620257309645/topology @@ -0,0 +1,13 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Project + <-- KSTREAM-SOURCE-0000000000 + Processor: Project (stores: []) + --> KSTREAM-SINK-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Sink: KSTREAM-SINK-0000000003 (topic: OUTPUT) + <-- Project + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json index 32d284413fea..4df5c23b1a20 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/partition-by.json @@ -1050,6 +1050,30 @@ ] } }, + { + "name": "multiple columns - select star - reorder columns", + "statements": [ + "CREATE STREAM INPUT (NAME STRING, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", + "CREATE STREAM OUTPUT AS select * from INPUT partition by AGE, ID;" + ], + "inputs": [ + {"topic": "input", "value": {"NAME": "bob", "ID": 10, "AGE": 30}}, + {"topic": "input", "value": {"NAME": "bob", "ID": null, "AGE": 30}}, + {"topic": "input", "value": {"NAME": "bob", "ID": null, "AGE": null}}, + {"topic": "input", "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"ID": 10, "AGE": 30}, "value": {"NAME": "bob"}}, + {"topic": "OUTPUT", "key": {"ID": null, "AGE": 30}, "value": {"NAME": "bob"}}, + {"topic": "OUTPUT", "key": {"ID": null, "AGE": null}, "value": {"NAME": "bob"}}, + {"topic": "OUTPUT", "key": {"ID": null, "AGE": null}, "value": null} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "AGE INT KEY, ID INT KEY, NAME STRING"} + ] + } + }, { "name": "multiple columns - select explicit", "statements": [ @@ -1074,11 +1098,59 @@ ] } }, + { + "name": "multiple columns - select explicit - reorder ", + "statements": [ + "CREATE STREAM INPUT (NAME STRING, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", + "CREATE STREAM OUTPUT AS SELECT age, id, name FROM input PARTITION BY age, id;" + ], + "inputs": [ + {"topic": "input", "value": {"NAME": "bob", "ID": 10, "AGE": 30}}, + {"topic": "input", "value": {"NAME": "bob", "ID": null, "AGE": 30}}, + {"topic": "input", "value": {"NAME": "bob", "ID": null, "AGE": null}}, + {"topic": "input", "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"ID": 10, "AGE": 30}, "value": {"NAME": "bob"}}, + {"topic": "OUTPUT", "key": {"ID": null, "AGE": 30}, "value": {"NAME": "bob"}}, + {"topic": "OUTPUT", "key": {"ID": null, "AGE": null}, "value": {"NAME": "bob"}}, + {"topic": "OUTPUT", "key": {"ID": null, "AGE": null}, "value": null} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "AGE INT KEY, ID INT KEY, NAME STRING"} + ] + } + }, + { + "name": "multiple columns - select explicit - reorder partition by but keep same order in project", + "statements": [ + "CREATE STREAM INPUT (NAME STRING, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", + "CREATE STREAM OUTPUT AS SELECT id, age, name FROM input PARTITION BY age, id;" + ], + "inputs": [ + {"topic": "input", "value": {"NAME": "bob", "ID": 10, "AGE": 30}}, + {"topic": "input", "value": {"NAME": "bob", "ID": null, "AGE": 30}}, + {"topic": "input", "value": {"NAME": "bob", "ID": null, "AGE": null}}, + {"topic": "input", "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"ID": 10, "AGE": 30}, "value": {"NAME": "bob"}}, + {"topic": "OUTPUT", "key": {"ID": null, "AGE": 30}, "value": {"NAME": "bob"}}, + {"topic": "OUTPUT", "key": {"ID": null, "AGE": null}, "value": {"NAME": "bob"}}, + {"topic": "OUTPUT", "key": {"ID": null, "AGE": null}, "value": null} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "AGE INT KEY, ID INT KEY, NAME STRING"} + ] + } + }, { "name": "multiple columns - some key some value", "statements": [ "CREATE STREAM INPUT (NAME STRING KEY, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", - "CREATE STREAM OUTPUT AS select ID, AGE, NAME from INPUT partition by ID, NAME;" + "CREATE STREAM OUTPUT AS SELECT id, age, name FROM input PARTITION BY id, name;" ], "inputs": [ {"topic": "input", "key": "bob", "value": {"ID": 10, "AGE": 30}}, @@ -1100,6 +1172,110 @@ ] } }, + { + "name": "multiple columns - select * - some key some value", + "statements": [ + "CREATE STREAM INPUT (NAME STRING KEY, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", + "CREATE STREAM OUTPUT AS SELECT * FROM input PARTITION BY id, name;" + ], + "inputs": [ + {"topic": "input", "key": "bob", "value": {"ID": 10, "AGE": 30}}, + {"topic": "input", "key": "bob", "value": {"ID": null, "AGE": 30}}, + {"topic": "input", "key": null, "value": {"ID": 10, "AGE": 30}}, + {"topic": "input", "key": "bob", "value": null}, + {"topic": "input", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"ID": 10, "NAME": "bob"}, "value": {"AGE": 30}}, + {"topic": "OUTPUT", "key": {"ID": null, "NAME": "bob"}, "value": {"AGE": 30}}, + {"topic": "OUTPUT", "key": {"ID": 10, "NAME": null}, "value": {"AGE": 30}}, + {"topic": "OUTPUT", "key": {"ID": null, "NAME": "bob"}, "value": null}, + {"topic": "OUTPUT", "key": {"ID": null, "NAME": null}, "value": null} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "ID INT KEY, NAME STRING KEY, AGE INT"} + ] + } + }, + { + "name": "multiple columns - some key some value - key first", + "statements": [ + "CREATE STREAM INPUT (NAME STRING KEY, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", + "CREATE STREAM OUTPUT AS select ID, AGE, NAME from INPUT partition by NAME, ID;" + ], + "inputs": [ + {"topic": "input", "key": "bob", "value": {"ID": 10, "AGE": 30}}, + {"topic": "input", "key": "bob", "value": {"ID": null, "AGE": 30}}, + {"topic": "input", "key": null, "value": {"ID": 10, "AGE": 30}}, + {"topic": "input", "key": "bob", "value": null}, + {"topic": "input", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"ID": 10, "NAME": "bob"}, "value": {"AGE": 30}}, + {"topic": "OUTPUT", "key": {"ID": null, "NAME": "bob"}, "value": {"AGE": 30}}, + {"topic": "OUTPUT", "key": {"ID": 10, "NAME": null}, "value": {"AGE": 30}}, + {"topic": "OUTPUT", "key": {"ID": null, "NAME": "bob"}, "value": null}, + {"topic": "OUTPUT", "key": {"ID": null, "NAME": null}, "value": null} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "NAME STRING KEY, ID INT KEY, AGE INT"} + ] + } + }, + { + "name": "multiple columns - select * - some key some value - key first", + "statements": [ + "CREATE STREAM INPUT (NAME STRING KEY, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", + "CREATE STREAM OUTPUT AS SELECT * FROM input PARTITION BY name, id;" + ], + "inputs": [ + {"topic": "input", "key": "bob", "value": {"ID": 10, "AGE": 30}}, + {"topic": "input", "key": "bob", "value": {"ID": null, "AGE": 30}}, + {"topic": "input", "key": null, "value": {"ID": 10, "AGE": 30}}, + {"topic": "input", "key": "bob", "value": null}, + {"topic": "input", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"ID": 10, "NAME": "bob"}, "value": {"AGE": 30}}, + {"topic": "OUTPUT", "key": {"ID": null, "NAME": "bob"}, "value": {"AGE": 30}}, + {"topic": "OUTPUT", "key": {"ID": 10, "NAME": null}, "value": {"AGE": 30}}, + {"topic": "OUTPUT", "key": {"ID": null, "NAME": "bob"}, "value": null}, + {"topic": "OUTPUT", "key": {"ID": null, "NAME": null}, "value": null} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "NAME STRING KEY, ID INT KEY, AGE INT"} + ] + } + }, + { + "name": "multiple columns - some key some value - properly ordered in selection", + "statements": [ + "CREATE STREAM INPUT (NAME STRING KEY, ID INT, AGE INT) with (kafka_topic='input', format='JSON');", + "CREATE STREAM OUTPUT AS SELECT name, age, id FROM input PARTITION BY name, id;" + ], + "inputs": [ + {"topic": "input", "key": "bob", "value": {"ID": 10, "AGE": 30}}, + {"topic": "input", "key": "bob", "value": {"ID": null, "AGE": 30}}, + {"topic": "input", "key": null, "value": {"ID": 10, "AGE": 30}}, + {"topic": "input", "key": "bob", "value": null}, + {"topic": "input", "key": null, "value": null} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"ID": 10, "NAME": "bob"}, "value": {"AGE": 30}}, + {"topic": "OUTPUT", "key": {"ID": null, "NAME": "bob"}, "value": {"AGE": 30}}, + {"topic": "OUTPUT", "key": {"ID": 10, "NAME": null}, "value": {"AGE": 30}}, + {"topic": "OUTPUT", "key": {"ID": null, "NAME": "bob"}, "value": null}, + {"topic": "OUTPUT", "key": {"ID": null, "NAME": null}, "value": null} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "NAME STRING KEY, ID INT KEY, AGE INT"} + ] + } + }, { "name": "multiple key columns", "statements": [ diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/select.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/select.json index 1d3886cc2904..6ffa3e47c915 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/select.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/select.json @@ -21,6 +21,24 @@ ] } }, + { + "name": "multi-key columns reordered", + "statements": [ + "CREATE STREAM INPUT (id int KEY, age int KEY, name STRING) WITH (kafka_topic='test_topic', format='JSON');", + "CREATE STREAM OUTPUT AS SELECT age, id, NAME FROM INPUT;" + ], + "inputs": [ + {"topic": "test_topic", "key": {"id": 1, "age": 20}, "value": {"name": "a"}} + ], + "outputs": [ + {"topic": "OUTPUT", "key": {"ID": 1, "AGE": 20}, "value": {"NAME": "a"}} + ], + "post": { + "sources": [ + {"name": "OUTPUT", "type": "stream", "schema": "ID INT KEY, AGE INT KEY, NAME STRING"} + ] + } + }, { "name": "value column", "statements": [