feat: support multi-column key declarations
agavra committed Oct 29, 2020
1 parent a635ad8 commit 20d2c78
Showing 50 changed files with 736 additions and 307 deletions.
@@ -26,8 +26,10 @@
import io.confluent.ksql.execution.ddl.commands.KsqlTopic;
import io.confluent.ksql.execution.expression.tree.ColumnReferenceExp;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression.Type;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.FunctionCall;
import io.confluent.ksql.execution.expression.tree.LogicalBinaryExpression;
import io.confluent.ksql.execution.expression.tree.NullLiteral;
import io.confluent.ksql.execution.expression.tree.TraversalExpressionVisitor;
import io.confluent.ksql.execution.windows.KsqlWindowExpression;
@@ -312,10 +314,21 @@ protected AstNode visitJoinedSource(final JoinedSource node, final Void context)
final JoinNode.JoinType joinType = getJoinType(node);

final JoinOn joinOn = (JoinOn) node.getCriteria();
final ComparisonExpression comparisonExpression = (ComparisonExpression) joinOn
.getExpression();
final Expression joinExp = joinOn.getExpression();
if (!(joinExp instanceof ComparisonExpression)) {
// add in a special check for multi-column joins so that we can throw
// an even more useful error message
if (joinExp instanceof LogicalBinaryExpression
&& isEqualityJoin(((LogicalBinaryExpression) joinExp).getLeft())
&& isEqualityJoin(((LogicalBinaryExpression) joinExp).getRight())) {
throw new KsqlException("JOINs on multiple conditions are not yet supported: " + joinExp);
}

throw new KsqlException("Unsupported join expression: " + joinExp);
}
final ComparisonExpression comparisonExpression = (ComparisonExpression) joinExp;

if (comparisonExpression.getType() != ComparisonExpression.Type.EQUAL) {
if (!(isEqualityJoin(joinExp))) {
throw new KsqlException("Only equality join criteria is supported.");
}
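The new guard accepts only a single equality comparison and, before falling back to the generic error, checks for the one multi-condition shape users are most likely to write: an AND of two equalities. Below is a minimal sketch of the two AST shapes, assuming the expression-tree constructors behave as used in this diff; the column names are illustrative, and isEqualityJoin is copied from the helper added further down.

import io.confluent.ksql.execution.expression.tree.ComparisonExpression;
import io.confluent.ksql.execution.expression.tree.ComparisonExpression.Type;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.LogicalBinaryExpression;
import io.confluent.ksql.execution.expression.tree.UnqualifiedColumnReferenceExp;
import io.confluent.ksql.name.ColumnName;

public final class JoinCriteriaShapes {

  public static void main(final String[] args) {
    final Expression col0 = new UnqualifiedColumnReferenceExp(ColumnName.of("COL0"));
    final Expression col1 = new UnqualifiedColumnReferenceExp(ColumnName.of("COL1"));

    // Supported shape: a single equality, e.g. "ON t1.col0 = t2.col0".
    final Expression single = new ComparisonExpression(Type.EQUAL, col0, col0);

    // Newly detected shape: AND of two equalities, e.g.
    // "ON t1.col0 = t2.col0 AND t1.col1 = t2.col1" - rejected with the
    // dedicated "JOINs on multiple conditions" message instead of the
    // generic "Unsupported join expression" one.
    final Expression multi = new LogicalBinaryExpression(
        LogicalBinaryExpression.Type.AND,
        new ComparisonExpression(Type.EQUAL, col0, col0),
        new ComparisonExpression(Type.EQUAL, col1, col1));

    System.out.println(isEqualityJoin(single)); // true
    System.out.println(isEqualityJoin(multi));  // false -> multi-condition branch
  }

  private static boolean isEqualityJoin(final Expression exp) {
    return exp instanceof ComparisonExpression
        && ((ComparisonExpression) exp).getType() == Type.EQUAL;
  }
}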

@@ -361,6 +374,11 @@ protected AstNode visitJoinedSource(final JoinedSource node, final Void context)
return null;
}

private boolean isEqualityJoin(final Expression exp) {
return exp instanceof ComparisonExpression
&& ((ComparisonExpression) exp).getType() == Type.EQUAL;
}

private void throwOnJoinWithoutSource(
final AliasedDataSource source,
final AliasedDataSource left,
@@ -18,7 +18,6 @@
import static java.util.Objects.requireNonNull;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.errorprone.annotations.Immutable;
import io.confluent.ksql.execution.builder.KsqlQueryBuilder;
import io.confluent.ksql.execution.context.QueryContext;
@@ -32,6 +31,7 @@
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.planner.Projection;
import io.confluent.ksql.planner.RequiredColumns;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.services.KafkaTopicClient;
@@ -133,13 +133,22 @@ void validateKeyPresent(final SourceName sinkName, final Projection projection)
return;
}

final ColumnName keyName = Iterables.getOnlyElement(getSchema().key()).name();

if (!projection.containsExpression(new QualifiedColumnReferenceExp(getAlias(), keyName))
&& !projection.containsExpression(new UnqualifiedColumnReferenceExp(keyName))
) {
throwKeysNotIncludedError(sinkName, "key column", ImmutableList.of(
(ColumnReferenceExp) new UnqualifiedColumnReferenceExp(keyName)));
final List<Column> keys = getSchema().key();

for (final Column keyCol : keys) {
final ColumnName keyName = keyCol.name();
if (!projection.containsExpression(new QualifiedColumnReferenceExp(getAlias(), keyName))
&& !projection.containsExpression(new UnqualifiedColumnReferenceExp(keyName))
) {
throwKeysNotIncludedError(
sinkName,
"key column",
keys.stream()
.map(Column::name)
.map(UnqualifiedColumnReferenceExp::new)
.collect(Collectors.toList())
);
}
}
}
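With multi-column keys, validateKeyPresent now checks every key column, and a failure names the full key so the error is actionable. A stand-in sketch of the loop's contract, using plain strings instead of ksql's Column and projection types; the source, its key columns, and the projection are all hypothetical.

import java.util.List;
import java.util.Set;

public final class KeyPresenceCheck {

  public static void main(final String[] args) {
    // Hypothetical source with a two-column key (K1, K2):
    final List<String> keyColumns = List.of("K1", "K2");

    // A projection that selects K1 but forgets K2 must be rejected, and
    // the error should list *all* key columns, not just the missing one.
    checkKeyPresent(keyColumns, Set.of("K1", "V0"));
  }

  private static void checkKeyPresent(final List<String> keys, final Set<String> projection) {
    for (final String key : keys) {
      if (!projection.contains(key)) {
        // Stand-in for throwKeysNotIncludedError:
        throw new IllegalStateException(
            "The query must include the key columns " + keys + " in its projection.");
      }
    }
  }
}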

@@ -23,19 +23,23 @@
import io.confluent.ksql.function.udf.AsValue;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.Name;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.NodeLocation;
import io.confluent.ksql.parser.tree.SelectItem;
import io.confluent.ksql.planner.Projection;
import io.confluent.ksql.planner.RequiredColumns;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.LogicalSchema.Builder;
import io.confluent.ksql.schema.ksql.SystemColumns;
import io.confluent.ksql.util.GrammaticalJoiner;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -104,6 +108,7 @@ private Pair<LogicalSchema, List<SelectExpression>> build(final MetaStore metaSt

if (into.isPresent()) {
// Persistent queries keep key columns as key columns - so the final projection can exclude them:
final Map<ColumnName, Set<ColumnName>> seenKeyColumns = new HashMap<>();
selectExpressions.removeIf(se -> {
if (se.getExpression() instanceof UnqualifiedColumnReferenceExp) {
final ColumnName columnName = ((UnqualifiedColumnReferenceExp) se.getExpression())
@@ -114,10 +119,28 @@ private Pair<LogicalSchema, List<SelectExpression>> build(final MetaStore metaSt
return true;
}

return parentSchema.isKeyColumn(columnName);
if (parentSchema.isKeyColumn(columnName)) {
seenKeyColumns.computeIfAbsent(columnName, k -> new HashSet<>()).add(se.getAlias());
return true;
}
}
return false;
});

for (final Entry<ColumnName, Set<ColumnName>> seenKey : seenKeyColumns.entrySet()) {
if (seenKey.getValue().size() > 1) {
final String keys = GrammaticalJoiner.and().join(
seenKey.getValue().stream().map(Name::text).sorted());
throw new KsqlException("The projection contains a key column (" + seenKey.getKey()
+ ") more than once, aliased as: "
+ keys + "."
+ System.lineSeparator()
+ "Each key column must only be in the projection once. "
+ "If you intended to copy the key into the value, then consider using the "
+ AsValue.NAME + " function to indicate which key reference should be copied."
);
}
}
}
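Because a key column may now legitimately appear once per key part, the old schema-level "more than one key column" check (removed further down in this file) is replaced by this alias bookkeeping: each key column may be projected once, and reusing the same key under several aliases is what gets rejected. A self-contained sketch of that bookkeeping, with illustrative column names and aliases:

import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public final class DuplicateKeyAliases {

  public static void main(final String[] args) {
    // e.g. "SELECT K AS A, K AS B, V FROM SRC": the key column K is
    // projected twice, under the aliases A and B.
    final Map<String, Set<String>> seenKeyColumns = new HashMap<>();
    seenKeyColumns.computeIfAbsent("K", k -> new LinkedHashSet<>()).add("A");
    seenKeyColumns.computeIfAbsent("K", k -> new LinkedHashSet<>()).add("B");

    for (final Map.Entry<String, Set<String>> seenKey : seenKeyColumns.entrySet()) {
      if (seenKey.getValue().size() > 1) {
        // Stand-in for the KsqlException raised above:
        throw new IllegalStateException("The projection contains a key column ("
            + seenKey.getKey() + ") more than once, aliased as: "
            + String.join(" and ", seenKey.getValue()) + ".");
      }
    }
  }
}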

final LogicalSchema nodeSchema;
@@ -142,16 +165,6 @@ private Pair<LogicalSchema, List<SelectExpression>> build(final MetaStore metaSt
private void validate() {
final LogicalSchema schema = getSchema();

if (schema.key().size() > 1) {
final String keys = GrammaticalJoiner.and().join(schema.key().stream().map(Column::name));
throw new KsqlException("The projection contains the key column more than once: " + keys + "."
+ System.lineSeparator()
+ "Each key column must only be in the projection once. "
+ "If you intended to copy the key into the value, then consider using the "
+ AsValue.NAME + " function to indicate which key reference should be copied."
);
}

if (schema.value().isEmpty()) {
throw new KsqlException("The projection contains no value columns.");
}
@@ -218,6 +218,7 @@ public Stream<ColumnName> resolveSelectStar(
return names;
}

// if we use a synthetic key, we know there's only a single key element
final Column syntheticKey = getOnlyElement(getSchema().key());

return Streams.concat(Stream.of(syntheticKey.name()), names);
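For SELECT * resolution, a synthetic join key (one ksql creates itself for certain join criteria) is prepended to the source's own columns; per the comment above, at most one such key is ever synthesized, which is what keeps getOnlyElement safe here. A stand-in sketch of the prepend with plain strings; the synthetic name is illustrative.

import com.google.common.collect.Streams;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class SelectStarWithSyntheticKey {

  public static void main(final String[] args) {
    final String syntheticKey = "KSQL_COL_0"; // the single synthesized key column
    final Stream<String> names = Stream.of("V0", "V1");

    // Mirrors Streams.concat(Stream.of(syntheticKey.name()), names):
    System.out.println(
        Streams.concat(Stream.of(syntheticKey), names).collect(Collectors.toList()));
    // -> [KSQL_COL_0, V0, V1]
  }
}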
@@ -246,8 +247,8 @@ protected Set<ColumnReferenceExp> validateColumns(
final RequiredColumns updated = noSyntheticKey
? requiredColumns
: requiredColumns.asBuilder()
.remove(new UnqualifiedColumnReferenceExp(getOnlyElement(schema.key()).name()))
.build();
.remove(new UnqualifiedColumnReferenceExp(getOnlyElement(schema.key()).name()))
.build();

final Set<ColumnReferenceExp> leftUnknown = left.validateColumns(updated);
final Set<ColumnReferenceExp> rightUnknown = right.validateColumns(updated);
@@ -256,6 +257,11 @@ protected Set<ColumnReferenceExp> validateColumns(
}

private ColumnName getKeyColumnName() {
if (getSchema().key().size() > 1) {
throw new KsqlException("JOINs are not supported with multiple key columns: "
+ getSchema().key());
}

return getOnlyElement(getSchema().key()).name();
}
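The explicit size check above matters because getOnlyElement would otherwise fail on a multi-column key with a bare Guava IllegalArgumentException, stripped of any SQL-level context; the pre-check converts it into a KsqlException that names the offending key columns. A small demonstration of Guava's default behaviour (list contents illustrative):

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

public final class GetOnlyElementDemo {

  public static void main(final String[] args) {
    System.out.println(Iterables.getOnlyElement(ImmutableList.of("K1"))); // prints K1

    try {
      Iterables.getOnlyElement(ImmutableList.of("K1", "K2"));
    } catch (final IllegalArgumentException e) {
      // Guava's generic message, with no hint that the problem is a multi-key JOIN:
      System.out.println(e.getMessage());
    }
  }
}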

@@ -73,6 +73,8 @@ public Expression getPartitionBy() {

@Override
public Expression resolveSelect(final int idx, final Expression expression) {
// after issuing a PARTITION BY, there will only be one key column because we
// do not support PARTITION BY multiple columns
return partitionBy.equals(expression)
? new UnqualifiedColumnReferenceExp(Iterables.getOnlyElement(getSchema().key()).name())
: expression;
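A stand-in sketch of resolveSelect's contract, with plain strings in place of ksql Expression objects: any select item that is exactly the PARTITION BY expression is rewritten to a reference to the single post-repartition key column. The synthetic key name below is illustrative.

public final class ResolveSelectSketch {

  public static void main(final String[] args) {
    // After "PARTITION BY ABS(ID)", the new key column might be named KSQL_COL_0.
    System.out.println(resolveSelect("ABS(ID)", "KSQL_COL_0", "ABS(ID)")); // KSQL_COL_0
    System.out.println(resolveSelect("ABS(ID)", "KSQL_COL_0", "NAME"));    // NAME (unchanged)
  }

  private static String resolveSelect(
      final String partitionBy,
      final String keyColumn,
      final String selected
  ) {
    return partitionBy.equals(selected) ? keyColumn : selected;
  }
}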
@@ -151,6 +151,12 @@ public SchemaKStream<Struct> selectKey(
return (SchemaKStream<Struct>) this;
}

if (schema.key().size() > 1) {
// throw a more helpful error message in the case of multi-key tables
throw new UnsupportedOperationException("Cannot repartition a TABLE source. If this is "
+ "a join, joins on tables with multiple key columns are not yet supported.");
}

throw new UnsupportedOperationException("Cannot repartition a TABLE source. "
+ "If this is a join, make sure that the criteria uses the TABLE's key column "
+ Iterables.getOnlyElement(schema.key()).name().text() + " instead of "
@@ -47,8 +47,11 @@ public static boolean repartitionNeeded(
return true;
}

// this is technically covered by the check below, because our syntax does not
// yet support PARTITION BY col1, col2 - but we make it explicit so that the
// logic here is revisited when we do end up supporting multiple columns
if (schema.key().size() != 1) {
throw new UnsupportedOperationException("logic only supports single key column");
return true;
}

if (partitionBy.size() != schema.key().size()) {
@@ -238,6 +238,29 @@ public void shouldThrowOnSelfJoin() {
"Can not join 'TEST1' to 'TEST1': self joins are not yet supported."));
}

@Test
public void shouldThrowOnJoinConditionWithMultipleEqualityExpressions() {
// Given:
final CreateStreamAsSelect createStreamAsSelect = parseSingle(
"CREATE STREAM FOO AS "
+ "SELECT * FROM test1 t1 JOIN test2 t2 ON t1.col0 = t2.col0 AND t1.col0 = t2.col0;"
);

final Query query = createStreamAsSelect.getQuery();

final Analyzer analyzer = new Analyzer(jsonMetaStore, "");

// When:
final Exception e = assertThrows(
KsqlException.class,
() -> analyzer.analyze(query, Optional.of(createStreamAsSelect.getSink()))
);

// Then:
assertThat(e.getMessage(), containsString("JOINs on multiple conditions are not yet supported: " +
"((T1.COL0 = T2.COL0) AND (T1.COL0 = T2.COL0))"));
}

@Test
public void shouldThrowOnNwayJoinWithDuplicateSource() {
// Given:
@@ -119,7 +119,7 @@ private void shouldLeftJoinOrderAndItems(final String testStreamName,
final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(source.getSchema());

final Map<Struct, GenericRow> expectedResults = ImmutableMap.of(
keyBuilder.build("ITEM_1"),
keyBuilder.build("ITEM_1", 0),
genericRow("ORDER_1", 10.0, "home cinema")
);
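Throughout the integration tests, KeyBuilder.build now takes a second argument alongside the key value; reading from the diff alone, it appears to be the index of the key column the value populates, which is why every single-key test passes 0. A hedged sketch of the new call shape under that assumption, reusing the builder construction visible in the tests below:

import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import org.apache.kafka.connect.data.Struct;

public final class KeyBuilderShape {

  public static void main(final String[] args) {
    final KeyBuilder keyBuilder =
        StructKeyUtil.keyBuilder(ColumnName.of("ROWKEY"), SqlTypes.STRING);

    // Value "ITEM_1" goes into key column 0 - the only column of a single-key schema.
    final Struct key = keyBuilder.build("ITEM_1", 0);
    System.out.println(key);
  }
}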

@@ -153,7 +153,7 @@ public void shouldInsertLeftJoinOrderAndItems() {
final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(source.getSchema());

final Map<Struct, GenericRow> expectedResults = ImmutableMap.of(
keyBuilder.build("ITEM_1"),
keyBuilder.build("ITEM_1", 0),
genericRow("ORDER_1", 10.0, "home cinema")
);

@@ -200,7 +200,7 @@ public void shouldUseTimeStampFieldFromStream() {
final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(source.getSchema());

final Map<Struct, GenericRow> expectedResults = ImmutableMap.of(
keyBuilder.build("ITEM_1"),
keyBuilder.build("ITEM_1", 0),
genericRow("ORDER_1", "home cinema", 1L)
);

@@ -131,7 +131,7 @@ private static void produceInitData() {
.valueColumn(ColumnName.of("MESSAGE"), SqlTypes.STRING)
.build();

final Struct messageKey = StructKeyUtil.keyBuilder(messageSchema).build("1");
final Struct messageKey = StructKeyUtil.keyBuilder(messageSchema).build("1", 0);
final GenericRow messageRow = genericRow(
"{\"log\":{\"@timestamp\":\"2017-05-30T16:44:22.175Z\",\"@version\":\"1\","
+ "\"caasVersion\":\"0.0.2\",\"cloud\":\"aws\",\"logs\":[{\"entry\":\"first\"}],\"clusterId\":\"cp99\",\"clusterName\":\"kafka\",\"cpComponentId\":\"kafka\",\"host\":\"kafka-1-wwl0p\",\"k8sId\":\"k8s13\",\"k8sName\":\"perf\",\"level\":\"ERROR\",\"logger\":\"kafka.server.ReplicaFetcherThread\",\"message\":\"Found invalid messages during fetch for partition [foo512,172] offset 0 error Record is corrupt (stored crc = 1321230880, computed crc = 1139143803)\",\"networkId\":\"vpc-d8c7a9bf\",\"region\":\"us-west-2\",\"serverId\":\"1\",\"skuId\":\"sku5\",\"source\":\"kafka\",\"tenantId\":\"t47\",\"tenantName\":\"perf-test\",\"thread\":\"ReplicaFetcherThread-0-2\",\"zone\":\"us-west-2a\"},\"stream\":\"stdout\",\"time\":2017}"
@@ -185,7 +185,7 @@ public void testJsonStreamExtractor() {

final Map<Struct, GenericRow> expectedResults = new HashMap<>();
final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(ColumnName.of("ROWKEY"), SqlTypes.STRING);
expectedResults.put(keyBuilder.build("1"), genericRow("aws"));
expectedResults.put(keyBuilder.build("1", 0), genericRow("aws"));

final Map<Struct, GenericRow> results = readNormalResults(streamName, expectedResults.size());

@@ -203,7 +203,7 @@ public void testJsonStreamExtractorNested() {

final Map<Struct, GenericRow> expectedResults = new HashMap<>();
final KeyBuilder keyBuilder = StructKeyUtil.keyBuilder(ColumnName.of("ROWKEY"), SqlTypes.STRING);
expectedResults.put(keyBuilder.build("1"), genericRow("first"));
expectedResults.put(keyBuilder.build("1", 0), genericRow("first"));

final Map<Struct, GenericRow> results = readNormalResults(streamName, expectedResults.size());

@@ -101,7 +101,7 @@ public void shouldReplaceSimpleProject() {
outputTopic));

TEST_HARNESS.produceRows(inputTopic, new Provider("1", "A", 1), FormatFactory.KAFKA, FormatFactory.JSON);
assertForSource("PROJECT", outputTopic, ImmutableMap.of(Provider.KEY_BUILDER.build("1"), GenericRow.genericRow("A")));
assertForSource("PROJECT", outputTopic, ImmutableMap.of(Provider.KEY_BUILDER.build("1", 0), GenericRow.genericRow("A")));

// When:
ksqlContext.sql(
@@ -112,8 +112,8 @@

// Then:
final Map<Struct, GenericRow> expected = ImmutableMap.of(
Provider.KEY_BUILDER.build("1"), GenericRow.genericRow("A", null), // this row is leftover from the original query
Provider.KEY_BUILDER.build("2"), GenericRow.genericRow("B", 2) // this row is an artifact from the new query
Provider.KEY_BUILDER.build("1", 0), GenericRow.genericRow("A", null), // this row is leftover from the original query
Provider.KEY_BUILDER.build("2", 0), GenericRow.genericRow("B", 2) // this row is an artifact from the new query
);
assertForSource("PROJECT", outputTopic, expected);
}
@@ -128,7 +128,7 @@ public void shouldReplaceSimpleFilter() {
outputTopic));

TEST_HARNESS.produceRows(inputTopic, new Provider("1", "A", 1), FormatFactory.KAFKA, FormatFactory.JSON);
assertForSource("PROJECT", outputTopic, ImmutableMap.of(Provider.KEY_BUILDER.build("1"), GenericRow.genericRow("A")));
assertForSource("PROJECT", outputTopic, ImmutableMap.of(Provider.KEY_BUILDER.build("1", 0), GenericRow.genericRow("A")));

// When:
ksqlContext.sql(
@@ -139,8 +139,8 @@

// Then:
final Map<Struct, GenericRow> expected = ImmutableMap.of(
Provider.KEY_BUILDER.build("1"), GenericRow.genericRow("A"), // this row is leftover from the original query
Provider.KEY_BUILDER.build("3"), GenericRow.genericRow("C") // this row is an artifact from the new query
Provider.KEY_BUILDER.build("1", 0), GenericRow.genericRow("A"), // this row is leftover from the original query
Provider.KEY_BUILDER.build("3", 0), GenericRow.genericRow("C") // this row is an artifact from the new query
);
assertForSource("PROJECT", outputTopic, expected);
}
@@ -186,7 +186,7 @@ private static Multimap<Struct, GenericRow> rows(
final String col1,
final Integer col2
) {
return ImmutableListMultimap.of(KEY_BUILDER.build(key), GenericRow.genericRow(col1, col2));
return ImmutableListMultimap.of(KEY_BUILDER.build(key, 0), GenericRow.genericRow(col1, col2));
}
}

@@ -49,7 +49,7 @@ public class SelectValueMapperIntegrationTest {

private static final Struct NON_WINDOWED_KEY = StructKeyUtil
.keyBuilder(ColumnName.of("K"), SqlTypes.STRING)
.build("someKey");
.build("someKey", 0);

private final MetaStore metaStore = MetaStoreFixture
.getNewMetaStore(TestFunctionRegistry.INSTANCE.get());