Skip to content

Commit

Permalink
Partial support for arbitrary key column names. (#4701)
Browse files Browse the repository at this point in the history
* chore: partial support for arbitrarily named key columns

Partial fix for: #3536

First part of supporting key column names other than `ROWKEY`.

With this initial pass you can now name your key columns anything you want in your `CREATE TABLE` and `CREATE STREAM` statements, e.g.

```sql
CREATE STREAM S (ID INT KEY, NAME STRING) WITH (...);
```

Any GROUP BY, PARTITION BY or JOIN on the key column results any created data source having a key column with a matching name, e.g.

```sql
-- schema of T: ID INT KEY, COUNT BIGINT
CREATE TABLE T AS SELECT COUNT() AS COUNT FROM S GROUP BY ID;
```

Pull and push queries work as expected and quoted identifiers work too.

However, this functionality is not complete yet.
Hence it is guarded by the `ksql.any.key.name.enabled` feature flag, which defaults to off.
The following big ticket items are remaining:

* PARTITION BY a single value column should result in a stream with the key column that matches the value column name.
* GROUP BY a single value column should result in a table with the key column that matches the value column name.
* JOIN on a single value column should  result in a stream/table with the key column that matches the value column name.

This additional work will be tracked under the same ticket, e.g. #3536
  • Loading branch information
big-andy-coates authored Mar 5, 2020
1 parent 5c0cb0e commit eaa8f23
Show file tree
Hide file tree
Showing 360 changed files with 38,278 additions and 641 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMap1;
import io.confluent.ksql.util.HandlerMaps.Handler1;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.TabularRow;
import java.io.Closeable;
import java.io.File;
Expand Down Expand Up @@ -454,11 +455,11 @@ private static String formatFieldType(
final Optional<WindowType> windowType,
final String keyField
) {
if (field.getName().equals("ROWTIME")) {
if (field.getName().equals(SchemaUtil.ROWTIME_NAME.text())) {
return String.format("%-16s %s", field.getSchema().toTypeString(), "(system)");
}

if (field.getName().equals("ROWKEY")) {
if (field.getName().equals(SchemaUtil.ROWKEY_NAME.text())) {
final String wt = windowType
.map(v -> " (Window type: " + v + ")")
.orElse("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ public class KsqlConfig extends AbstractConfig {
+ "behavior, and instead throw an exception to ensure that no data is missed, set "
+ "ksql.timestamp.skip.invalid to true.";

public static final String KSQL_ANY_KEY_NAME_ENABLED = "ksql.any.key.name.enabled";

private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -623,6 +625,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.MEDIUM,
KSQL_TIMESTAMP_THROW_ON_INVALID_DOC
)
.define(
KSQL_ANY_KEY_NAME_ENABLED,
Type.BOOLEAN,
false,
Importance.LOW,
"Feature flag for removing restriction on key names - WIP, do not enable."
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ private void visitSelectStar(final AllColumns allColumns) {
// See https://github.com/confluentinc/ksql/issues/3731 for more info
final List<Column> valueColumns = persistent && !analysis.isJoin()
? schema.value()
: systemColumnsToTheFront(schema.withMetaAndKeyColsInValue(windowed).value());
: orderColumns(schema.withMetaAndKeyColsInValue(windowed).value(), schema);

for (final Column column : valueColumns) {

Expand All @@ -596,12 +596,15 @@ private void visitSelectStar(final AllColumns allColumns) {
}
}

private List<Column> systemColumnsToTheFront(final List<Column> columns) {
// When doing a `select *` the system columns should be at the front of the column list
private List<Column> orderColumns(
final List<Column> columns,
final LogicalSchema schema
) {
// When doing a `select *` system and key columns should be at the front of the column list
// but are added at the back during processing for performance reasons.
// Switch them around here:
final Map<Boolean, List<Column>> partitioned = columns.stream()
.collect(Collectors.groupingBy(c -> SchemaUtil.isSystemColumn(c.name())));
final Map<Boolean, List<Column>> partitioned = columns.stream().collect(Collectors
.groupingBy(c -> SchemaUtil.isSystemColumn(c.name()) || schema.isKeyColumn(c.name())));

final List<Column> all = partitioned.get(true);
all.addAll(partitioned.get(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public CreateStreamCommand createStreamCommand(
) {
final SourceName sourceName = statement.getName();
final KsqlTopic topic = buildTopic(statement.getProperties(), serviceContext);
final LogicalSchema schema = buildSchema(statement.getElements());
final LogicalSchema schema = buildSchema(statement.getElements(), ksqlConfig);
final Optional<ColumnName> keyFieldName = buildKeyFieldName(statement, schema);
final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(
ksqlConfig,
Expand Down Expand Up @@ -121,7 +121,7 @@ public CreateTableCommand createTableCommand(
) {
final SourceName sourceName = statement.getName();
final KsqlTopic topic = buildTopic(statement.getProperties(), serviceContext);
final LogicalSchema schema = buildSchema(statement.getElements());
final LogicalSchema schema = buildSchema(statement.getElements(), ksqlConfig);
final Optional<ColumnName> keyFieldName = buildKeyFieldName(statement, schema);
final Optional<TimestampColumn> timestampColumn = buildTimestampColumn(
ksqlConfig,
Expand Down Expand Up @@ -165,7 +165,10 @@ private static Optional<ColumnName> buildKeyFieldName(
}
}

private static LogicalSchema buildSchema(final TableElements tableElements) {
private static LogicalSchema buildSchema(
final TableElements tableElements,
final KsqlConfig ksqlConfig
) {
if (Iterables.isEmpty(tableElements)) {
throw new KsqlException("The statement does not define any columns.");
}
Expand All @@ -177,14 +180,21 @@ private static LogicalSchema buildSchema(final TableElements tableElements) {
throw new KsqlException("'" + e.getName().text() + "' is a reserved column name.");
}

if (e.getNamespace() == Namespace.KEY) {
if (!isRowKey) {
throw new KsqlException("'" + e.getName().text() + "' is an invalid KEY column name. "
+ "KSQL currently only supports KEY columns named ROWKEY.");
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_ANY_KEY_NAME_ENABLED)) {
if (isRowKey && e.getNamespace() != Namespace.KEY) {
throw new KsqlException("'" + e.getName().text() + "' is a reserved column name. "
+ "It can only be used for KEY columns.");
}
} else {
if (e.getNamespace() == Namespace.KEY) {
if (!isRowKey) {
throw new KsqlException("'" + e.getName().text() + "' is an invalid KEY column name. "
+ "KSQL currently only supports KEY columns named ROWKEY.");
}
} else if (isRowKey) {
throw new KsqlException("'" + e.getName().text() + "' is a reserved column name. "
+ "It can only be used for KEY columns.");
}
} else if (isRowKey) {
throw new KsqlException("'" + e.getName().text() + "' is a reserved column name. "
+ "It can only be used for KEY columns.");
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import io.confluent.ksql.rest.SessionProperties;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.DefaultSqlValueCoercer;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SchemaConverters;
Expand All @@ -61,7 +60,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -142,6 +140,7 @@ private InsertValuesExecutor(
this.valueSerdeFactory = Objects.requireNonNull(valueSerdeFactory, "valueSerdeFactory");
}

@SuppressWarnings("unused") // Part of required API.
public void execute(
final ConfiguredStatement<InsertValues> statement,
final SessionProperties sessionProperties,
Expand Down Expand Up @@ -175,7 +174,7 @@ public void execute(
}
}

private DataSource getDataSource(
private static DataSource getDataSource(
final KsqlConfig ksqlConfig,
final MetaStore metaStore,
final InsertValues insertValues
Expand Down Expand Up @@ -266,11 +265,23 @@ private RowData extractRow(
final Map<ColumnName, Object> values = resolveValues(
insertValues, columns, schema, functionRegistry, config);

handleExplicitKeyField(values, dataSource.getKeyField());
handleExplicitKeyField(
values,
dataSource.getKeyField(),
Iterables.getOnlyElement(dataSource.getSchema().key())
);

if (dataSource.getDataSourceType() == DataSourceType.KTABLE) {
final String noValue = dataSource.getSchema().key().stream()
.map(Column::name)
.filter(colName -> !values.containsKey(colName))
.map(ColumnName::text)
.collect(Collectors.joining(", "));

if (dataSource.getDataSourceType() == DataSourceType.KTABLE
&& values.get(SchemaUtil.ROWKEY_NAME) == null) {
throw new KsqlException("Value for ROWKEY is required for tables");
if (!noValue.isEmpty()) {
throw new KsqlException("Value for primary key column(s) "
+ noValue + " is required for tables");
}
}

final long ts = (long) values.getOrDefault(SchemaUtil.ROWTIME_NAME, clock.getAsLong());
Expand Down Expand Up @@ -357,26 +368,29 @@ private static Map<ColumnName, Object> resolveValues(

private static void handleExplicitKeyField(
final Map<ColumnName, Object> values,
final KeyField keyField
final KeyField keyField,
final Column keyColumn
) {
final Optional<ColumnName> keyFieldName = keyField.ref();
if (keyFieldName.isPresent()) {
final ColumnName key = keyFieldName.get();
final Object keyValue = values.get(key);
final Object rowKeyValue = values.get(SchemaUtil.ROWKEY_NAME);

if (keyValue != null ^ rowKeyValue != null) {
if (keyValue == null) {
values.put(key, rowKeyValue);
// key column: the key column in the source's schema.
// key field: the column identified in the WITH clause as being an alias to the key column.

keyField.ref().ifPresent(keyFieldName -> {
final ColumnName keyColumnName = keyColumn.name();
final Object keyFieldValue = values.get(keyFieldName);
final Object keyColumnValue = values.get(keyColumnName);

if (keyFieldValue != null ^ keyColumnValue != null) {
if (keyFieldValue == null) {
values.put(keyFieldName, keyColumnValue);
} else {
values.put(SchemaUtil.ROWKEY_NAME, keyValue);
values.put(keyColumnName, keyFieldValue);
}
} else if (keyValue != null && !Objects.equals(keyValue, rowKeyValue)) {
throw new KsqlException(String.format(
"Expected ROWKEY and %s to match but got %s and %s respectively.",
key.toString(FormatOptions.noEscape()), rowKeyValue, keyValue));
} else if (keyFieldValue != null && !Objects.equals(keyFieldValue, keyColumnValue)) {
throw new KsqlException(
"Expected " + keyColumnName.text() + " and " + keyFieldName.text() + " to match "
+ "but got " + keyColumnValue + " and " + keyFieldValue + " respectively.");
}
}
});
}

private static SqlType columnType(final ColumnName column, final LogicalSchema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import io.confluent.ksql.planner.plan.ProjectNode;
import io.confluent.ksql.planner.plan.RepartitionNode;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.Column.Namespace;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.LogicalSchema.Builder;
Expand Down Expand Up @@ -173,8 +174,11 @@ private AggregateNode buildAggregateNode(final PlanNode sourcePlanNode) {
? groupByExps.get(0)
: null;

final Optional<ColumnName> keyFieldName = getSelectAliasMatching((expression, alias) ->
expression.equals(groupBy) && !SchemaUtil.isSystemColumn(alias),
final Optional<ColumnName> keyFieldName = getSelectAliasMatching(
(expression, alias) ->
expression.equals(groupBy)
&& !SchemaUtil.isSystemColumn(alias)
&& !schema.isKeyColumn(alias),
sourcePlanNode.getSelectExpressions());

return new AggregateNode(
Expand Down Expand Up @@ -389,9 +393,7 @@ private LogicalSchema buildProjectionSchema(
final Builder builder = LogicalSchema.builder()
.withRowTime();

final List<Column> keyColumns = schema.key();

builder.keyColumns(keyColumns);
builder.keyColumns(schema.key());

for (int i = 0; i < projection.size(); i++) {
final SelectExpression select = projection.get(i);
Expand All @@ -409,26 +411,36 @@ private LogicalSchema buildAggregateSchema(
final PlanNode sourcePlanNode,
final List<Expression> groupByExps
) {
final LogicalSchema sourceSchema = sourcePlanNode.getSchema();

final ColumnName keyName;
final SqlType keyType;
if (groupByExps.size() != 1) {
keyName = SchemaUtil.ROWKEY_NAME;
keyType = SqlTypes.STRING;
} else {
final Expression expression = groupByExps.get(0);

keyName = exactlyMatchesKeyColumns(expression, sourceSchema)
? ((ColumnReferenceExp) expression).getColumnName()
: SchemaUtil.ROWKEY_NAME;

final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourcePlanNode.getSchema(), functionRegistry);
new ExpressionTypeManager(sourceSchema, functionRegistry);

keyType = typeManager.getExpressionSqlType(groupByExps.get(0));
keyType = typeManager.getExpressionSqlType(expression);
}

final LogicalSchema sourceSchema = buildProjectionSchema(
sourcePlanNode.getSchema()
final LogicalSchema projectionSchema = buildProjectionSchema(
sourceSchema
.withMetaAndKeyColsInValue(analysis.getWindowExpression().isPresent()),
sourcePlanNode.getSelectExpressions()
);

return LogicalSchema.builder()
.withRowTime()
.keyColumn(SchemaUtil.ROWKEY_NAME, keyType)
.valueColumns(sourceSchema.value())
.keyColumn(keyName, keyType)
.valueColumns(projectionSchema.value())
.build();
}

Expand All @@ -438,6 +450,11 @@ private LogicalSchema buildRepartitionedSchema(
) {
final LogicalSchema sourceSchema = sourceNode.getSchema();

if (exactlyMatchesKeyColumns(partitionBy, sourceSchema)) {
// No-op:
return sourceSchema;
}

final ExpressionTypeManager typeManager =
new ExpressionTypeManager(sourceSchema, functionRegistry);

Expand All @@ -450,6 +467,30 @@ private LogicalSchema buildRepartitionedSchema(
.build();
}

private static boolean exactlyMatchesKeyColumns(
final Expression expression,
final LogicalSchema schema
) {
if (schema.key().size() != 1) {
// Currently only support single key column:
return false;
}

if (!(expression instanceof ColumnReferenceExp)) {
// Anything not a column ref can't be a match:
return false;
}

final ColumnName columnName = ((ColumnReferenceExp) expression).getColumnName();

final Namespace ns = schema
.findColumn(columnName)
.map(Column::namespace)
.orElse(Namespace.VALUE);

return ns == Namespace.KEY;
}

private static List<SelectExpression> selectWithPrependAlias(
final SourceName alias,
final LogicalSchema schema
Expand Down
Loading

0 comments on commit eaa8f23

Please sign in to comment.