Skip to content

Commit

Permalink
Address review feedback and fix bug with ROWKEY
Browse files Browse the repository at this point in the history
  • Loading branch information
rodesai committed Aug 9, 2018
1 parent 0c28e70 commit 553363a
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ protected SchemaKStream buildStream(final PlanNode node, final String keyFieldNa
}


protected SchemaKTable buildTable(final PlanNode node, final String keyFieldName) {
protected SchemaKTable buildTable(final PlanNode node,
final String keyFieldName,
final String tableName) {

final Map<String, Object> joinTableProps = new HashMap<>(props);
joinTableProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Expand All @@ -299,10 +301,13 @@ protected SchemaKTable buildTable(final PlanNode node, final String keyFieldName
}

if (schemaKStream.getKeyField() != null
&& !keyFieldName.equals(SchemaUtil.ROWKEY_NAME)
&& !SchemaUtil.matchFieldName(schemaKStream.getKeyField(), keyFieldName)) {
throw new KsqlException(
String.format(
"Source table key column (%s) is not the column used in the join criteria (%s).",
"Source table (%s) key column (%s) " +
"is not the column used in the join criteria (%s).",
tableName,
schemaKStream.getKeyField().name(),
keyFieldName
)
Expand Down Expand Up @@ -426,7 +431,8 @@ public SchemaKStream join() {
}

final SchemaKTable rightTable = buildTable(joinNode.getRight(),
joinNode.getRightKeyFieldName());
joinNode.getRightKeyFieldName(),
joinNode.getRightAlias());
final SchemaKStream leftStream = buildStream(joinNode.getLeft(),
joinNode.getLeftKeyFieldName());

Expand Down Expand Up @@ -477,9 +483,11 @@ public SchemaKTable join() {
}

final SchemaKTable leftTable = buildTable(joinNode.getLeft(),
joinNode.getLeftKeyFieldName());
joinNode.getLeftKeyFieldName(),
joinNode.getLeftAlias());
final SchemaKTable rightTable = buildTable(joinNode.getRight(),
joinNode.getRightKeyFieldName());
joinNode.getRightKeyFieldName(),
joinNode.getRightAlias());

switch (joinNode.joinType) {
case LEFT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
Expand All @@ -54,8 +55,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -483,6 +486,20 @@ public void shouldNotPerformJoinIfInputPartitionsMisMatch() {
assertEquals(JoinNode.JoinType.OUTER, joinNode.getJoinType());
}

private static Optional<String> getColumn(final Schema schema, final Predicate<String> filter) {
return schema.fields().stream()
.map(Field::name)
.filter(filter::test)
.findFirst();
}

private static Optional<String> getNonKeyColumn(final Schema schema, final String keyName) {
return getColumn(
schema,
s -> !ImmutableList.of(SchemaUtil.ROWKEY_NAME, SchemaUtil.ROWTIME_NAME, keyName).contains(s)
);
}

@SuppressWarnings("unchecked")
@Test
public void shouldFailJoinIfTableCriteriaColumnIsNotKey() {
Expand All @@ -491,12 +508,13 @@ public void shouldFailJoinIfTableCriteriaColumnIsNotKey() {
expectKeyField(rightSchemaKTable, rightKeyFieldName);
replay(left, right, leftSchemaKStream, rightSchemaKTable);

final String rightCriteriaColumn = getNonKeyColumn(rightSchema, rightKeyFieldName).get();
final JoinNode joinNode = new JoinNode(new PlanNodeId("join"),
JoinNode.JoinType.LEFT,
left,
right,
leftKeyFieldName,
"COL0",
rightCriteriaColumn,
leftAlias,
rightAlias,
null,
Expand All @@ -514,8 +532,12 @@ public void shouldFailJoinIfTableCriteriaColumnIsNotKey() {
assertThat(
e.getMessage(),
equalTo(
"Source table key column (COL1) is not the column " +
"used in the join criteria (COL0)."));
String.format(
"Source table (s) key column (%s) is not the column " +
"used in the join criteria (%s).",
rightAlias,
rightKeyFieldName,
rightCriteriaColumn)));
return;
}
fail("buildStream did not throw exception");
Expand Down Expand Up @@ -709,11 +731,12 @@ public void shouldFailTableTableJoinIfLeftCriteriaColumnIsNotKey() {
setupTable(right, rightSchemaKTable, rightSchema, 2);
replay(left, right, leftSchemaKTable, rightSchemaKTable);

final String leftCriteriaColumn = getNonKeyColumn(leftSchema, leftKeyFieldName).get();
final JoinNode joinNode = new JoinNode(new PlanNodeId("join"),
JoinNode.JoinType.LEFT,
left,
right,
"COL1",
leftCriteriaColumn,
rightKeyFieldName,
leftAlias,
rightAlias,
Expand All @@ -732,8 +755,12 @@ public void shouldFailTableTableJoinIfLeftCriteriaColumnIsNotKey() {
assertThat(
e.getMessage(),
equalTo(
"Source table key column (COL0) is not the column " +
"used in the join criteria (COL1)."));
String.format(
"Source table (s) key column (%s) is not the column " +
"used in the join criteria (%s).",
leftAlias,
leftKeyFieldName,
leftCriteriaColumn)));
return;
}
fail("buildStream did not throw exception");
Expand All @@ -748,12 +775,13 @@ public void shouldFailTableTableJoinIfRightCriteriaColumnIsNotKey() {
expectKeyField(rightSchemaKTable, rightKeyFieldName);
replay(left, right, leftSchemaKTable, rightSchemaKTable);

final String rightCriteriaColumn = getNonKeyColumn(rightSchema, rightKeyFieldName).get();
final JoinNode joinNode = new JoinNode(new PlanNodeId("join"),
JoinNode.JoinType.LEFT,
left,
right,
leftKeyFieldName,
"COL0",
rightCriteriaColumn,
leftAlias,
rightAlias,
null,
Expand All @@ -771,8 +799,12 @@ public void shouldFailTableTableJoinIfRightCriteriaColumnIsNotKey() {
assertThat(
e.getMessage(),
equalTo(
"Source table key column (COL1) is not the column " +
"used in the join criteria (COL0)."));
String.format(
"Source table (s) key column (%s) is not the column " +
"used in the join criteria (%s).",
rightAlias,
rightKeyFieldName,
rightCriteriaColumn)));
return;
}
fail("buildStream did not throw exception");
Expand Down
23 changes: 23 additions & 0 deletions ksql-engine/src/test/resources/query-validation-tests/joins.json
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,29 @@
{"topic": "INNER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "bar", "VALUE": 99, "F1": "a", "F2": 10}, "timestamp": 15000}
]
},
{
"name": "join using ROWKEY in the criteria",
"format": ["AVRO", "JSON"],
"statements": [
"CREATE STREAM TEST (ID bigint, NAME varchar, VALUE bigint) WITH (kafka_topic='test_topic', value_format='{FORMAT}', key='ID');",
"CREATE TABLE TEST_TABLE (ID bigint, F1 varchar, F2 bigint) WITH (kafka_topic='test_table', value_format='{FORMAT}', key='ID');",
"CREATE STREAM INNER_JOIN as SELECT t.id, name, value, f1, f2 FROM test t join test_table tt on t.ROWKEY = tt.ROWKEY;"
],
"inputs": [
{"topic": "test_table", "key": 0, "value": {"ID": 0, "F1": "zero", "F2": 0}, "timestamp": 0},
{"topic": "test_table", "key": 10, "value": {"ID": 10, "F1": "100", "F2": 5}, "timestamp": 10000},
{"topic": "test_topic", "key": 0, "value": {"ID": 0, "NAME": "blah", "VALUE": 50}, "timestamp": 10000},
{"topic": "test_topic", "key": 0, "value": {"ID": 0, "NAME": "foo", "VALUE": 100}, "timestamp": 10000},
{"topic": "test_table", "key": 0, "value": {"ID": 0, "F1": "a", "F2": 10}, "timestamp": 15000},
{"topic": "test_topic", "key": 0, "value": {"ID": 0, "NAME": "bar", "VALUE": 99}, "timestamp": 15000},
{"topic": "test_topic", "key": 90, "value": {"ID": 90, "NAME": "ninety", "VALUE": 90}, "timestamp": 15000}
],
"outputs": [
{"topic": "INNER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "blah", "VALUE": 50, "F1": "zero", "F2": 0}, "timestamp": 10000},
{"topic": "INNER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "foo", "VALUE": 100, "F1": "zero", "F2": 0}, "timestamp": 10000},
{"topic": "INNER_JOIN", "key": 0, "value": {"T_ID": 0, "NAME": "bar", "VALUE": 99, "F1": "a", "F2": 10}, "timestamp": 15000}
]
},
{
"name": "multi-way join",
"format": ["JSON"],
Expand Down

0 comments on commit 553363a

Please sign in to comment.