Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: do not allow grouping sets #4942

Merged
merged 4 commits into from
Mar 31, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public List<Expression> getGroupByExpressions() {
return ImmutableList.copyOf(groupByExpressions);
}

void addGroupByExpressions(final Set<Expression> expressions) {
void addGroupByExpressions(final Collection<Expression> expressions) {
Copy link
Contributor

@blueedgenick blueedgenick Mar 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't grouping expressions be a unique set, though? What does it mean to group by a,a,b,a? It's possible I'm entirely misunderstanding what this piece of code does, though; I'm rusty on this stuff ;) I think this comment applies regardless of whether the groupingExpression is retained (per Almog's suggestion) or not.

Copy link
Contributor Author

@big-andy-coates big-andy-coates Mar 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point, Nick. The old code didn't actually enforce this, as addGroupByExpressions was called multiple times.

I've corrected the code and added tests to cover this. Now:

CREATE TABLE OUTPUT AS SELECT COUNT(*) FROM TEST GROUP BY DATA, DATA;

Results in an error:

Duplicate GROUP BY expression: TEST.DATA

The old code was also not maintaining the order of the GROUP BY expressions. This has also been fixed in this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

groupByExpressions.addAll(expressions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.confluent.ksql.parser.tree.AllColumns;
import io.confluent.ksql.parser.tree.AstNode;
import io.confluent.ksql.parser.tree.GroupBy;
import io.confluent.ksql.parser.tree.GroupingElement;
import io.confluent.ksql.parser.tree.Join;
import io.confluent.ksql.parser.tree.JoinOn;
import io.confluent.ksql.parser.tree.JoinedSource;
Expand Down Expand Up @@ -482,22 +481,13 @@ protected AstNode visitSelect(final Select node, final Void context) {
return null;
}

/**
 * Visitor hook for a GROUP BY node; performs no work and returns {@code null}.
 *
 * <p>NOTE(review): presumably the clause is handled separately via
 * {@code analyzeGroupBy} rather than through the visitor - confirm against the caller.
 */
@Override
protected AstNode visitGroupBy(final GroupBy node, final Void context) {
return null;
}

/**
 * Stores the supplied expression as the WHERE expression on {@code analysis}.
 *
 * @param node the WHERE expression to record.
 */
private void analyzeWhere(final Expression node) {
analysis.setWhereExpression(node);
}

/**
 * Flags the query as grouped and registers GROUP BY expressions on {@code analysis}.
 *
 * <p>NOTE(review): this span looks like a diff view with the +/- markers stripped. The
 * per-element loop and the single {@code getGroupingExpressions()} call are likely the
 * before and after versions of the same logic, not statements meant to run together -
 * confirm against the real file before relying on this text.
 *
 * @param groupBy the GROUP BY clause being analyzed.
 */
private void analyzeGroupBy(final GroupBy groupBy) {
isGroupBy = true;

// Only the first enumerated grouping set of each grouping element is registered.
for (final GroupingElement groupingElement : groupBy.getGroupingElements()) {
final Set<Expression> groupingSet = groupingElement.enumerateGroupingSets().get(0);
analysis.addGroupByExpressions(groupingSet);
}
// Registers the clause's grouping expressions directly, in the order the clause returns them.
analysis.addGroupByExpressions(groupBy.getGroupingExpressions());
}

private void analyzePartitionBy(final Expression partitionBy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.confluent.ksql.parser.tree.DropTable;
import io.confluent.ksql.parser.tree.Explain;
import io.confluent.ksql.parser.tree.GroupBy;
import io.confluent.ksql.parser.tree.GroupingElement;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.Join;
import io.confluent.ksql.parser.tree.JoinCriteria;
Expand All @@ -41,7 +40,6 @@
import io.confluent.ksql.parser.tree.Relation;
import io.confluent.ksql.parser.tree.Select;
import io.confluent.ksql.parser.tree.SelectItem;
import io.confluent.ksql.parser.tree.SimpleGroupBy;
import io.confluent.ksql.parser.tree.SingleColumn;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.Statements;
Expand Down Expand Up @@ -441,30 +439,13 @@ protected AstNode visitGroupBy(final GroupBy node, final C context) {
return result.get();
}

final List<GroupingElement> rewrittenGroupings = node.getGroupingElements().stream()
.map(groupingElement -> (GroupingElement) rewriter.apply(groupingElement, context))
final List<Expression> rewrittenGroupings = node.getGroupingExpressions().stream()
.map(exp -> processExpression(exp, context))
.collect(Collectors.toList());

return new GroupBy(node.getLocation(), rewrittenGroupings);
}

/**
 * Rewrites a {@code SimpleGroupBy} node.
 *
 * <p>The plugin is consulted first; if it yields a node, that node is returned as is.
 * Otherwise every column expression is passed through {@code processExpression} and a new
 * {@code SimpleGroupBy} is built at the original location from the results.
 *
 * @param node the node to rewrite.
 * @param context the caller-supplied rewrite context.
 * @return the plugin's replacement, or a rebuilt {@code SimpleGroupBy}.
 */
@Override
protected AstNode visitSimpleGroupBy(final SimpleGroupBy node, final C context) {
  final Optional<AstNode> pluginResult = plugin.apply(node, new Context<>(context, this));

  // The fallback is built lazily so columns are only processed when the plugin declines.
  return pluginResult.orElseGet(() -> new SimpleGroupBy(
      node.getLocation(),
      node.getColumns().stream()
          .map(column -> processExpression(column, context))
          .collect(Collectors.toList())
  ));
}

@Override
public AstNode visitRegisterType(final RegisterType node, final C context) {
final Optional<AstNode> result = plugin.apply(node, new Context<>(context, this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.Explain;
import io.confluent.ksql.parser.tree.GroupBy;
import io.confluent.ksql.parser.tree.GroupingElement;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.Join;
import io.confluent.ksql.parser.tree.JoinCriteria;
Expand All @@ -52,7 +51,6 @@
import io.confluent.ksql.parser.tree.Relation;
import io.confluent.ksql.parser.tree.ResultMaterialization;
import io.confluent.ksql.parser.tree.Select;
import io.confluent.ksql.parser.tree.SimpleGroupBy;
import io.confluent.ksql.parser.tree.SingleColumn;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.Statements;
Expand Down Expand Up @@ -769,16 +767,16 @@ public void shouldRewriteInsertIntoUsingPlugin() {
@Test
public void shouldRewriteGroupBy() {
// Given:
final GroupingElement groupingElement1 = mock(GroupingElement.class);
final GroupingElement groupingElement2 = mock(GroupingElement.class);
final GroupingElement rewrittenGroupingElement1 = mock(GroupingElement.class);
final GroupingElement rewrittenGroupingElement2 = mock(GroupingElement.class);
final Expression exp1 = mock(Expression.class);
final Expression exp2 = mock(Expression.class);
final Expression rewrittenExp1 = mock(Expression.class);
final Expression rewrittenExp2 = mock(Expression.class);
final GroupBy groupBy = new GroupBy(
location,
ImmutableList.of(groupingElement1, groupingElement2)
ImmutableList.of(exp1, exp2)
);
when(mockRewriter.apply(groupingElement1, context)).thenReturn(rewrittenGroupingElement1);
when(mockRewriter.apply(groupingElement2, context)).thenReturn(rewrittenGroupingElement2);
when(expressionRewriter.apply(exp1, context)).thenReturn(rewrittenExp1);
when(expressionRewriter.apply(exp2, context)).thenReturn(rewrittenExp2);

// When:
final AstNode rewritten = rewriter.rewrite(groupBy, context);
Expand All @@ -789,32 +787,7 @@ public void shouldRewriteGroupBy() {
equalTo(
new GroupBy(
location,
ImmutableList.of(rewrittenGroupingElement1, rewrittenGroupingElement2)
)
)
);
}

@Test
public void shouldRewriteSimpleGroupBy() {
// Given:
final Expression expression2 = mock(Expression.class);
final Expression rewrittenExpression2 = mock(Expression.class);
final SimpleGroupBy groupBy =
new SimpleGroupBy(location, ImmutableList.of(expression, expression2));
when(expressionRewriter.apply(expression, context)).thenReturn(rewrittenExpression);
when(expressionRewriter.apply(expression2, context)).thenReturn(rewrittenExpression2);

// When:
final AstNode rewritten = rewriter.rewrite(groupBy, context);

// Then:
assertThat(
rewritten,
equalTo(
new SimpleGroupBy(
location,
ImmutableList.of(rewrittenExpression, rewrittenExpression2)
ImmutableList.of(rewrittenExp1, rewrittenExp2)
)
)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM TEST (ROWKEY INTEGER KEY, F1 INTEGER, F2 STRING, F3 INTEGER) WITH (KAFKA_TOPIC='test_topic', KEY='f1', VALUE_FORMAT='DELIMITED');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "TEST",
"schema" : "`ROWKEY` INTEGER KEY, `F1` INTEGER, `F2` STRING, `F3` INTEGER",
"keyField" : "F1",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
}
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE TABLE OUTPUT AS SELECT\n TEST.F1 F1,\n TEST.F2 F2,\n COUNT(*) KSQL_COL_0\nFROM TEST TEST\nGROUP BY TEST.F3, TEST.F2, TEST.F1\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createTableV1",
"sourceName" : "OUTPUT",
"schema" : "`ROWKEY` STRING KEY, `F1` INTEGER, `F2` STRING, `KSQL_COL_0` BIGINT",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
}
},
"queryPlan" : {
"sources" : [ "TEST" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "tableSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "tableSelectV1",
"properties" : {
"queryContext" : "Aggregate/Project"
},
"source" : {
"@type" : "streamAggregateV1",
"properties" : {
"queryContext" : "Aggregate/Aggregate"
},
"source" : {
"@type" : "streamGroupByV1",
"properties" : {
"queryContext" : "Aggregate/GroupBy"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Aggregate/Prepare"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"sourceSchema" : "`ROWKEY` INTEGER KEY, `F1` INTEGER, `F2` STRING, `F3` INTEGER"
},
"selectExpressions" : [ "F1 AS F1", "F2 AS F2", "ROWTIME AS ROWTIME", "F3 AS F3" ]
},
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"groupByExpressions" : [ "F3", "F2", "F1" ]
},
"internalFormats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"nonAggregateColumns" : [ "F1", "F2", "ROWTIME", "F3" ],
"aggregationFunctions" : [ "COUNT(ROWTIME)" ]
},
"selectExpressions" : [ "F1 AS F1", "F2 AS F2", "KSQL_AGG_VARIABLE_0 AS KSQL_COL_0" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CTAS_OUTPUT_0"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"ksql.transient.prefix" : "transient_",
"ksql.persistence.wrap.single.values" : "true",
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.schema.registry.url" : "",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.metric.reporters" : "",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.authentication.plugin.class" : null,
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.metrics.extension" : null,
"ksql.streams.topology.optimization" : "all",
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.metrics.tags.custom" : "",
"ksql.pull.queries.enable" : "true",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.any.key.name.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.query.persistent.active.limit" : "2147483647"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"version" : "6.0.0",
"timestamp" : 1585599970057,
"schemas" : {
"CTAS_OUTPUT_0.KsqlTopic.Source" : "STRUCT<F1 INT, F2 VARCHAR, F3 INT> NOT NULL",
"CTAS_OUTPUT_0.Aggregate.GroupBy" : "STRUCT<F1 INT, F2 VARCHAR, ROWTIME BIGINT, F3 INT> NOT NULL",
"CTAS_OUTPUT_0.Aggregate.Aggregate.Materialize" : "STRUCT<F1 INT, F2 VARCHAR, ROWTIME BIGINT, F3 INT, KSQL_AGG_VARIABLE_0 BIGINT> NOT NULL",
"CTAS_OUTPUT_0.OUTPUT" : "STRUCT<F1 INT, F2 VARCHAR, KSQL_COL_0 BIGINT> NOT NULL"
},
"inputs" : [ {
"topic" : "test_topic",
"key" : 1,
"value" : "1,a,-1"
}, {
"topic" : "test_topic",
"key" : 2,
"value" : "2,b,-2"
}, {
"topic" : "test_topic",
"key" : 1,
"value" : "1,a,-1"
}, {
"topic" : "test_topic",
"key" : 2,
"value" : "2,b,-2"
}, {
"topic" : "test_topic",
"key" : 3,
"value" : "3,a,-3"
} ],
"outputs" : [ {
"topic" : "OUTPUT",
"key" : "-1|+|a|+|1",
"value" : "1,a,1"
}, {
"topic" : "OUTPUT",
"key" : "-2|+|b|+|2",
"value" : "2,b,1"
}, {
"topic" : "OUTPUT",
"key" : "-1|+|a|+|1",
"value" : "1,a,2"
}, {
"topic" : "OUTPUT",
"key" : "-2|+|b|+|2",
"value" : "2,b,2"
}, {
"topic" : "OUTPUT",
"key" : "-3|+|a|+|3",
"value" : "3,a,1"
} ],
"postConditions" : {
"sources" : [ {
"name" : "OUTPUT",
"type" : "table",
"schema" : "ROWKEY STRING KEY, F1 INT, F2 STRING, KSQL_COL_0 BIGINT"
} ]
}
}
Loading