From 553363ac934a25a0ddabf3df2a149a09bdffaef8 Mon Sep 17 00:00:00 2001 From: Rohan Desai Date: Thu, 9 Aug 2018 12:05:58 -0700 Subject: [PATCH] Address review feedback and fix bug with ROWKEY --- .../confluent/ksql/planner/plan/JoinNode.java | 18 +++++-- .../ksql/planner/plan/JoinNodeTest.java | 50 +++++++++++++++---- .../query-validation-tests/joins.json | 23 +++++++++ 3 files changed, 77 insertions(+), 14 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java index 6953002f42ae..1c8f63f2c9bd 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/plan/JoinNode.java @@ -282,7 +282,9 @@ protected SchemaKStream buildStream(final PlanNode node, final String keyFieldNa } - protected SchemaKTable buildTable(final PlanNode node, final String keyFieldName) { + protected SchemaKTable buildTable(final PlanNode node, + final String keyFieldName, + final String tableName) { final Map joinTableProps = new HashMap<>(props); joinTableProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -299,10 +301,13 @@ protected SchemaKTable buildTable(final PlanNode node, final String keyFieldName } if (schemaKStream.getKeyField() != null + && !keyFieldName.equals(SchemaUtil.ROWKEY_NAME) && !SchemaUtil.matchFieldName(schemaKStream.getKeyField(), keyFieldName)) { throw new KsqlException( String.format( - "Source table key column (%s) is not the column used in the join criteria (%s).", + "Source table (%s) key column (%s) " + + "is not the column used in the join criteria (%s).", + tableName, schemaKStream.getKeyField().name(), keyFieldName ) @@ -426,7 +431,8 @@ public SchemaKStream join() { } final SchemaKTable rightTable = buildTable(joinNode.getRight(), - joinNode.getRightKeyFieldName()); + joinNode.getRightKeyFieldName(), + joinNode.getRightAlias()); final SchemaKStream 
leftStream = buildStream(joinNode.getLeft(), joinNode.getLeftKeyFieldName()); @@ -477,9 +483,11 @@ public SchemaKTable join() { } final SchemaKTable leftTable = buildTable(joinNode.getLeft(), - joinNode.getLeftKeyFieldName()); + joinNode.getLeftKeyFieldName(), + joinNode.getLeftAlias()); final SchemaKTable rightTable = buildTable(joinNode.getRight(), - joinNode.getRightKeyFieldName()); + joinNode.getRightKeyFieldName(), + joinNode.getRightAlias()); switch (joinNode.joinType) { case LEFT: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java index 62e936f94a57..5110a8dc7135 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java @@ -31,6 +31,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.common.collect.ImmutableList; import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.GenericRow; @@ -54,8 +55,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -483,6 +486,20 @@ public void shouldNotPerformJoinIfInputPartitionsMisMatch() { assertEquals(JoinNode.JoinType.OUTER, joinNode.getJoinType()); } + private static Optional<String> getColumn(final Schema schema, final Predicate<String> filter) { + return schema.fields().stream() + .map(Field::name) + .filter(filter::test) + .findFirst(); + } + + private static Optional<String> getNonKeyColumn(final Schema schema, final String keyName) { + return getColumn( + schema, + s -> !ImmutableList.of(SchemaUtil.ROWKEY_NAME, SchemaUtil.ROWTIME_NAME,
keyName).contains(s) + ); + } + @SuppressWarnings("unchecked") @Test public void shouldFailJoinIfTableCriteriaColumnIsNotKey() { @@ -491,12 +508,13 @@ public void shouldFailJoinIfTableCriteriaColumnIsNotKey() { expectKeyField(rightSchemaKTable, rightKeyFieldName); replay(left, right, leftSchemaKStream, rightSchemaKTable); + final String rightCriteriaColumn = getNonKeyColumn(rightSchema, rightKeyFieldName).get(); final JoinNode joinNode = new JoinNode(new PlanNodeId("join"), JoinNode.JoinType.LEFT, left, right, leftKeyFieldName, - "COL0", + rightCriteriaColumn, leftAlias, rightAlias, null, @@ -514,8 +532,12 @@ public void shouldFailJoinIfTableCriteriaColumnIsNotKey() { assertThat( e.getMessage(), equalTo( - "Source table key column (COL1) is not the column " + - "used in the join criteria (COL0).")); + String.format( + "Source table (%s) key column (%s) is not the column " + + "used in the join criteria (%s).", + rightAlias, + rightKeyFieldName, + rightCriteriaColumn))); return; } fail("buildStream did not throw exception"); @@ -709,11 +731,12 @@ public void shouldFailTableTableJoinIfLeftCriteriaColumnIsNotKey() { setupTable(right, rightSchemaKTable, rightSchema, 2); replay(left, right, leftSchemaKTable, rightSchemaKTable); + final String leftCriteriaColumn = getNonKeyColumn(leftSchema, leftKeyFieldName).get(); final JoinNode joinNode = new JoinNode(new PlanNodeId("join"), JoinNode.JoinType.LEFT, left, right, - "COL1", + leftCriteriaColumn, rightKeyFieldName, leftAlias, rightAlias, @@ -732,8 +755,12 @@ public void shouldFailTableTableJoinIfLeftCriteriaColumnIsNotKey() { assertThat( e.getMessage(), equalTo( - "Source table key column (COL0) is not the column " + - "used in the join criteria (COL1).")); + String.format( + "Source table (%s) key column (%s) is not the column " + + "used in the join criteria (%s).", + leftAlias, + leftKeyFieldName, + leftCriteriaColumn))); return; } fail("buildStream did not throw exception"); @@ -748,12 +775,13 @@ public void
shouldFailTableTableJoinIfRightCriteriaColumnIsNotKey() { expectKeyField(rightSchemaKTable, rightKeyFieldName); replay(left, right, leftSchemaKTable, rightSchemaKTable); + final String rightCriteriaColumn = getNonKeyColumn(rightSchema, rightKeyFieldName).get(); final JoinNode joinNode = new JoinNode(new PlanNodeId("join"), JoinNode.JoinType.LEFT, left, right, leftKeyFieldName, - "COL0", + rightCriteriaColumn, leftAlias, rightAlias, null, @@ -771,8 +799,12 @@ public void shouldFailTableTableJoinIfRightCriteriaColumnIsNotKey() { assertThat( e.getMessage(), equalTo( - "Source table key column (COL1) is not the column " + - "used in the join criteria (COL0).")); + String.format( + "Source table (%s) key column (%s) is not the column " + + "used in the join criteria (%s).", + rightAlias, + rightKeyFieldName, + rightCriteriaColumn))); return; } fail("buildStream did not throw exception"); diff --git a/ksql-engine/src/test/resources/query-validation-tests/joins.json b/ksql-engine/src/test/resources/query-validation-tests/joins.json index e863855cb697..25855fbc4199 100644 --- a/ksql-engine/src/test/resources/query-validation-tests/joins.json +++ b/ksql-engine/src/test/resources/query-validation-tests/joins.json @@ -313,6 +313,29 @@ {"topic": "INNER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "bar", "VALUE": 99, "F1": "a", "F2": 10}, "timestamp": 15000} ] }, + { + "name": "join using ROWKEY in the criteria", + "format": ["AVRO", "JSON"], + "statements": [ + "CREATE STREAM TEST (ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');", + "CREATE TABLE TEST_TABLE (ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='test_table', value_format='{FORMAT}', key='ID');", + "CREATE STREAM INNER_JOIN as SELECT t.id, name, value, f1, f2 FROM test t join test_table tt on t.ROWKEY = tt.ROWKEY;" + ], + "inputs": [ + {"topic": "test_table", "key": 0, "value": {"ID": 0, "F1": "zero", "F2": 0}, "timestamp": 0}, + {"topic": "test_table",
"key": 10, "value": {"ID": 10, "F1": "100", "F2": 5}, "timestamp": 10000}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "NAME": "blah", "VALUE": 50}, "timestamp": 10000}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "NAME": "foo", "VALUE": 100}, "timestamp": 10000}, + {"topic": "test_table", "key": 0, "value": {"ID": 0, "F1": "a", "F2": 10}, "timestamp": 15000}, + {"topic": "test_topic", "key": 0, "value": {"ID": 0, "NAME": "bar", "VALUE": 99}, "timestamp": 15000}, + {"topic": "test_topic", "key": 90, "value": {"ID": 90, "NAME": "ninety", "VALUE": 90}, "timestamp": 15000} + ], + "outputs": [ + {"topic": "INNER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "blah", "VALUE": 50, "F1": "zero", "F2": 0}, "timestamp": 10000}, + {"topic": "INNER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "zero", "F2": 0}, "timestamp": 10000}, + {"topic": "INNER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "bar", "VALUE": 99, "F1": "a", "F2": 10}, "timestamp": 15000} + ] + }, { "name": "multi-way join", "format": ["JSON"],