Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for using primitive types in joins. #4132

Merged
merged 4 commits into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public DataSource<?> getDataSource() {
return dataSource;
}

SourceName getAlias() {
public SourceName getAlias() {
return alias;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@
import io.confluent.ksql.execution.streams.JoinParamsFactory;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.metastore.model.KeyField;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.WithinExpression;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.Column.Namespace;
import io.confluent.ksql.schema.ksql.ColumnRef;
import io.confluent.ksql.schema.ksql.FormatOptions;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.SqlBaseType;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.structured.SchemaKStream;
Expand Down Expand Up @@ -93,11 +92,7 @@ public JoinNode(
? left.getKeyField()
: KeyField.of(leftKeyCol.ref());

this.schema = JoinParamsFactory.createSchema(left.getSchema(), right.getSchema());

if (schema.key().get(0).type().baseType() != SqlBaseType.STRING) {
throw new KsqlException("JOIN is not supported with non-STRING keys");
}
this.schema = buildJoinSchema(left, leftJoinFieldName, right, rightJoinFieldName);
}

@Override
Expand Down Expand Up @@ -237,11 +232,7 @@ SchemaKStream<K> buildStream(
}

@SuppressWarnings("unchecked")
SchemaKTable<K> buildTable(
final PlanNode node,
final ColumnRef joinFieldName,
final SourceName tableName
) {
SchemaKTable<K> buildTable(final PlanNode node) {
final SchemaKStream<?> schemaKStream = node.buildStream(
builder.withKsqlConfig(builder.getKsqlConfig()
.cloneWithPropertyOverwrite(Collections.singletonMap(
Expand All @@ -252,37 +243,7 @@ SchemaKTable<K> buildTable(
throw new RuntimeException("Expected to find a Table, found a stream instead.");
}

final Optional<Column> keyColumn = schemaKStream
.getKeyField()
.resolve(schemaKStream.getSchema());

final ColumnRef rowKey = ColumnRef.of(
tableName,
SchemaUtil.ROWKEY_NAME
);

final boolean namesMatch = keyColumn
.map(field -> field.ref().equals(joinFieldName))
.orElse(false);

if (namesMatch || joinFieldName.equals(rowKey)) {
return (SchemaKTable) schemaKStream;
}

if (!keyColumn.isPresent()) {
throw new KsqlException(
"Source table (" + tableName.name() + ") has no key column defined. "
+ "Only 'ROWKEY' is supported in the join criteria."
);
}

throw new KsqlException(
"Source table (" + tableName.toString(FormatOptions.noEscape()) + ") key column ("
+ keyColumn.get().ref().toString(FormatOptions.noEscape()) + ") "
+ "is not the column used in the join criteria ("
+ joinFieldName.toString(FormatOptions.noEscape()) + "). "
+ "Only the table's key column or 'ROWKEY' is supported in the join criteria."
);
Comment on lines -255 to -285
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These checks are now done from the constructor.

return (SchemaKTable<K>) schemaKStream;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -378,8 +339,7 @@ public SchemaKStream<K> join() {
+ " the WITHIN clause) and try to execute your join again.");
}

final SchemaKTable<K> rightTable = buildTable(
joinNode.getRight(), joinNode.rightJoinFieldName, joinNode.right.getAlias());
final SchemaKTable<K> rightTable = buildTable(joinNode.getRight());

final SchemaKStream<K> leftStream = buildStream(
joinNode.getLeft(), joinNode.leftJoinFieldName);
Expand Down Expand Up @@ -428,10 +388,8 @@ public SchemaKTable<K> join() {
+ "join again.");
}

final SchemaKTable<K> leftTable = buildTable(
joinNode.getLeft(), joinNode.leftJoinFieldName, joinNode.left.getAlias());
final SchemaKTable<K> rightTable = buildTable(
joinNode.getRight(), joinNode.rightJoinFieldName, joinNode.right.getAlias());
final SchemaKTable<K> leftTable = buildTable(joinNode.getLeft());
final SchemaKTable<K> rightTable = buildTable(joinNode.getRight());

switch (joinNode.joinType) {
case LEFT:
Expand Down Expand Up @@ -465,4 +423,80 @@ private static DataSourceType calculateSinkType(
? DataSourceType.KTABLE
: DataSourceType.KSTREAM;
}

/**
 * Builds the logical schema of the join result.
 *
 * <p>Each side's schema is first adjusted for any key change implied by its
 * join column (via {@code selectKey}), and the two adjusted schemas are then
 * combined into the join output schema.
 *
 * @param left the left source node
 * @param leftJoinFieldName the join column on the left side
 * @param right the right source node
 * @param rightJoinFieldName the join column on the right side
 * @return the schema of the join result
 */
private static LogicalSchema buildJoinSchema(
    final DataSourceNode left,
    final ColumnRef leftJoinFieldName,
    final DataSourceNode right,
    final ColumnRef rightJoinFieldName
) {
  return JoinParamsFactory.createSchema(
      selectKey(left, leftJoinFieldName),
      selectKey(right, rightJoinFieldName)
  );
}

/**
* Adjust the schema to take into account any change in key columns.
*
* @param source the source node
* @param joinColumnRef the join column
* @return the true source schema after any change of key columns.
*/
private static LogicalSchema selectKey(
final DataSourceNode source,
final ColumnRef joinColumnRef
) {
final LogicalSchema sourceSchema = source.getSchema();

final Column joinCol = sourceSchema.findColumn(joinColumnRef)
.orElseThrow(() -> new KsqlException("Unknown join column: " + joinColumnRef));

if (sourceSchema.key().size() != 1) {
throw new UnsupportedOperationException("Only single key columns supported");
}

if (joinCol.namespace() == Namespace.KEY) {
// Join column is only key column, so no change of key columns required:
return sourceSchema;
}

final Optional<Column> keyColumn = source
.getKeyField()
.resolve(sourceSchema);

if (keyColumn.isPresent() && keyColumn.get().equals(joinCol)) {
// Join column is KEY field, which is an alias for the only key column, so no change of key
// columns required:
return sourceSchema;
}

// Change of key columns required

if (source.getDataSourceType() == DataSourceType.KTABLE) {
// Tables do not support rekey:
final String sourceName = source.getDataSource().getName().toString(FormatOptions.noEscape());

if (!keyColumn.isPresent()) {
throw new KsqlException(
"Invalid join criteria: Source table (" + sourceName + ") has no key column "
+ "defined. Only 'ROWKEY' is supported in the join criteria."
big-andy-coates marked this conversation as resolved.
Show resolved Hide resolved
);
}

throw new KsqlException(
"Invalid join criteria: Source table "
+ "(" + sourceName + ") key column "
+ "(" + keyColumn.get().ref().toString(FormatOptions.noEscape()) + ") "
+ "is not the column used in the join criteria ("
+ joinCol.ref().toString(FormatOptions.noEscape()) + "). "
+ "Only the table's key column or 'ROWKEY' is supported in the join criteria."
);
}

return LogicalSchema.builder()
.keyColumn(source.getAlias(), SchemaUtil.ROWKEY_NAME, joinCol.type())
.valueColumns(sourceSchema.value())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,19 @@ public class PhysicalPlanBuilderTest {
+ "WITH (KAFKA_TOPIC = 'test1', VALUE_FORMAT = 'JSON');";

private static final String CREATE_STREAM_TEST2 = "CREATE STREAM TEST2 "
+ "(ID BIGINT, COL0 VARCHAR, COL1 DOUBLE) "
+ "(ROWKEY BIGINT KEY, ID BIGINT, COL0 VARCHAR, COL1 BIGINT) "
+ " WITH (KAFKA_TOPIC = 'test2', VALUE_FORMAT = 'JSON', KEY='ID');";

private static final String CREATE_STREAM_TEST3 = "CREATE STREAM TEST3 "
+ "(ID BIGINT, COL0 VARCHAR, COL1 DOUBLE) "
+ "(ROWKEY BIGINT KEY, ID BIGINT, COL0 BIGINT, COL1 DOUBLE) "
+ " WITH (KAFKA_TOPIC = 'test3', VALUE_FORMAT = 'JSON', KEY='ID');";

private static final String CREATE_TABLE_TEST4 = "CREATE TABLE TEST4 "
+ "(ID BIGINT, COL0 VARCHAR, COL1 DOUBLE) "
+ "(ROWKEY BIGINT KEY, ID BIGINT, COL0 VARCHAR, COL1 DOUBLE) "
+ " WITH (KAFKA_TOPIC = 'test4', VALUE_FORMAT = 'JSON', KEY='ID');";

private static final String CREATE_TABLE_TEST5 = "CREATE TABLE TEST5 "
+ "(ID BIGINT, COL0 VARCHAR, COL1 DOUBLE) "
+ "(ROWKEY BIGINT KEY, ID BIGINT, COL0 VARCHAR, COL1 DOUBLE) "
+ " WITH (KAFKA_TOPIC = 'test5', VALUE_FORMAT = 'JSON', KEY='ID');";

private static final String CREATE_STREAM_TEST6 = "CREATE STREAM TEST6 "
Expand Down Expand Up @@ -316,7 +316,7 @@ public void shouldRepartitionLeftStreamIfNotCorrectKey() {
.get(0);

// Then:
assertThat(result.getExecutionPlan(), containsString("[ REKEY ] | Schema: [ROWKEY DOUBLE KEY, TEST2."));
assertThat(result.getExecutionPlan(), containsString("[ REKEY ] | Schema: [ROWKEY BIGINT KEY, TEST2."));
}

@Test
Expand All @@ -332,7 +332,7 @@ public void shouldRepartitionRightStreamIfNotCorrectKey() {
.get(0);

// Then:
assertThat(result.getExecutionPlan(), containsString("[ REKEY ] | Schema: [ROWKEY STRING KEY, TEST3."));
assertThat(result.getExecutionPlan(), containsString("[ REKEY ] | Schema: [ROWKEY BIGINT KEY, TEST3."));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void testSimpleQueryLogicalPlan() {

@Test
public void testSimpleLeftJoinLogicalPlan() {
final String simpleQuery = "SELECT t1.col1, t2.col1, t1.col4, t2.col2 FROM test1 t1 LEFT JOIN test2 t2 ON t1.col1 = t2.col1 EMIT CHANGES;";
final String simpleQuery = "SELECT t1.col1, t2.col1, t1.col4, t2.col2 FROM test1 t1 LEFT JOIN test2 t2 ON t1.col0 = t2.col0 EMIT CHANGES;";
final PlanNode logicalPlan = buildLogicalPlan(simpleQuery);

assertThat(logicalPlan.getSources().get(0), instanceOf(ProjectNode.class));
Expand All @@ -106,13 +106,13 @@ public void testSimpleLeftJoinFilterLogicalPlan() {
final String
simpleQuery =
"SELECT t1.col1, t2.col1, col5, t2.col4, t2.col2 FROM test1 t1 LEFT JOIN test2 t2 ON "
+ "t1.col1 = t2.col1 WHERE t1.col1 > 10 AND t2.col4 = 10.8 EMIT CHANGES;";
+ "t1.col0 = t2.col0 WHERE t1.col1 > 10 AND t2.col4 = 10.8 EMIT CHANGES;";
final PlanNode logicalPlan = buildLogicalPlan(simpleQuery);

assertThat(logicalPlan.getSources().get(0), instanceOf(ProjectNode.class));
final ProjectNode projectNode = (ProjectNode) logicalPlan.getSources().get(0);

assertThat(projectNode.getKeyField().ref(), is(Optional.of(ColumnRef.withoutSource(ColumnName.of("T1_COL1")))));
assertThat(projectNode.getKeyField().ref(), is(Optional.empty()));
assertThat(projectNode.getSchema().value().size(), equalTo(5));

assertThat(projectNode.getSources().get(0), instanceOf(FilterNode.class));
Expand Down Expand Up @@ -183,7 +183,7 @@ public void shouldCreateStreamOutputForStreamTableJoin() {
final String
simpleQuery =
"SELECT t1.col1, t2.col1, col5, t2.col4, t2.col2 FROM test1 t1 LEFT JOIN test2 t2 ON "
+ "t1.col1 = t2.col1 WHERE t1.col1 > 10 AND t2.col4 = 10.8 EMIT CHANGES;";
+ "t1.col0 = t2.col0 WHERE t1.col1 > 10 AND t2.col4 = 10.8 EMIT CHANGES;";
final PlanNode logicalPlan = buildLogicalPlan(simpleQuery);
assertThat(logicalPlan.getNodeOutputType(), equalTo(DataSourceType.KSTREAM));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void shouldExtractCorrectSourceForSimpleQuery() {
public void shouldExtractCorrectSourceForJoinQuery() {
final PlanNode planNode = buildLogicalPlan(
"SELECT t1.col1, t2.col1, t1.col4, t2.col2 FROM test1 t1 LEFT JOIN "
+ "test2 t2 ON t1.col1 = t2.col1 EMIT CHANGES;");
+ "test2 t2 ON t1.col0 = t2.col0 EMIT CHANGES;");
final PlanSourceExtractorVisitor planSourceExtractorVisitor = new PlanSourceExtractorVisitor();
planSourceExtractorVisitor.process(planNode, null);
final Set<SourceName> sourceNames = planSourceExtractorVisitor.getSourceNames();
Expand Down
Loading