From fcac4595ff88a0b9a4f4b91f53eaa0d4f669aa31 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg <57688982+AlanConfluent@users.noreply.github.com> Date: Thu, 28 Jan 2021 09:54:50 -0800 Subject: [PATCH] fix: Select star table scans need keys in the intermediate row (#6908) * fix: Select star table scans need keys in the intermediate row --- .../ksql/planner/plan/PullProjectNode.java | 3 ++- .../ksql/planner/plan/PullProjectNodeTest.java | 3 ++- ...ueries-against-materialized-aggregates.json | 18 ++++++++++++++++-- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/PullProjectNode.java b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/PullProjectNode.java index 7ab4d962c8ac..227a76a7c053 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/PullProjectNode.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/PullProjectNode.java @@ -180,7 +180,8 @@ private boolean shouldAddAdditionalColumnsInSchema() { getSource().getSchema().isKeyColumn(cn) ); - return hasSystemColumns || hasKeyColumns; + // Select * also requires keys, in case it's not explicitly mentioned + return hasSystemColumns || hasKeyColumns || isSelectStar; } private boolean isSelectStar() { diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/PullProjectNodeTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/PullProjectNodeTest.java index 64620b2b21b1..0299bd7a4fd2 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/PullProjectNodeTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/planner/plan/PullProjectNodeTest.java @@ -286,7 +286,8 @@ public void shouldBuildPullQueryOutputSchemaSelectStar() { // Then: final LogicalSchema expectedSchema = INPUT_SCHEMA; - assertThat(expectedSchema, is(projectNode.getIntermediateSchema())); + assertThat(expectedSchema.withPseudoAndKeyColsInValue(false), + is(projectNode.getIntermediateSchema())); assertThat(expectedSchema.withoutPseudoAndKeyColsInValue(), is(projectNode.getSchema())); assertThrows( IllegalStateException.class, diff --git a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json index 39921d2549cc..5898278d20ec 100644 --- a/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json +++ b/ksqldb-functional-tests/src/test/resources/rest-query-validation-tests/pull-queries-against-materialized-aggregates.json @@ -2009,7 +2009,8 @@ "statements": [ "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT GROUP BY ID;", - "SELECT ID, COUNT, ROWTIME FROM AGGREGATE;" + "SELECT ID, COUNT, ROWTIME FROM AGGREGATE;", + "SELECT * FROM AGGREGATE;" ], "inputs": [ {"topic": "test_topic", "timestamp": 12365, "key": "10", "value": {}}, @@ -2022,6 +2023,11 @@ {"header":{"schema":"`ID` STRING KEY, `COUNT` BIGINT, `ROWTIME` BIGINT"}}, {"row":{"columns":["11", 1, 12345]}}, {"row":{"columns":["10", 1, 12365]}} + ]}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["11", 1]}}, + {"row":{"columns":["10", 1]}} ]} ] }, @@ -2030,7 +2036,8 @@ "statements": [ "CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');", "CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;", - "SELECT ID, WINDOWSTART, WINDOWEND, COUNT, ROWTIME FROM AGGREGATE;" + "SELECT ID, WINDOWSTART, WINDOWEND, COUNT, ROWTIME FROM AGGREGATE;", + "SELECT * FROM AGGREGATE;" ], "inputs": [ {"topic": "test_topic", "timestamp": 12345, "key": "10", "value": {"val": 1}}, @@ -2047,6 +2054,13 @@ {"row":{"columns":["11", 13000, 14000, 1, 13346]}}, {"row":{"columns":["10", 12000, 13000, 1, 12345]}}, {"row":{"columns":["10", 13000, 14000, 1, 13345]}} + ]}, + {"query": [ + {"header":{"schema":"`ID` STRING KEY, `WINDOWSTART` BIGINT KEY, `WINDOWEND` BIGINT KEY, `COUNT` BIGINT"}}, + {"row":{"columns":["11", 12000, 13000, 1]}}, + {"row":{"columns":["11", 13000, 14000, 1]}}, + {"row":{"columns":["10", 12000, 13000, 1]}}, + {"row":{"columns":["10", 13000, 14000, 1]}} ]} ] },