-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Partition by prep #4781
Partition by prep #4781
Conversation
Prep for confluentinc#4749. This commit changes the way the engine resolves '*' in a projection, e.g. `SELECT * FROM X;`. Previously, the `Analyzer` was responsible for expanding the `*` into the set of columns of each source. However, this code was getting complicated and would be much more complicated once the key column can have any name, (confluentinc#3536). The complexity comes about because the `Analyzer` would need to determine the presence of joins, group bys, partition bys, etc, which can effect how `*` is resolved. This logic duplicates the logic in the `LogicalPlanner` and `PlanNode` sub-classes. With this commit sees the logical plan and planner being responsible for resolving any `*` in the projection. This is achieved by asking the parent of the projection node to resolve the `*` into the set of columns. Parent node types that do not know how to resolve the `*`, e.g. `FilterNode`, forward requests to their parents. In this way, the resolution request ripples up the logical plan until it reaches a `DataSourceNode`, which can resolve the `*` into a list of columns. `JoinNode` knows how forward `*`, `left.*` and `right.*` appropriately. Previously, the list of `SelectExpressions` was passed down from parent `PlanNode` to child, allowing some nodes to rewrite the expressions. For example, `FlatMapNode` would rewrite any expression involving a TableFunction to use the internal names like `KSQL_SYNTH_0`. With this commit this is no longer necessary. Instead, when building a projection node the planner asks it's parent node to resolve any selects, allowing the parent to perform any rewrite. At the moment, the planner is still responsible for much of this work. In the future, this logic may move into the plan itself. However, such a change would increase the complexity of this commit.
public AggregateAnalysisResult analyze( | ||
final ImmutableAnalysis analysis, | ||
final List<SelectExpression> finalProjection | ||
) { | ||
if (analysis.getGroupByExpressions().isEmpty()) { | ||
throw new IllegalArgumentException("Not an aggregate query"); | ||
} | ||
|
||
final Context context = new Context(analysis); | ||
|
||
finalProjection.stream() | ||
.map(SelectExpression::getExpression) | ||
.forEach(exp -> processSelect(exp, context)); | ||
|
||
analysis.getWhereExpression() | ||
.ifPresent(exp -> processWhere(exp, context)); | ||
|
||
analysis.getGroupByExpressions() | ||
.forEach(exp -> processGroupBy(exp, context)); | ||
|
||
analysis.getHavingExpression() | ||
.ifPresent(exp -> processHaving(exp, context)); | ||
|
||
enforceAggregateRules(context); | ||
|
||
return context.aggregateAnalysis; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically moved from QueryAnalyzer
.
private static void enforceAggregateRules( | ||
final Context context | ||
) { | ||
if (context.aggregateAnalysis.getAggregateFunctions().isEmpty()) { | ||
throw new KsqlException( | ||
"GROUP BY requires columns using aggregate functions in SELECT clause."); | ||
} | ||
|
||
final Set<Expression> groupByExprs = getGroupByExpressions(context.analysis); | ||
|
||
final List<String> unmatchedSelects = context.aggregateAnalysis | ||
.getNonAggregateSelectExpressions() | ||
.entrySet() | ||
.stream() | ||
// Remove any that exactly match a group by expression: | ||
.filter(e -> !groupByExprs.contains(e.getKey())) | ||
// Remove any that are constants, | ||
// or expressions where all params exactly match a group by expression: | ||
.filter(e -> !Sets.difference(e.getValue(), groupByExprs).isEmpty()) | ||
.map(Map.Entry::getKey) | ||
.map(Expression::toString) | ||
.sorted() | ||
.collect(Collectors.toList()); | ||
|
||
if (!unmatchedSelects.isEmpty()) { | ||
throw new KsqlException( | ||
"Non-aggregate SELECT expression(s) not part of GROUP BY: " + unmatchedSelects); | ||
} | ||
|
||
final SetView<ColumnReferenceExp> unmatchedSelectsAgg = Sets | ||
.difference(context.aggregateAnalysis.getAggregateSelectFields(), groupByExprs); | ||
if (!unmatchedSelectsAgg.isEmpty()) { | ||
throw new KsqlException( | ||
"Column used in aggregate SELECT expression(s) " | ||
+ "outside of aggregate functions not part of GROUP BY: " + unmatchedSelectsAgg); | ||
} | ||
|
||
final Set<ColumnReferenceExp> havingColumns = context.aggregateAnalysis | ||
.getNonAggregateHavingFields().stream() | ||
.map(ref -> new UnqualifiedColumnReferenceExp(ref.getColumnName())) | ||
.collect(Collectors.toSet()); | ||
|
||
final Set<ColumnReferenceExp> havingOnly = Sets.difference(havingColumns, groupByExprs); | ||
if (!havingOnly.isEmpty()) { | ||
throw new KsqlException( | ||
"Non-aggregate HAVING expression not part of GROUP BY: " + havingOnly); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved from QueryAnalyzer
private static Set<Expression> getGroupByExpressions( | ||
final ImmutableAnalysis analysis | ||
) { | ||
if (!analysis.getWindowExpression().isPresent()) { | ||
return ImmutableSet.copyOf(analysis.getGroupByExpressions()); | ||
} | ||
|
||
// Add in window bounds columns as implicit group by columns: | ||
final Set<UnqualifiedColumnReferenceExp> windowBoundColumnRefs = | ||
SchemaUtil.windowBoundsColumnNames().stream() | ||
.map(UnqualifiedColumnReferenceExp::new) | ||
.collect(Collectors.toSet()); | ||
|
||
return ImmutableSet.<Expression>builder() | ||
.addAll(analysis.getGroupByExpressions()) | ||
.addAll(windowBoundColumnRefs) | ||
.build(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved from QueryAnalyzer
@@ -89,8 +88,8 @@ ResultMaterialization getResultMaterialization() { | |||
return resultMaterialization; | |||
} | |||
|
|||
void addSelectItem(final Expression expression, final ColumnName alias) { | |||
selectExpressions.add(SelectExpression.of(alias, expression)); | |||
void addSelectItem(final SelectItem selectItem) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, we captured the resolved select items - now we just capture the unresolved SelectItem
private void setSerdeOptions(final Sink sink) { | ||
final List<ColumnName> columnNames = getColumnNames(); | ||
|
||
final Format valueFormat = getValueFormat(sink); | ||
|
||
final Set<SerdeOption> serdeOptions = serdeOptionsSupplier.build( | ||
columnNames, | ||
valueFormat, | ||
sink.getProperties().getWrapSingleValues(), | ||
defaultSerdeOptions | ||
); | ||
|
||
analysis.setSerdeOptions(serdeOptions); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To set the serde options we need to know the number of columns, which requires any *
to be resolved. Hence this code has now moved to later LogicalPlanner
once any *
has been resolved.
@@ -97,177 +78,4 @@ public Analysis analyze( | |||
|
|||
return analysis; | |||
} | |||
|
|||
public AggregateAnalysis analyzeAggregate(final Query query, final Analysis analysis) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Analysis of aggregates is now delayed until the creation of an AggregateNode
in the LogicalPlanner
. This is necessary as this code needs any *
s to have been resolved.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving the analysis out of the analyzer seems less than ideal... I'm trying to wrap my head around what *
means in the context of an aggregation. other than COUNT(*)
and other aggregate functions that have the *
as the argument - how can you even use *
in an aggregate? If it doesn't make sense outside of function calls, can we leverage that to keep the analysis in the analyzer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is just a continuation of a trend Tim started with UDTFs. Trying to extract everything up front in the analyzer, and then using this to build the logical plan, is probably not the best approach. It often involves duplicating logic between the analyser and plan, and if we just delay the analysis till we're building the plan we can take advantage of being able to ask parent nodes for things, i.e. we can build something more object oriented and take advantage of polymorphism, rather than if
statements.
With regards to the resolution of *
for GROUP BY: It's valid to have SELECT * FROM X GROUP BY ?,?,?
just so long as the GROUP BY contains all the columns in the source. There's a QTT test covering this somewhere. But we don't know what columns exist in the source until we've built the plan: the source could be a join for example. Hence we now delay this analysis till later.
private static Stream<ColumnName> orderColumns( | ||
final List<Column> columns, | ||
final LogicalSchema schema | ||
) { | ||
// When doing a `select *` system and key columns should be at the front of the column list | ||
// but are added at the back during processing for performance reasons. | ||
// Switch them around here: | ||
final Map<Boolean, List<Column>> partitioned = columns.stream().collect(Collectors | ||
.groupingBy(c -> SchemaUtil.isSystemColumn(c.name()) || schema.isKeyColumn(c.name()))); | ||
|
||
final List<Column> all = partitioned.get(true); | ||
all.addAll(partitioned.get(false)); | ||
return all.stream().map(Column::name); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved from Analyzer
} | ||
|
||
static boolean isValidComparison( | ||
static void isValidComparison( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found a bug in here when moving tests to QTT: Comparisons to the NULL
type result in SqlType
of null
, which caused an NPE in this method!
Unfortunately, fixing it caused the complexity of this method to go over the checkstyle rule, so had to refactor.
if (someStars && select.getSelectItems().size() != 1) { | ||
throw new KsqlException("Pull queries only support wildcards in the projects " | ||
+ "if they are the only expression"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This actually means KSQL rejects any pull query that uses a mix of *
and explicit column names in a pull query. Previously, depending on the order, the explicit columns were ignored or a weird error was returned. At least now a meaningful error is returned.
This is a follow on task from confluentinc#4781. While working on confluentinc#4781 I realized it the complex logic in `enforceAggregateRules` could be simplified by applying the rules along the way, rather than capturing all the information along the way and then trying to munge it all together at the end. This simplifies the code and moves several fields from `MutableAggregateAnalysis` to `AggregateAnalyzer.AggAnalyzer`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This LGTM, I took as deep a look as I could but for the most part I think the approach is reasonable. I'm not sure I understand the boundaries between the analyzer and the logical planner anymore... but that was a problem before this change as well.
Anyway, feel free to ship - mostly clarification questions
@@ -97,177 +78,4 @@ public Analysis analyze( | |||
|
|||
return analysis; | |||
} | |||
|
|||
public AggregateAnalysis analyzeAggregate(final Query query, final Analysis analysis) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moving the analysis out of the analyzer seems less than ideal... I'm trying to wrap my head around what *
means in the context of an aggregation. other than COUNT(*)
and other aggregate functions that have the *
as the argument - how can you even use *
in an aggregate? If it doesn't make sense outside of function calls, can we leverage that to keep the analysis in the analyzer?
return getSources().stream() | ||
.filter(s -> !sourceName.isPresent() || sourceName.equals(s.getSourceName())) | ||
.flatMap(s -> s.resolveSelectStar(sourceName, false)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at what point do we change the names of the columns to prepend <SOURCE>_
and does it make sense to do it here instead? e.g.
ksql> CREATE STREAM s (id VARCHAR, val VARCHAR) WITH (kafka_topic='s', value_format='json', partitions=1);
ksql> CREATE TABLE t (id VARCHAR, val VARCHAR) with (kafka_topic='t', value_format='json', partitions=1, key='id');
ksql> CREATE STREAM j AS SELECT * FROM s LEFT JOIN t ON s.id = t.id;
ksql> SELECT * FROM J EMIT CHANGES;
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|ROWTIME|ROWKEY |S_ROWTI|S_ROWKE|S_ID |S_VAL |T_ROWTI|T_ROWKE|T_ID |T_VAL |
| | |ME |Y | | |ME |Y | | |
+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rodesai made a change to the LogicalPlanner
to alias the join columns in a projection node, prior to the join node. There's probably scope for cleaning some of that work up a little once the dust has settled from recent changes. However:
1). I actually probably makes sense to handle the aliases in the rewriter the engine runs post-parse/pre-execute, rather than in this method.
2) It's out of scope for this PR.
ksqldb-engine/src/main/java/io/confluent/ksql/planner/plan/FlatMapNode.java
Show resolved
Hide resolved
ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
Outdated
Show resolved
Hide resolved
Conflicting files ksqldb-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java
* chore: refactor AggregateAnalyzer This is a follow on task from #4781. While working on #4781 I realized it the complex logic in `enforceAggregateRules` could be simplified by applying the rules along the way, rather than capturing all the information along the way and then trying to munge it all together at the end. This simplifies the code and moves several fields from `MutableAggregateAnalysis` to `AggregateAnalyzer.AggAnalyzer`. Co-authored-by: Andy Coates <[email protected]>
Description
refactor: projection expression handling
Prep for #4749.
This commit changes the way the engine resolves '*' in a projection, e.g.
SELECT * FROM X;
.Previously, the
Analyzer
was responsible for expanding the*
into the set of columns of each source. However, this code was getting complicated and would be much more complicated once the key column can have any name, (#3536). The complexity comes about because theAnalyzer
would need to determine the presence of joins, group bys, partition bys, etc, which can effect how*
is resolved. This logic duplicates the logic in theLogicalPlanner
andPlanNode
sub-classes.With this commit sees the logical plan and planner being responsible for resolving any
*
in the projection. This is achieved by asking the parent of the projection node to resolve the*
into the set of columns. Parent node types that do not know how to resolve the*
, e.g.FilterNode
, forward requests to their parents. In this way, the resolution request ripples up the logical plan until it reaches aDataSourceNode
, which can resolve the*
into a list of columns.JoinNode
knows how forward*
,left.*
andright.*
appropriately.Previously, the list of
SelectExpressions
was passed down from parentPlanNode
to child, allowing some nodes to rewrite the expressions. For example,FlatMapNode
would rewrite any expression involving a TableFunction to use the internal names likeKSQL_SYNTH_0
.With this commit this is no longer necessary. Instead, when building a projection node the planner asks it's parent node to resolve any selects, allowing the parent to perform any rewrite.
At the moment, the planner is still responsible for much of this work. In the future, this logic may move into the plan itself. However, such a change would increase the complexity of this commit.
How to review.
AggregateAnalysis
interface.ProjectNode.resolveSelectStar
and adds unit tests for the changes to thePlanNode
subclasses.The main code changes are in the first commit. There's a fair amount of lines changes as this is not a small change. However, at lot of the time its just logic being moved from one place to another. Specifically, the resolution of
*
and aggregate analysis is delayed until needed, i.e. we creating the projection or aggregate node. So worth checking this out.Also, checkout
resolveSelect
andresolveSelectStar
inPlanNode
and its sub-classes.Testing done
I've moved tests out of
AnalyzerFunctionalTest
,QueryAnalyzerFunctionalTest
andCodeGenRunnerTest
into JSON based QTT tests, which is the preferred way to test such functionality.Reviewer checklist