Skip to content

Commit

Permalink
feat: ungate support for multi-column GROUP BY (#6786)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Queries with GROUP BY clauses that contain multiple grouping expressions now result in multiple key columns, one for each grouping expression, rather than a single key column that is the string-concatenation of the grouping expressions. Note that this new behavior (and breaking change) applies only to new queries; existing queries will continue to run uninterrupted with the previous behavior, even across ksqlDB server upgrades.
  • Loading branch information
vcrfxia authored Dec 17, 2020
1 parent 554aef7 commit 9900623
Show file tree
Hide file tree
Showing 422 changed files with 56,378 additions and 3,347 deletions.
13 changes: 7 additions & 6 deletions docs/developer-guide/ksqldb-reference/create-table-as-select.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ See [Partition Data to Enable Joins](../joins/partition-data.md) for more inform
correctly partition your data for joins.

The primary key of the resulting table is determined by the following rules, in order of priority:
1. if the query has a `GROUP BY`:
1. if the `GROUP BY` is on a single source column reference, the primary key will match the
name, type and contents of the source column.
1. if the `GROUP BY` is any other single expression, the primary key will have a system
1. if the query has a `GROUP BY`, then the resulting number of primary key columns will match the
number of grouping expressions. For each grouping expression:
1. if the grouping expression is a single source column reference, the corresponding primary key
column will match the name, type and contents of the source column.
1. if the grouping expression is a reference to a field within a `STRUCT`-type column, then the
corresponding primary key column will match the name, type, and contents of the `STRUCT` field.
1. if the grouping expression is any other expression, the corresponding primary key column will
have a system-generated name, unless you provide an alias in the projection, and will match the
type and contents of the result of the expression.
1. otherwise, the primary key will have a system generated name, and will be of type `STRING`
and contain the grouping expression concatenated together.
1. if the query has a join, see [Join Synthetic Key Columns](../joins/synthetic-keys) for more info.
1. otherwise, the primary key will match the name, unless you provide an alias in the projection,
and type of the source table's primary key.
Expand Down
11 changes: 0 additions & 11 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,6 @@ public class KsqlConfig extends AbstractConfig {
+ "in interactive mode. Once this limit is reached, any further persistent queries will not "
+ "be accepted.";

public static final String KSQL_MULTICOL_KEY_FORMAT_ENABLED = "ksql.multicol.key.format.enabled";
public static final Boolean KSQL_MULTICOL_KEY_FORMAT_ENABLED_DEFAULT = false;
public static final String KSQL_MULTICOL_KEY_FORMAT_ENABLED_DOC =
"Feature flag for multi-column keys";

public static final String KSQL_DEFAULT_KEY_FORMAT_CONFIG = "ksql.persistence.default.format.key";
private static final String KSQL_DEFAULT_KEY_FORMAT_DEFAULT = "KAFKA";
private static final String KSQL_DEFAULT_KEY_FORMAT_DOC =
Expand Down Expand Up @@ -612,12 +607,6 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_SECURITY_EXTENSION_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_SECURITY_EXTENSION_DOC
).define(
KSQL_MULTICOL_KEY_FORMAT_ENABLED,
Type.BOOLEAN,
KSQL_MULTICOL_KEY_FORMAT_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_MULTICOL_KEY_FORMAT_ENABLED_DOC
).define(
KSQL_DEFAULT_KEY_FORMAT_CONFIG,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.LogicalSchema.Builder;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.FormatFactory;
import io.confluent.ksql.serde.FormatInfo;
import io.confluent.ksql.serde.KeyFormat;
Expand Down Expand Up @@ -643,46 +642,16 @@ private LogicalSchema buildAggregateSchema(

final Builder builder = LogicalSchema.builder();

if (ksqlConfig.getBoolean(KsqlConfig.KSQL_MULTICOL_KEY_FORMAT_ENABLED)) {
final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourceSchema, metaStore);
final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourceSchema, metaStore);

for (final Expression expression : groupByExps) {
final SqlType keyType = typeManager.getExpressionSqlType(expression);
final ColumnName keyName = selectResolver.apply(expression)
.orElseGet(() -> expression instanceof ColumnReferenceExp
? ((ColumnReferenceExp) expression).getColumnName()
: ColumnNames.uniqueAliasFor(expression, sourceSchema)
);

builder.keyColumn(keyName, keyType);
}
} else {
final ColumnName keyName;
final SqlType keyType;

if (groupByExps.size() != 1) {
keyType = SqlTypes.STRING;

keyName = ColumnNames.nextKsqlColAlias(
sourceSchema,
LogicalSchema.builder()
.valueColumns(projectionSchema.value())
.build()
);
} else {
final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourceSchema, metaStore);

final Expression expression = groupByExps.get(0);

keyType = typeManager.getExpressionSqlType(expression);
keyName = selectResolver.apply(expression)
.orElseGet(() -> expression instanceof ColumnReferenceExp
? ((ColumnReferenceExp) expression).getColumnName()
: ColumnNames.uniqueAliasFor(expression, sourceSchema)
);
}
for (final Expression expression : groupByExps) {
final SqlType keyType = typeManager.getExpressionSqlType(expression);
final ColumnName keyName = selectResolver.apply(expression)
.orElseGet(() -> expression instanceof ColumnReferenceExp
? ((ColumnReferenceExp) expression).getColumnName()
: ColumnNames.uniqueAliasFor(expression, sourceSchema)
);

builder.keyColumn(keyName, keyType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.plan.JoinType;
import io.confluent.ksql.execution.plan.KGroupedStreamHolder;
import io.confluent.ksql.execution.plan.KStreamHolder;
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.plan.StreamFilter;
import io.confluent.ksql.execution.plan.StreamFlatMap;
import io.confluent.ksql.execution.plan.StreamGroupBy;
import io.confluent.ksql.execution.plan.StreamGroupByKey;
import io.confluent.ksql.execution.plan.StreamSelect;
import io.confluent.ksql.execution.plan.StreamSink;
Expand Down Expand Up @@ -371,32 +371,16 @@ public SchemaKGroupedStream groupBy(
rekeyedFormat = keyFormat;
}

final boolean isSingleKey;
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_MULTICOL_KEY_FORMAT_ENABLED)) {
isSingleKey = groupByExpressions.size() == 1;
} else {
isSingleKey = true;
}

final KeyFormat sanitizedKeyFormat = SerdeFeaturesFactory.sanitizeKeyFormat(
rekeyedFormat, isSingleKey);

final ExecutionStep<KGroupedStreamHolder> source;
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_MULTICOL_KEY_FORMAT_ENABLED)) {
source = ExecutionStepFactory.streamGroupBy(
contextStacker,
sourceStep,
InternalFormats.of(sanitizedKeyFormat, valueFormat),
groupByExpressions
);
} else {
source = ExecutionStepFactory.streamGroupByV1(
contextStacker,
sourceStep,
InternalFormats.of(sanitizedKeyFormat, valueFormat),
groupByExpressions
);
}
rekeyedFormat,
groupByExpressions.size() == 1
);
final StreamGroupBy<K> source = ExecutionStepFactory.streamGroupBy(
contextStacker,
sourceStep,
InternalFormats.of(sanitizedKeyFormat, valueFormat),
groupByExpressions
);

