diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java b/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java index ee8175d95514..bd216b96f153 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/Analysis.java @@ -39,11 +39,13 @@ import io.confluent.ksql.planner.plan.JoinNode.JoinType; import io.confluent.ksql.schema.ksql.LogicalSchema; import io.confluent.ksql.serde.SerdeOption; +import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.SchemaUtil; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -63,7 +65,7 @@ public class Analysis implements ImmutableAnalysis { private Optional whereExpression = Optional.empty(); private final List selectItems = new ArrayList<>(); private final Set selectColumnNames = new HashSet<>(); - private final List groupByExpressions = new ArrayList<>(); + private final Set groupByExpressions = new LinkedHashSet<>(); private Optional windowExpression = Optional.empty(); private Optional partitionBy = Optional.empty(); private Optional havingExpression = Optional.empty(); @@ -129,8 +131,12 @@ public List getGroupByExpressions() { return ImmutableList.copyOf(groupByExpressions); } - void addGroupByExpressions(final Set expressions) { - groupByExpressions.addAll(expressions); + void setGroupByExpressions(final List expressions) { + expressions.forEach(exp -> { + if (!groupByExpressions.add(exp)) { + throw new KsqlException("Duplicate GROUP BY expression: " + exp); + } + }); } @Override diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java b/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java index e989e04b70a0..0e24b217adae 100644 --- 
a/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/analyzer/Analyzer.java @@ -42,7 +42,6 @@ import io.confluent.ksql.parser.tree.AllColumns; import io.confluent.ksql.parser.tree.AstNode; import io.confluent.ksql.parser.tree.GroupBy; -import io.confluent.ksql.parser.tree.GroupingElement; import io.confluent.ksql.parser.tree.Join; import io.confluent.ksql.parser.tree.JoinOn; import io.confluent.ksql.parser.tree.JoinedSource; @@ -482,22 +481,13 @@ protected AstNode visitSelect(final Select node, final Void context) { return null; } - @Override - protected AstNode visitGroupBy(final GroupBy node, final Void context) { - return null; - } - private void analyzeWhere(final Expression node) { analysis.setWhereExpression(node); } private void analyzeGroupBy(final GroupBy groupBy) { isGroupBy = true; - - for (final GroupingElement groupingElement : groupBy.getGroupingElements()) { - final Set groupingSet = groupingElement.enumerateGroupingSets().get(0); - analysis.addGroupByExpressions(groupingSet); - } + analysis.setGroupByExpressions(groupBy.getGroupingExpressions()); } private void analyzePartitionBy(final Expression partitionBy) { diff --git a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriter.java b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriter.java index 5cabd4f68b16..88d5acf3b1bf 100644 --- a/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriter.java +++ b/ksqldb-engine/src/main/java/io/confluent/ksql/engine/rewrite/StatementRewriter.java @@ -30,7 +30,6 @@ import io.confluent.ksql.parser.tree.DropTable; import io.confluent.ksql.parser.tree.Explain; import io.confluent.ksql.parser.tree.GroupBy; -import io.confluent.ksql.parser.tree.GroupingElement; import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.Join; import io.confluent.ksql.parser.tree.JoinCriteria; @@ -41,7 +40,6 @@ import 
io.confluent.ksql.parser.tree.Relation; import io.confluent.ksql.parser.tree.Select; import io.confluent.ksql.parser.tree.SelectItem; -import io.confluent.ksql.parser.tree.SimpleGroupBy; import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.Statements; @@ -441,30 +439,13 @@ protected AstNode visitGroupBy(final GroupBy node, final C context) { return result.get(); } - final List rewrittenGroupings = node.getGroupingElements().stream() - .map(groupingElement -> (GroupingElement) rewriter.apply(groupingElement, context)) + final List rewrittenGroupings = node.getGroupingExpressions().stream() + .map(exp -> processExpression(exp, context)) .collect(Collectors.toList()); return new GroupBy(node.getLocation(), rewrittenGroupings); } - @Override - protected AstNode visitSimpleGroupBy(final SimpleGroupBy node, final C context) { - final Optional result = plugin.apply(node, new Context<>(context, this)); - if (result.isPresent()) { - return result.get(); - } - - final List columns = node.getColumns().stream() - .map(ce -> processExpression(ce, context)) - .collect(Collectors.toList()); - - return new SimpleGroupBy( - node.getLocation(), - columns - ); - } - @Override public AstNode visitRegisterType(final RegisterType node, final C context) { final Optional result = plugin.apply(node, new Context<>(context, this)); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/analyzer/AnalysisTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/analyzer/AnalysisTest.java index 8a0c9e92dc8e..94e78a014e88 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/analyzer/AnalysisTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/analyzer/AnalysisTest.java @@ -15,12 +15,18 @@ package io.confluent.ksql.analyzer; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static 
org.junit.Assert.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.confluent.ksql.execution.ddl.commands.KsqlTopic; +import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.metastore.model.KsqlStream; import io.confluent.ksql.model.WindowType; import io.confluent.ksql.name.ColumnName; @@ -32,7 +38,9 @@ import io.confluent.ksql.serde.FormatInfo; import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.serde.WindowInfo; +import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.SchemaUtil; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Function; @@ -63,6 +71,10 @@ public class AnalysisTest { private Function, SourceSchemas> sourceSchemasFactory; @Mock private WindowExpression windowExpression; + @Mock(name = "anExpression") + private Expression exp1; + @Mock(name = "anotherExpression") + private Expression exp2; private Analysis analysis; @@ -177,6 +189,34 @@ public void shouldGetWindowedGroupBySourceSchemasPostAggregate() { )); } + @Test + public void shouldMaintainGroupByOrder() { + // Given: + final List original = ImmutableList.of(exp1, exp2); + + analysis.setGroupByExpressions(original); + + // When: + final List result = analysis.getGroupByExpressions(); + + // Then: + assertThat(result, is(original)); + } + + @Test + public void shouldThrowOnDuplicateGroupBy() { + // Given: + final List withDuplicate = ImmutableList.of(exp1, exp1); + + // When: + final KsqlException e = assertThrows( + KsqlException.class, + () -> analysis.setGroupByExpressions(withDuplicate) + ); + + // Then: + assertThat(e.getMessage(), containsString("Duplicate GROUP BY expression: anExpression")); + } private static void givenNoneWindowedSource(final KsqlStream dataSource) { final KsqlTopic 
topic = mock(KsqlTopic.class); diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java index 90c03559e097..c4c43c0bed32 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/engine/rewrite/StatementRewriterTest.java @@ -42,7 +42,6 @@ import io.confluent.ksql.parser.tree.CreateTableAsSelect; import io.confluent.ksql.parser.tree.Explain; import io.confluent.ksql.parser.tree.GroupBy; -import io.confluent.ksql.parser.tree.GroupingElement; import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.Join; import io.confluent.ksql.parser.tree.JoinCriteria; @@ -52,7 +51,6 @@ import io.confluent.ksql.parser.tree.Relation; import io.confluent.ksql.parser.tree.ResultMaterialization; import io.confluent.ksql.parser.tree.Select; -import io.confluent.ksql.parser.tree.SimpleGroupBy; import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.Statements; @@ -769,16 +767,16 @@ public void shouldRewriteInsertIntoUsingPlugin() { @Test public void shouldRewriteGroupBy() { // Given: - final GroupingElement groupingElement1 = mock(GroupingElement.class); - final GroupingElement groupingElement2 = mock(GroupingElement.class); - final GroupingElement rewrittenGroupingElement1 = mock(GroupingElement.class); - final GroupingElement rewrittenGroupingElement2 = mock(GroupingElement.class); + final Expression exp1 = mock(Expression.class); + final Expression exp2 = mock(Expression.class); + final Expression rewrittenExp1 = mock(Expression.class); + final Expression rewrittenExp2 = mock(Expression.class); final GroupBy groupBy = new GroupBy( location, - ImmutableList.of(groupingElement1, groupingElement2) + ImmutableList.of(exp1, exp2) ); - 
when(mockRewriter.apply(groupingElement1, context)).thenReturn(rewrittenGroupingElement1); - when(mockRewriter.apply(groupingElement2, context)).thenReturn(rewrittenGroupingElement2); + when(expressionRewriter.apply(exp1, context)).thenReturn(rewrittenExp1); + when(expressionRewriter.apply(exp2, context)).thenReturn(rewrittenExp2); // When: final AstNode rewritten = rewriter.rewrite(groupBy, context); @@ -789,32 +787,7 @@ public void shouldRewriteGroupBy() { equalTo( new GroupBy( location, - ImmutableList.of(rewrittenGroupingElement1, rewrittenGroupingElement2) - ) - ) - ); - } - - @Test - public void shouldRewriteSimpleGroupBy() { - // Given: - final Expression expression2 = mock(Expression.class); - final Expression rewrittenExpression2 = mock(Expression.class); - final SimpleGroupBy groupBy = - new SimpleGroupBy(location, ImmutableList.of(expression, expression2)); - when(expressionRewriter.apply(expression, context)).thenReturn(rewrittenExpression); - when(expressionRewriter.apply(expression2, context)).thenReturn(rewrittenExpression2); - - // When: - final AstNode rewritten = rewriter.rewrite(groupBy, context); - - // Then: - assertThat( - rewritten, - equalTo( - new SimpleGroupBy( - location, - ImmutableList.of(rewrittenExpression, rewrittenExpression2) + ImmutableList.of(rewrittenExp1, rewrittenExp2) ) ) ); diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/group-by_-_with_single_grouping_set_(stream-_table)/6.0.0_1585599970057/plan.json b/ksqldb-functional-tests/src/test/resources/historical_plans/group-by_-_with_single_grouping_set_(stream-_table)/6.0.0_1585599970057/plan.json new file mode 100644 index 000000000000..f49053a29491 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/group-by_-_with_single_grouping_set_(stream-_table)/6.0.0_1585599970057/plan.json @@ -0,0 +1,164 @@ +{ + "plan" : [ { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE STREAM TEST (ROWKEY INTEGER KEY, F1 INTEGER, F2 STRING, F3 
INTEGER) WITH (KAFKA_TOPIC='test_topic', KEY='f1', VALUE_FORMAT='DELIMITED');", + "ddlCommand" : { + "@type" : "createStreamV1", + "sourceName" : "TEST", + "schema" : "`ROWKEY` INTEGER KEY, `F1` INTEGER, `F2` STRING, `F3` INTEGER", + "keyField" : "F1", + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + } + }, { + "@type" : "ksqlPlanV1", + "statementText" : "CREATE TABLE OUTPUT AS SELECT\n TEST.F1 F1,\n TEST.F2 F2,\n COUNT(*) KSQL_COL_0\nFROM TEST TEST\nGROUP BY TEST.F3, TEST.F2, TEST.F1\nEMIT CHANGES", + "ddlCommand" : { + "@type" : "createTableV1", + "sourceName" : "OUTPUT", + "schema" : "`ROWKEY` STRING KEY, `F1` INTEGER, `F2` STRING, `KSQL_COL_0` BIGINT", + "topicName" : "OUTPUT", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + } + }, + "queryPlan" : { + "sources" : [ "TEST" ], + "sink" : "OUTPUT", + "physicalPlan" : { + "@type" : "tableSinkV1", + "properties" : { + "queryContext" : "OUTPUT" + }, + "source" : { + "@type" : "tableSelectV1", + "properties" : { + "queryContext" : "Aggregate/Project" + }, + "source" : { + "@type" : "streamAggregateV1", + "properties" : { + "queryContext" : "Aggregate/Aggregate" + }, + "source" : { + "@type" : "streamGroupByV1", + "properties" : { + "queryContext" : "Aggregate/GroupBy" + }, + "source" : { + "@type" : "streamSelectV1", + "properties" : { + "queryContext" : "Aggregate/Prepare" + }, + "source" : { + "@type" : "streamSourceV1", + "properties" : { + "queryContext" : "KsqlTopic/Source" + }, + "topicName" : "test_topic", + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "sourceSchema" : "`ROWKEY` INTEGER KEY, `F1` INTEGER, `F2` STRING, `F3` INTEGER" + }, + "selectExpressions" : [ "F1 AS F1", "F2 AS F2", "ROWTIME AS ROWTIME", "F3 AS F3" ] + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" 
+ }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "groupByExpressions" : [ "F3", "F2", "F1" ] + }, + "internalFormats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "nonAggregateColumns" : [ "F1", "F2", "ROWTIME", "F3" ], + "aggregationFunctions" : [ "COUNT(ROWTIME)" ] + }, + "selectExpressions" : [ "F1 AS F1", "F2 AS F2", "KSQL_AGG_VARIABLE_0 AS KSQL_COL_0" ] + }, + "formats" : { + "keyFormat" : { + "format" : "KAFKA" + }, + "valueFormat" : { + "format" : "DELIMITED" + } + }, + "topicName" : "OUTPUT" + }, + "queryId" : "CTAS_OUTPUT_0" + } + } ], + "configs" : { + "ksql.extension.dir" : "ext", + "ksql.streams.cache.max.bytes.buffering" : "0", + "ksql.security.extension.class" : null, + "ksql.transient.prefix" : "transient_", + "ksql.persistence.wrap.single.values" : "true", + "ksql.authorization.cache.expiry.time.secs" : "30", + "ksql.schema.registry.url" : "", + "ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix" : "", + "ksql.streams.auto.offset.reset" : "earliest", + "ksql.query.pull.enable.standby.reads" : "false", + "ksql.connect.url" : "http://localhost:8083", + "ksql.service.id" : "some.ksql.service.id", + "ksql.internal.topic.min.insync.replicas" : "1", + "ksql.streams.shutdown.timeout.ms" : "300000", + "ksql.internal.topic.replicas" : "1", + "ksql.insert.into.values.enabled" : "true", + "ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807", + "ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler", + "ksql.access.validator.enable" : "auto", + "ksql.streams.bootstrap.servers" : "localhost:0", + "ksql.streams.commit.interval.ms" : "2000", + "ksql.metric.reporters" : "", + "ksql.query.pull.metrics.enabled" : "false", + "ksql.authentication.plugin.class" : null, + 
"ksql.streams.auto.commit.interval.ms" : "0", + "ksql.metrics.extension" : null, + "ksql.streams.topology.optimization" : "all", + "ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.streams.num.stream.threads" : "4", + "ksql.timestamp.throw.on.invalid" : "false", + "ksql.authorization.cache.max.entries" : "10000", + "ksql.metrics.tags.custom" : "", + "ksql.pull.queries.enable" : "true", + "ksql.udfs.enabled" : "true", + "ksql.udf.enable.security.manager" : "true", + "ksql.connect.worker.config" : "", + "ksql.any.key.name.enabled" : "false", + "ksql.sink.window.change.log.additional.retention" : "1000000", + "ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses", + "ksql.udf.collect.metrics" : "false", + "ksql.persistent.prefix" : "query_", + "ksql.query.persistent.active.limit" : "2147483647" + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/group-by_-_with_single_grouping_set_(stream-_table)/6.0.0_1585599970057/spec.json b/ksqldb-functional-tests/src/test/resources/historical_plans/group-by_-_with_single_grouping_set_(stream-_table)/6.0.0_1585599970057/spec.json new file mode 100644 index 000000000000..b624a3725323 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/group-by_-_with_single_grouping_set_(stream-_table)/6.0.0_1585599970057/spec.json @@ -0,0 +1,59 @@ +{ + "version" : "6.0.0", + "timestamp" : 1585599970057, + "schemas" : { + "CTAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT NOT NULL", + "CTAS_OUTPUT_0.Aggregate.GroupBy" : "STRUCT NOT NULL", + "CTAS_OUTPUT_0.Aggregate.Aggregate.Materialize" : "STRUCT NOT NULL", + "CTAS_OUTPUT_0.OUTPUT" : "STRUCT NOT NULL" + }, + "inputs" : [ { + "topic" : "test_topic", + "key" : 1, + "value" : "1,a,-1" + }, { + 
"topic" : "test_topic", + "key" : 2, + "value" : "2,b,-2" + }, { + "topic" : "test_topic", + "key" : 1, + "value" : "1,a,-1" + }, { + "topic" : "test_topic", + "key" : 2, + "value" : "2,b,-2" + }, { + "topic" : "test_topic", + "key" : 3, + "value" : "3,a,-3" + } ], + "outputs" : [ { + "topic" : "OUTPUT", + "key" : "-1|+|a|+|1", + "value" : "1,a,1" + }, { + "topic" : "OUTPUT", + "key" : "-2|+|b|+|2", + "value" : "2,b,1" + }, { + "topic" : "OUTPUT", + "key" : "-1|+|a|+|1", + "value" : "1,a,2" + }, { + "topic" : "OUTPUT", + "key" : "-2|+|b|+|2", + "value" : "2,b,2" + }, { + "topic" : "OUTPUT", + "key" : "-3|+|a|+|3", + "value" : "3,a,1" + } ], + "postConditions" : { + "sources" : [ { + "name" : "OUTPUT", + "type" : "table", + "schema" : "ROWKEY STRING KEY, F1 INT, F2 STRING, KSQL_COL_0 BIGINT" + } ] + } +} \ No newline at end of file diff --git a/ksqldb-functional-tests/src/test/resources/historical_plans/group-by_-_with_single_grouping_set_(stream-_table)/6.0.0_1585599970057/topology b/ksqldb-functional-tests/src/test/resources/historical_plans/group-by_-_with_single_grouping_set_(stream-_table)/6.0.0_1585599970057/topology new file mode 100644 index 000000000000..99948fd4b5c4 --- /dev/null +++ b/ksqldb-functional-tests/src/test/resources/historical_plans/group-by_-_with_single_grouping_set_(stream-_table)/6.0.0_1585599970057/topology @@ -0,0 +1,40 @@ +Topologies: + Sub-topology: 0 + Source: KSTREAM-SOURCE-0000000000 (topics: [test_topic]) + --> KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: []) + --> Aggregate-Prepare + <-- KSTREAM-SOURCE-0000000000 + Processor: Aggregate-Prepare (stores: []) + --> KSTREAM-FILTER-0000000003 + <-- KSTREAM-TRANSFORMVALUES-0000000001 + Processor: KSTREAM-FILTER-0000000003 (stores: []) + --> Aggregate-GroupBy + <-- Aggregate-Prepare + Processor: Aggregate-GroupBy (stores: []) + --> Aggregate-GroupBy-repartition-filter + <-- KSTREAM-FILTER-0000000003 + Processor: 
Aggregate-GroupBy-repartition-filter (stores: []) + --> Aggregate-GroupBy-repartition-sink + <-- Aggregate-GroupBy + Sink: Aggregate-GroupBy-repartition-sink (topic: Aggregate-GroupBy-repartition) + <-- Aggregate-GroupBy-repartition-filter + + Sub-topology: 1 + Source: Aggregate-GroupBy-repartition-source (topics: [Aggregate-GroupBy-repartition]) + --> KSTREAM-AGGREGATE-0000000005 + Processor: KSTREAM-AGGREGATE-0000000005 (stores: [Aggregate-Aggregate-Materialize]) + --> Aggregate-Aggregate-ToOutputSchema + <-- Aggregate-GroupBy-repartition-source + Processor: Aggregate-Aggregate-ToOutputSchema (stores: []) + --> Aggregate-Project + <-- KSTREAM-AGGREGATE-0000000005 + Processor: Aggregate-Project (stores: []) + --> KTABLE-TOSTREAM-0000000011 + <-- Aggregate-Aggregate-ToOutputSchema + Processor: KTABLE-TOSTREAM-0000000011 (stores: []) + --> KSTREAM-SINK-0000000012 + <-- Aggregate-Project + Sink: KSTREAM-SINK-0000000012 (topic: OUTPUT) + <-- KTABLE-TOSTREAM-0000000011 + diff --git a/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json b/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json index 99d6a2868b65..4409d347e5a3 100644 --- a/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json +++ b/ksqldb-functional-tests/src/test/resources/query-validation-tests/group-by.json @@ -503,6 +503,31 @@ "CREATE STREAM TEST (ROWKEY INT KEY, f1 INT, f2 VARCHAR, f3 INT) WITH (kafka_topic='test_topic', KEY='f1', value_format='DELIMITED');", "CREATE TABLE OUTPUT AS SELECT f1, f2, COUNT(*) FROM TEST GROUP BY f3, (f2, f1);" ], + "expectedException": { + "type": "io.confluent.ksql.parser.exception.ParseFailedException", + "message": "line 2:74: missing ')' at ','" + } + }, + { + "name": "duplicate expressions", + "statements": [ + "CREATE STREAM TEST (ID INT KEY, data STRING) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE TABLE OUTPUT AS SELECT COUNT(*) FROM TEST GROUP BY DATA, DATA;" + ], 
+ "properties": { + "ksql.any.key.name.enabled": true + }, + "expectedException": { + "type": "io.confluent.ksql.util.KsqlStatementException", + "message": "Duplicate GROUP BY expression: TEST.DATA" + } + }, + { + "name": "with single grouping set (stream->table)", + "statements": [ + "CREATE STREAM TEST (ROWKEY INT KEY, f1 INT, f2 VARCHAR, f3 INT) WITH (kafka_topic='test_topic', KEY='f1', value_format='DELIMITED');", + "CREATE TABLE OUTPUT AS SELECT f1, f2, COUNT(*) FROM TEST GROUP BY (f3, f2, f1);" + ], "inputs": [ {"topic": "test_topic", "key": 1, "value": "1,a,-1"}, {"topic": "test_topic", "key": 2, "value": "2,b,-2"}, diff --git a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index 21f699c455e2..2ddfa7ded5b2 100644 --- a/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksqldb-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -153,26 +153,14 @@ windowUnit ; groupBy - : groupingElement (',' groupingElement)* - ; - -groupingElement - : groupingExpressions #singleGroupingSet - ; - -groupingExpressions - : '(' (valueExpression (',' valueExpression)*)? ')' - | valueExpression + : valueExpression (',' valueExpression)* + | '(' (valueExpression (',' valueExpression)*)? ')' ; values : '(' (valueExpression (',' valueExpression)*)? ')' ; -/* - * Dropped `namedQuery` as we don't support them. - */ - selectItem : expression (AS? identifier)? #selectSingle | identifier '.' 
ASTERISK #selectAll diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 608cee68f494..41092733d24f 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -102,7 +102,6 @@ import io.confluent.ksql.parser.tree.DropTable; import io.confluent.ksql.parser.tree.Explain; import io.confluent.ksql.parser.tree.GroupBy; -import io.confluent.ksql.parser.tree.GroupingElement; import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.InsertValues; import io.confluent.ksql.parser.tree.Join; @@ -127,7 +126,6 @@ import io.confluent.ksql.parser.tree.SelectItem; import io.confluent.ksql.parser.tree.SetProperty; import io.confluent.ksql.parser.tree.ShowColumns; -import io.confluent.ksql.parser.tree.SimpleGroupBy; import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.parser.tree.Statements; @@ -563,15 +561,7 @@ private static Pair getSizeAndUnitFromJoinWindowSize( public Node visitGroupBy(final SqlBaseParser.GroupByContext context) { return new GroupBy( getLocation(context), - visit(context.groupingElement(), GroupingElement.class) - ); - } - - @Override - public Node visitSingleGroupingSet(final SqlBaseParser.SingleGroupingSetContext context) { - return new SimpleGroupBy( - getLocation(context), - visit(context.groupingExpressions().valueExpression(), Expression.class) + visit(context.valueExpression(), Expression.class) ); } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/DefaultTraversalVisitor.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/DefaultTraversalVisitor.java index c55be0b1187e..c97307e0862d 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/DefaultTraversalVisitor.java +++ 
b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/DefaultTraversalVisitor.java @@ -21,13 +21,11 @@ import io.confluent.ksql.parser.tree.CreateTableAsSelect; import io.confluent.ksql.parser.tree.Explain; import io.confluent.ksql.parser.tree.GroupBy; -import io.confluent.ksql.parser.tree.GroupingElement; import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.Join; import io.confluent.ksql.parser.tree.Query; import io.confluent.ksql.parser.tree.Select; import io.confluent.ksql.parser.tree.SelectItem; -import io.confluent.ksql.parser.tree.SimpleGroupBy; import io.confluent.ksql.parser.tree.SingleColumn; import io.confluent.ksql.parser.tree.Statements; @@ -80,22 +78,6 @@ protected R visitJoin(final Join node, final C context) { @Override protected R visitGroupBy(final GroupBy node, final C context) { - for (final GroupingElement groupingElement : node.getGroupingElements()) { - process(groupingElement, context); - } - - return null; - } - - @Override - protected R visitGroupingElement(final GroupingElement node, final C context) { - return null; - } - - @Override - protected R visitSimpleGroupBy(final SimpleGroupBy node, final C context) { - visitGroupingElement(node, context); - return null; } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/ExpressionFormatterUtil.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/ExpressionFormatterUtil.java deleted file mode 100644 index 7ac1d1b82538..000000000000 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/ExpressionFormatterUtil.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. 
You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.parser; - -import static java.lang.String.format; - -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; -import io.confluent.ksql.execution.expression.formatter.ExpressionFormatter; -import io.confluent.ksql.execution.expression.tree.Expression; -import io.confluent.ksql.parser.tree.GroupingElement; -import io.confluent.ksql.schema.ksql.FormatOptions; -import io.confluent.ksql.util.IdentifierUtil; -import java.util.List; -import java.util.Set; - -public final class ExpressionFormatterUtil { - private ExpressionFormatterUtil() { - } - - public static String formatExpression(final Expression expression) { - return ExpressionFormatter.formatExpression( - expression, - FormatOptions.of(IdentifierUtil::needsQuotes) - ); - } - - public static String formatGroupBy(final List groupingElements) { - final ImmutableList.Builder resultStrings = ImmutableList.builder(); - - for (final GroupingElement groupingElement : groupingElements) { - resultStrings.add(groupingElement.format()); - } - return Joiner.on(", ").join(resultStrings.build()); - } - - public static String formatGroupingSet(final Set groupingSet) { - return format("(%s)", Joiner.on(", ").join(groupingSet.stream() - .map(ExpressionFormatterUtil::formatExpression) - .iterator())); - } - -} diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java index 3fdc87446459..728c4a004f32 100644 --- 
a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/SqlFormatter.java @@ -111,27 +111,28 @@ protected Void visitQuery(final Query node, final Integer indent) { if (node.getWhere().isPresent()) { append( indent, - "WHERE " + ExpressionFormatterUtil.formatExpression(node.getWhere().get()) + "WHERE " + formatExpression(node.getWhere().get()) ).append('\n'); } if (node.getGroupBy().isPresent()) { - append(indent, "GROUP BY " - + ExpressionFormatterUtil - .formatGroupBy(node.getGroupBy().get().getGroupingElements())) + final String expressions = node.getGroupBy().get().getGroupingExpressions().stream() + .map(SqlFormatter::formatExpression) + .collect(Collectors.joining(", ")); + + append(indent, "GROUP BY " + expressions) .append('\n'); } if (node.getPartitionBy().isPresent()) { append(indent, "PARTITION BY " - + ExpressionFormatterUtil - .formatExpression(node.getPartitionBy().get())) + + formatExpression(node.getPartitionBy().get())) .append('\n'); } if (node.getHaving().isPresent()) { append(indent, "HAVING " - + ExpressionFormatterUtil.formatExpression(node.getHaving().get())) + + formatExpression(node.getHaving().get())) .append('\n'); } @@ -177,7 +178,7 @@ protected Void visitSelect(final Select node, final Integer indent) { @Override protected Void visitSingleColumn(final SingleColumn node, final Integer indent) { - builder.append(ExpressionFormatterUtil.formatExpression(node.getExpression())); + builder.append(formatExpression(node.getExpression())); if (node.getAlias().isPresent()) { builder.append(' ') // for backwards compatibility, we always quote with `""` here @@ -221,7 +222,7 @@ protected Void visitJoin(final Join join, final Integer indent) { node.getWithinExpression().map((e) -> builder.append(e.toString())); final JoinOn on = (JoinOn) criteria; builder.append(" ON (") - .append(ExpressionFormatterUtil.formatExpression(on.getExpression())) + 
.append(formatExpression(on.getExpression())) .append(")"); return null; @@ -331,7 +332,7 @@ protected Void visitInsertValues(final InsertValues node, final Integer context) builder.append( node.getValues() .stream() - .map(ExpressionFormatterUtil::formatExpression) + .map(SqlFormatter::formatExpression) .collect(Collectors.joining(", "))); builder.append(")"); @@ -398,7 +399,7 @@ public Void visitRegisterType(final RegisterType node, final Integer context) { builder.append("CREATE TYPE "); builder.append(FORMAT_OPTIONS.escape(node.getName())); builder.append(" AS "); - builder.append(ExpressionFormatterUtil.formatExpression(node.getType())); + builder.append(formatExpression(node.getType())); builder.append(";"); return null; } @@ -431,7 +432,7 @@ private void processPartitionBy( final Integer indent ) { partitionByColumn.ifPresent(partitionBy -> append(indent, "PARTITION BY " - + ExpressionFormatterUtil.formatExpression(partitionBy)) + + formatExpression(partitionBy)) .append('\n')); } @@ -502,6 +503,13 @@ private static String formatTableElement(final TableElement e) { } } + private static String formatExpression(final Expression expression) { + return ExpressionFormatter.formatExpression( + expression, + FormatOptions.of(IdentifierUtil::needsQuotes) + ); + } + private static String escapedName(final Name name) { return name.toString(FORMAT_OPTIONS); } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java index 6c96a389728f..d50828d7a599 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/AstVisitor.java @@ -141,14 +141,6 @@ protected R visitGroupBy(final GroupBy node, final C context) { return visitNode(node, context); } - protected R visitGroupingElement(final GroupingElement node, final C context) { - return visitNode(node, context); - } - - protected R 
visitSimpleGroupBy(final SimpleGroupBy node, final C context) { - return visitGroupingElement(node, context); - } - protected R visitTerminateQuery(final TerminateQuery node, final C context) { return visitStatement(node, context); } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/GroupBy.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/GroupBy.java index 6624b28507fe..4feea69bded2 100644 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/GroupBy.java +++ b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/GroupBy.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.Immutable; +import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.parser.NodeLocation; import java.util.List; import java.util.Objects; @@ -28,23 +29,23 @@ @Immutable public class GroupBy extends AstNode { - private final ImmutableList groupingElements; + private final ImmutableList groupingExpressions; - public GroupBy(final List groupingElements) { - this(Optional.empty(), groupingElements); + public GroupBy(final List groupingExpressions) { + this(Optional.empty(), groupingExpressions); } public GroupBy( final Optional location, - final List groupingElements + final List groupingExpressions ) { super(location); - this.groupingElements = ImmutableList - .copyOf(requireNonNull(groupingElements, "groupingElements")); + this.groupingExpressions = ImmutableList + .copyOf(requireNonNull(groupingExpressions, "groupingExpressions")); } - public List getGroupingElements() { - return groupingElements; + public List getGroupingExpressions() { + return groupingExpressions; } @Override @@ -61,18 +62,18 @@ public boolean equals(final Object o) { return false; } final GroupBy groupBy = (GroupBy) o; - return Objects.equals(groupingElements, groupBy.groupingElements); + return Objects.equals(groupingExpressions, groupBy.groupingExpressions); } @Override public int
hashCode() { - return Objects.hash(groupingElements); + return Objects.hash(groupingExpressions); } @Override public String toString() { return toStringHelper(this) - .add("groupingElements", groupingElements) + .add("groupingExpressions", groupingExpressions) .toString(); } } diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/GroupingElement.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/GroupingElement.java deleted file mode 100644 index e7a7c0ec563f..000000000000 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/GroupingElement.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ - -package io.confluent.ksql.parser.tree; - -import com.google.errorprone.annotations.Immutable; -import io.confluent.ksql.execution.expression.tree.Expression; -import io.confluent.ksql.parser.NodeLocation; -import java.util.List; -import java.util.Optional; -import java.util.Set; - -@Immutable -public abstract class GroupingElement extends AstNode { - - GroupingElement(final Optional location) { - super(location); - } - - public abstract List> enumerateGroupingSets(); - - @Override - protected R accept(final AstVisitor visitor, final C context) { - return visitor.visitGroupingElement(this, context); - } - - public abstract String format(); -} diff --git a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/SimpleGroupBy.java b/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/SimpleGroupBy.java deleted file mode 100644 index d93eeaa0aac3..000000000000 --- a/ksqldb-parser/src/main/java/io/confluent/ksql/parser/tree/SimpleGroupBy.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright 2018 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ - -package io.confluent.ksql.parser.tree; - -import static com.google.common.base.MoreObjects.toStringHelper; -import static com.google.common.collect.Iterables.getOnlyElement; -import static java.util.Objects.requireNonNull; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.errorprone.annotations.Immutable; -import io.confluent.ksql.execution.expression.tree.Expression; -import io.confluent.ksql.parser.ExpressionFormatterUtil; -import io.confluent.ksql.parser.NodeLocation; -import java.util.List; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; - -@Immutable -public class SimpleGroupBy extends GroupingElement { - - private final ImmutableList columns; - - public SimpleGroupBy(final List columns) { - this(Optional.empty(), columns); - } - - public SimpleGroupBy( - final Optional location, - final List columns - ) { - super(location); - this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns")); - } - - public List getColumns() { - return columns; - } - - @Override - public List> enumerateGroupingSets() { - return ImmutableList.of(ImmutableSet.copyOf(columns)); - } - - @Override - protected R accept(final AstVisitor visitor, final C context) { - return visitor.visitSimpleGroupBy(this, context); - } - - @Override - public String format() { - final Set - columns = - ImmutableSet.copyOf(this.columns); - if (columns.size() == 1) { - return ExpressionFormatterUtil.formatExpression(getOnlyElement(columns)); - } - return ExpressionFormatterUtil.formatGroupingSet(columns); - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final SimpleGroupBy that = (SimpleGroupBy) o; - return Objects.equals(columns, that.columns); - } - - @Override - public int hashCode() { - return Objects.hash(columns); - } - - @Override - public String toString() { - return 
toStringHelper(this) - .add("columns", columns) - .toString(); - } -} diff --git a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionFormatterUtilTest.java b/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionFormatterUtilTest.java deleted file mode 100644 index 77950c4b3774..000000000000 --- a/ksqldb-parser/src/test/java/io/confluent/ksql/parser/ExpressionFormatterUtilTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2019 Confluent Inc. - * - * Licensed under the Confluent Community License (the "License"); you may not use - * this file except in compliance with the License. You may obtain a copy of the - * License at - * - * http://www.confluent.io/confluent-community-license - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package io.confluent.ksql.parser; - -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertThat; - -import io.confluent.ksql.execution.expression.tree.Type; -import io.confluent.ksql.schema.ksql.types.SqlStruct; -import io.confluent.ksql.schema.ksql.types.SqlTypes; -import org.junit.Test; - -public class ExpressionFormatterUtilTest { - @Test - public void shouldFormatStructWithColumnWithReservedWordName() { - final SqlStruct struct = SqlStruct.builder() - .field("END", SqlTypes.INTEGER) - .build(); - - assertThat( - ExpressionFormatterUtil.formatExpression(new Type(struct)), - equalTo("STRUCT<`END` INTEGER>")); - } -}