Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support multi-column key declarations #6544

Merged
merged 3 commits into from
Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression.Type;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.expression.tree.LogicalBinaryExpression;
import io.confluent.ksql.execution.expression.tree.NullLiteral;
import io.confluent.ksql.execution.expression.tree.TraversalExpressionVisitor;
import io.confluent.ksql.execution.windows.KsqlWindowExpression;
Expand Down Expand Up @@ -312,10 +314,21 @@ protected AstNode visitJoinedSource(final JoinedSource node, final Void context)
final JoinNode.JoinType joinType = getJoinType(node);

final JoinOn joinOn = (JoinOn) node.getCriteria();
final ComparisonExpression comparisonExpression = (ComparisonExpression) joinOn
.getExpression();
final Expression joinExp = joinOn.getExpression();
if (!(joinExp instanceof ComparisonExpression)) {
// add in a special check for multi-column joins so that we can throw
// an even more useful error message
if (joinExp instanceof LogicalBinaryExpression
&& isEqualityJoin(((LogicalBinaryExpression) joinExp).getLeft())
&& isEqualityJoin(((LogicalBinaryExpression) joinExp).getRight())) {
throw new KsqlException("JOINs on multiple conditions are not yet supported: " + joinExp);
}

throw new KsqlException("Unsupported join expression: " + joinExp);
}
final ComparisonExpression comparisonExpression = (ComparisonExpression) joinExp;

if (comparisonExpression.getType() != ComparisonExpression.Type.EQUAL) {
if (!(isEqualityJoin(joinExp))) {
throw new KsqlException("Only equality join criteria is supported.");
}

Expand Down Expand Up @@ -361,6 +374,11 @@ protected AstNode visitJoinedSource(final JoinedSource node, final Void context)
return null;
}

/**
 * Tests whether the supplied expression is an equality comparison.
 *
 * @param exp the expression to inspect
 * @return {@code true} only if {@code exp} is a {@code ComparisonExpression}
 *     whose type is {@code Type.EQUAL}
 */
private boolean isEqualityJoin(final Expression exp) {
  // anything that is not a comparison cannot be an equality join
  if (!(exp instanceof ComparisonExpression)) {
    return false;
  }

  final ComparisonExpression comparison = (ComparisonExpression) exp;
  return comparison.getType() == Type.EQUAL;
}

private void throwOnJoinWithoutSource(
final AliasedDataSource source,
final AliasedDataSource left,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static java.util.Objects.requireNonNull;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
Expand All @@ -32,6 +31,7 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.planner.Projection;
import io.confluent.ksql.planner.RequiredColumns;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.services.KafkaTopicClient;
Expand Down Expand Up @@ -133,13 +133,22 @@ void validateKeyPresent(final SourceName sinkName, final Projection projection)
return;
}

final ColumnName keyName = Iterables.getOnlyElement(getSchema().key()).name();