return new SchemaKGroupedStream(
source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.plan.JoinType;
import io.confluent.ksql.execution.plan.KGroupedTableHolder;
import io.confluent.ksql.execution.plan.KTableHolder;
import io.confluent.ksql.execution.plan.SelectExpression;
import io.confluent.ksql.execution.plan.TableFilter;
import io.confluent.ksql.execution.plan.TableGroupBy;
import io.confluent.ksql.execution.plan.TableSelect;
import io.confluent.ksql.execution.plan.TableSink;
import io.confluent.ksql.execution.plan.TableSuppress;
Expand Down Expand Up @@ -221,37 +221,20 @@ public SchemaKGroupedTable groupBy(
final List<Expression> groupByExpressions,
final Stacker contextStacker
) {
final boolean isSingleKey;
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_MULTICOL_KEY_FORMAT_ENABLED)) {
isSingleKey = groupByExpressions.size() == 1;
} else {
isSingleKey = true;
}

// Since tables must have a key, we know that the keyFormat is both
// not NONE and has at least one column; this allows us to inherit
// the key format directly (as opposed to the logic in SchemaKStream)
final KeyFormat groupedKeyFormat = SerdeFeaturesFactory.sanitizeKeyFormat(
KeyFormat.nonWindowed(keyFormat.getFormatInfo(), keyFormat.getFeatures()),
isSingleKey
groupByExpressions.size() == 1
);

final ExecutionStep<KGroupedTableHolder> step;
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_MULTICOL_KEY_FORMAT_ENABLED)) {
step = ExecutionStepFactory.tableGroupBy(
contextStacker,
sourceTableStep,
InternalFormats.of(groupedKeyFormat, valueFormat),
groupByExpressions
);
} else {
step = ExecutionStepFactory.tableGroupByV1(
contextStacker,
sourceTableStep,
InternalFormats.of(groupedKeyFormat, valueFormat),
groupByExpressions
);
}
final TableGroupBy<K> step = ExecutionStepFactory.tableGroupBy(
contextStacker,
sourceTableStep,
InternalFormats.of(groupedKeyFormat, valueFormat),
groupByExpressions
);

return new SchemaKGroupedTable(
step,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public void shouldBuildStepForGroupBy() {
assertThat(
groupedSchemaKStream.getSourceStep(),
equalTo(
ExecutionStepFactory.streamGroupByV1(
ExecutionStepFactory.streamGroupBy(
childContextStacker,
initialSchemaKStream.getSourceStep(),
Formats
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ public void shouldBuildStepForGroupBy() {
assertThat(
groupedSchemaKTable.getSourceTableStep(),
equalTo(
ExecutionStepFactory.tableGroupByV1(
ExecutionStepFactory.tableGroupBy(
childContextStacker,
initialSchemaKTable.getSourceTableStep(),
Formats.of(
Expand Down Expand Up @@ -593,13 +593,13 @@ public void shouldBuildStepForGroupByBasedOnKeySerdeFeatures() {
assertThat(
groupedSchemaKTable.getSourceTableStep(),
equalTo(
ExecutionStepFactory.tableGroupByV1(
ExecutionStepFactory.tableGroupBy(
childContextStacker,
initialSchemaKTable.getSourceTableStep(),
Formats.of(
initialSchemaKTable.keyFormat.getFormatInfo(),
valueFormat.getFormatInfo(),
SerdeFeatures.of(SerdeFeature.UNWRAP_SINGLES),
SerdeFeatures.of(),
SerdeFeatures.of()
),
groupByExpressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@
import io.confluent.ksql.test.tools.TestCase;
import io.confluent.ksql.test.tools.TestCaseBuilder;
import io.confluent.ksql.test.tools.TopologyAndConfigs;
import io.confluent.ksql.util.KsqlConfig;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public final class PlannedTestUtils {
Expand All @@ -49,8 +47,7 @@ public static boolean isPlannedTestCase(final TestCase testCase) {

public static boolean isNotExcluded(final TestCase testCase) {
// Place temporary logic here to exclude test cases based on feature flags, etc.
final Map<String, Object> props = testCase.properties();
return !(boolean) props.getOrDefault(KsqlConfig.KSQL_MULTICOL_KEY_FORMAT_ENABLED, false);
return true;
}

public static boolean isSamePlan(
Expand Down
Loading

0 comments on commit 9900623

Please sign in to comment.