if (!projection.containsExpression(new QualifiedColumnReferenceExp(getAlias(), keyName))
&& !projection.containsExpression(new UnqualifiedColumnReferenceExp(keyName))
) {
throwKeysNotIncludedError(sinkName, "key column", ImmutableList.of(
(ColumnReferenceExp) new UnqualifiedColumnReferenceExp(keyName)));
final List<Column> keys = getSchema().key();

for (final Column keyCol : keys) {
final ColumnName keyName = keyCol.name();
if (!projection.containsExpression(new QualifiedColumnReferenceExp(getAlias(), keyName))
&& !projection.containsExpression(new UnqualifiedColumnReferenceExp(keyName))
) {
throwKeysNotIncludedError(
sinkName,
"key column",
keys.stream()
.map(Column::name)
.map(UnqualifiedColumnReferenceExp::new)
.collect(Collectors.toList())
);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,23 @@
import io.confluent.ksql.function.udf.AsValue;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.Name;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.NodeLocation;
import io.confluent.ksql.parser.tree.SelectItem;
import io.confluent.ksql.planner.Projection;
import io.confluent.ksql.planner.RequiredColumns;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.LogicalSchema.Builder;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.util.GrammaticalJoiner;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -68,7 +72,7 @@ public FinalProjectNode(
this.schema = result.left;
this.selectExpressions = ImmutableList.copyOf(result.right);

validate();
throwOnEmptyValueOrUnknownColumns();
}

@Override
Expand Down Expand Up @@ -103,7 +107,8 @@ private Pair<LogicalSchema, List<SelectExpression>> build(final MetaStore metaSt
SelectionUtil.buildProjectionSchema(parentSchema, selectExpressions, metaStore);

if (into.isPresent()) {
// Persistent queries have key columns as key columns - so final projection can exclude them:
// Persistent queries have key columns as value columns - final projection can exclude them:
final Map<ColumnName, Set<ColumnName>> seenKeyColumns = new HashMap<>();
selectExpressions.removeIf(se -> {
if (se.getExpression() instanceof UnqualifiedColumnReferenceExp) {
final ColumnName columnName = ((UnqualifiedColumnReferenceExp) se.getExpression())
Expand All @@ -114,10 +119,28 @@ private Pair<LogicalSchema, List<SelectExpression>> build(final MetaStore metaSt
return true;
}

return parentSchema.isKeyColumn(columnName);
if (parentSchema.isKeyColumn(columnName)) {
seenKeyColumns.computeIfAbsent(columnName, k -> new HashSet<>()).add(se.getAlias());
return true;
}
}
return false;
});

for (final Entry<ColumnName, Set<ColumnName>> seenKey : seenKeyColumns.entrySet()) {
if (seenKey.getValue().size() > 1) {
final String keys = GrammaticalJoiner.and().join(
seenKey.getValue().stream().map(Name::text).sorted());
throw new KsqlException("The projection contains a key column (" + seenKey.getKey()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly related to your PR, but can you help me understand why the validation that key columns are not selected more than once is performed in FinalProjectNode while the validation that key columns are selected is performed in DataSourceNode, rather than performing both checks in the same place?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every node "validates" that the key is selected, they just all delegate to the child node that has that information:

// PlanNode.java:L145
  void validateKeyPresent(final SourceName sinkName, final Projection projection) {
    getSources().forEach(s -> s.validateKeyPresent(sinkName, projection));
  }

DataSourceNode, JoinNode and UserRepartitionNode all override this method because those are the nodes that truly "know" whether the key was selected. Imagine we projected the key column with a different name: the schema in the FinalProjectNode would consider that the key - but of course we didn't select that! We selected the original key.

To illustrate, in the example below the project node has a schema [col1 INT PRIMARY KEY] but we didn't select col1 - so it would fail! Of course we could do some magic and "remember" that b.col1 is actually a.id, but why not just delegate that to the node that already tracks that?

CREATE TABLE a (id INT PRIMARY KEY, col1 INT);
CREATE TABLE b AS SELECT id AS col1 FROM a;

(Note that it's a little confusing, because there is also VerifiableNode, which AggregateNode, FinalProjectNode and SuppressNode all override - and VerifiableNode#validateKeyPresent is what is called at the top level, but these just delegate down to the three source nodes eventually).

why the validation that key columns are not selected more than once is performed in FinalProjectNode

I think it's clearer why this one can't be in DataSourceNode; it's the other way around that might need some justification.


All that being said, I think this can definitely be made clearer going forward. The logical nodes don't follow a very strict abstraction model unfortunately.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation -- super helpful!

+ ") more than once, aliased as: "
+ keys + "."
+ System.lineSeparator()
+ "Each key column must only be in the projection once. "
+ "If you intended to copy the key into the value, then consider using the "
+ AsValue.NAME + " function to indicate which key reference should be copied."
);
}
}
}

final LogicalSchema nodeSchema;
Expand All @@ -139,19 +162,9 @@ private Pair<LogicalSchema, List<SelectExpression>> build(final MetaStore metaSt
return Pair.of(nodeSchema, selectExpressions);
}

private void validate() {
private void throwOnEmptyValueOrUnknownColumns() {
final LogicalSchema schema = getSchema();

if (schema.key().size() > 1) {
final String keys = GrammaticalJoiner.and().join(schema.key().stream().map(Column::name));
throw new KsqlException("The projection contains the key column more than once: " + keys + "."
+ System.lineSeparator()
+ "Each key column must only be in the projection once. "
+ "If you intended to copy the key into the value, then consider using the "
+ AsValue.NAME + " function to indicate which key reference should be copied."
);
}

if (schema.value().isEmpty()) {
throw new KsqlException("The projection contains no value columns.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public Stream<ColumnName> resolveSelectStar(
return names;
}

// if we use a synthetic key, we know there's only a single key element
final Column syntheticKey = getOnlyElement(getSchema().key());

return Streams.concat(Stream.of(syntheticKey.name()), names);
Expand Down Expand Up @@ -246,8 +247,8 @@ protected Set<ColumnReferenceExp> validateColumns(
final RequiredColumns updated = noSyntheticKey
? requiredColumns
: requiredColumns.asBuilder()
.remove(new UnqualifiedColumnReferenceExp(getOnlyElement(schema.key()).name()))
.build();
.remove(new UnqualifiedColumnReferenceExp(getOnlyElement(schema.key()).name()))
.build();

final Set<ColumnReferenceExp> leftUnknown = left.validateColumns(updated);
final Set<ColumnReferenceExp> rightUnknown = right.validateColumns(updated);
Expand All @@ -256,6 +257,11 @@ protected Set<ColumnReferenceExp> validateColumns(
}

/**
 * Returns the name of the single key column of this node's schema.
 *
 * @return the name of the only key column
 * @throws KsqlException if the schema has more than one key column
 */
private ColumnName getKeyColumnName() {
  final boolean hasMultipleKeys = getSchema().key().size() > 1;

  if (!hasMultipleKeys) {
    return getOnlyElement(getSchema().key()).name();
  }

  // fail with a join-specific message rather than the generic getOnlyElement error
  throw new KsqlException("JOINs are not supported with multiple key columns: "
      + getSchema().key());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public Expression getPartitionBy() {

@Override
public Expression resolveSelect(final int idx, final Expression expression) {
// after issuing a PARTITION BY, there will only be one key column because we
// do not support PARTITION BY multiple columns
return partitionBy.equals(expression)
? new UnqualifiedColumnReferenceExp(Iterables.getOnlyElement(getSchema().key()).name())
: expression;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ public SchemaKStream<Struct> selectKey(
return (SchemaKStream<Struct>) this;
}

if (schema.key().size() > 1) {
// let's throw a better error message in the case of multi-column tables
throw new UnsupportedOperationException("Cannot repartition a TABLE source. If this is "
+ "a join, joins on tables with multiple columns is not yet supported.");
}

throw new UnsupportedOperationException("Cannot repartition a TABLE source. "
+ "If this is a join, make sure that the criteria uses the TABLE's key column "
+ Iterables.getOnlyElement(schema.key()).name().text() + " instead of "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ public static boolean repartitionNeeded(
return true;
}

// this is technically covered by the check below because our syntax does not
// yet support PARTITION BY col1, col2 but we make this explicit for when we
// do end up supporting this (we'll have to change the logic here)
if (schema.key().size() != 1) {
throw new UnsupportedOperationException("logic only supports single key column");
return true;
}

if (partitionBy.size() != schema.key().size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,29 @@ public void shouldThrowOnSelfJoin() {
"Can not join 'TEST1' to 'TEST1': self joins are not yet supported."));
}

@Test
public void shouldThrowOnJoinConditionWithMultipleEqualityExpressions() {
  // Given: a join whose ON clause ANDs together two equality comparisons
  final Analyzer analyzer = new Analyzer(jsonMetaStore, "");

  final CreateStreamAsSelect csas = parseSingle(
      "CREATE STREAM FOO AS "
          + "SELECT * FROM test1 t1 JOIN test2 t2 ON t1.col0 = t2.col0 AND t1.col0 = t2.col0;"
  );

  final Query joinQuery = csas.getQuery();

  // When:
  final Exception thrown = assertThrows(
      KsqlException.class,
      () -> analyzer.analyze(joinQuery, Optional.of(csas.getSink()))
  );

  // Then: the error names the unsupported multi-condition join expression
  final String expectedMessage = "JOINs on multiple conditions are not yet supported: "
      + "((T1.COL0 = T2.COL0) AND (T1.COL0 = T2.COL0))";
  assertThat(thrown.getMessage(), containsString(expectedMessage));
}

@Test
public void shouldThrowOnNwayJoinWithDuplicateSource() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ private void shouldLeftJoinOrderAndItems(final String testStreamName,
final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(source.getSchema());

final Map<Struct, GenericRow> expectedResults = ImmutableMap.of(
keyBuilder.build("ITEM_1"),
keyBuilder.build("ITEM_1", 0),
genericRow("ORDER_1", 10.0, "home cinema")
);

Expand Down Expand Up @@ -153,7 +153,7 @@ public void shouldInsertLeftJoinOrderAndItems() {
final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(source.getSchema());

final Map<Struct, GenericRow> expectedResults = ImmutableMap.of(
keyBuilder.build("ITEM_1"),
keyBuilder.build("ITEM_1", 0),
genericRow("ORDER_1", 10.0, "home cinema")
);

Expand Down Expand Up @@ -200,7 +200,7 @@ public void shouldUseTimeStampFieldFromStream() {
final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(source.getSchema());

final Map<Struct, GenericRow> expectedResults = ImmutableMap.of(
keyBuilder.build("ITEM_1"),
keyBuilder.build("ITEM_1", 0),
genericRow("ORDER_1", "home cinema", 1L)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private static void produceInitData() {
.valueColumn(ColumnName.of("MESSAGE"), SqlTypes.STRING)
.build();

final Struct messageKey = StructKeyUtil.keyBuilder(messageSchema).build("1");
final Struct messageKey = StructKeyUtil.keyBuilder(messageSchema).build("1", 0);
final GenericRow messageRow = genericRow(
"{\"log\":{\"@timestamp\":\"2017-05-30T16:44:22.175Z\",\"@version\":\"1\","
+ "\"caasVersion\":\"0.0.2\",\"cloud\":\"aws\",\"logs\":[{\"entry\":\"first\"}],\"clusterId\":\"cp99\",\"clusterName\":\"kafka\",\"cpComponentId\":\"kafka\",\"host\":\"kafka-1-wwl0p\",\"k8sId\":\"k8s13\",\"k8sName\":\"perf\",\"level\":\"ERROR\",\"logger\":\"kafka.server.ReplicaFetcherThread\",\"message\":\"Found invalid messages during fetch for partition [foo512,172] offset 0 error Record is corrupt (stored crc = 1321230880, computed crc = 1139143803)\",\"networkId\":\"vpc-d8c7a9bf\",\"region\":\"us-west-2\",\"serverId\":\"1\",\"skuId\":\"sku5\",\"source\":\"kafka\",\"tenantId\":\"t47\",\"tenantName\":\"perf-test\",\"thread\":\"ReplicaFetcherThread-0-2\",\"zone\":\"us-west-2a\"},\"stream\":\"stdout\",\"time\":2017}"
Expand Down Expand Up @@ -185,7 +185,7 @@ public void testJsonStreamExtractor() {

final Map<Struct, GenericRow> expectedResults = new HashMap<>();
final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(ColumnName.of("ROWKEY"), SqlTypes.STRING);
expectedResults.put(keyBuilder.build("1"), genericRow("aws"));
expectedResults.put(keyBuilder.build("1", 0), genericRow("aws"));

final Map<Struct, GenericRow> results = readNormalResults(streamName, expectedResults.size());

Expand All @@ -203,7 +203,7 @@ public void testJsonStreamExtractorNested() {

final Map<Struct, GenericRow> expectedResults = new HashMap<>();
final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(ColumnName.of("ROWKEY"), SqlTypes.STRING);
expectedResults.put(keyBuilder.build("1"), genericRow("first"));
expectedResults.put(keyBuilder.build("1", 0), genericRow("first"));

final Map<Struct, GenericRow> results = readNormalResults(streamName, expectedResults.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void shouldReplaceSimpleProject() {
outputTopic));

TEST_HARNESS.produceRows(inputTopic, new Provider("1", "A", 1), FormatFactory.KAFKA, FormatFactory.JSON);
assertForSource("PROJECT", outputTopic, ImmutableMap.of(Provider.KEY_BUILDER.build("1"), GenericRow.genericRow("A")));
assertForSource("PROJECT", outputTopic, ImmutableMap.of(Provider.KEY_BUILDER.build("1", 0), GenericRow.genericRow("A")));

// When:
ksqlContext.sql(
Expand All @@ -112,8 +112,8 @@ public void shouldReplaceSimpleProject() {

// Then:
final Map<Struct, GenericRow> expected = ImmutableMap.of(
Provider.KEY_BUILDER.build("1"), GenericRow.genericRow("A", null), // this row is leftover from the original query
Provider.KEY_BUILDER.build("2"), GenericRow.genericRow("B", 2) // this row is an artifact from the new query
Provider.KEY_BUILDER.build("1", 0), GenericRow.genericRow("A", null), // this row is leftover from the original query
Provider.KEY_BUILDER.build("2", 0), GenericRow.genericRow("B", 2) // this row is an artifact from the new query
);
assertForSource("PROJECT", outputTopic, expected);
}
Expand All @@ -128,7 +128,7 @@ public void shouldReplaceSimpleFilter() {
outputTopic));

TEST_HARNESS.produceRows(inputTopic, new Provider("1", "A", 1), FormatFactory.KAFKA, FormatFactory.JSON);
assertForSource("PROJECT", outputTopic, ImmutableMap.of(Provider.KEY_BUILDER.build("1"), GenericRow.genericRow("A")));
assertForSource("PROJECT", outputTopic, ImmutableMap.of(Provider.KEY_BUILDER.build("1", 0), GenericRow.genericRow("A")));

// When:
ksqlContext.sql(
Expand All @@ -139,8 +139,8 @@ public void shouldReplaceSimpleFilter() {

// Then:
final Map<Struct, GenericRow> expected = ImmutableMap.of(
Provider.KEY_BUILDER.build("1"), GenericRow.genericRow("A"), // this row is leftover from the original query
Provider.KEY_BUILDER.build("3"), GenericRow.genericRow("C") // this row is an artifact from the new query
Provider.KEY_BUILDER.build("1", 0), GenericRow.genericRow("A"), // this row is leftover from the original query
Provider.KEY_BUILDER.build("3", 0), GenericRow.genericRow("C") // this row is an artifact from the new query
);
assertForSource("PROJECT", outputTopic, expected);
}
Expand Down Expand Up @@ -186,7 +186,7 @@ private static Multimap<Struct, GenericRow> rows(
final String col1,
final Integer col2
) {
return ImmutableListMultimap.of(KEY_BUILDER.build(key), GenericRow.genericRow(col1, col2));
return ImmutableListMultimap.of(KEY_BUILDER.build(key, 0), GenericRow.genericRow(col1, col2));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class SelectValueMapperIntegrationTest {

private static final Struct NON_WINDOWED_KEY = StructKeyUtil
.keyBuilder(ColumnName.of("K"), SqlTypes.STRING)
.build("someKey");
.build("someKey", 0);

private final MetaStore metaStore = MetaStoreFixture
.getNewMetaStore(TestFunctionRegistry.INSTANCE.get());
Expand Down
Loading