Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify that table joins use the key in the join criterion #1718

Merged
merged 5 commits into from
Aug 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ from the left stream that was used in the join criteria. This column will be
registered as the key of the resulting stream if included in the selected
columns.

For stream-table joins, the column used in the join criteria for the table
must be the table key.

The WITH clause for the result supports the following properties:

+---------------+------------------------------------------------------------------------------------------------------+
Expand Down Expand Up @@ -413,6 +416,9 @@ from the left table that was used in the join criteria. This column will be
registered as the key of the resulting table if included in the selected
columns.

For table-table joins, the columns used in the join criteria must be the keys
of the tables being joined.

The WITH clause supports the following properties:

+---------------+------------------------------------------------------------------------------------------------------+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,26 +217,25 @@ private void analyzeExpressions() {

@Override
protected Node visitJoin(final Join node, final AnalysisContext context) {
AliasedRelation left = (AliasedRelation) process(node.getLeft(), context);
AliasedRelation right = (AliasedRelation) process(node.getRight(), context);
final AliasedRelation left = (AliasedRelation) process(node.getLeft(), context);
final AliasedRelation right = (AliasedRelation) process(node.getRight(), context);

String leftSideName = ((Table) left.getRelation()).getName().getSuffix();
StructuredDataSource leftDataSource = metaStore.getSource(leftSideName);
final String leftSideName = ((Table) left.getRelation()).getName().getSuffix();
final StructuredDataSource leftDataSource = metaStore.getSource(leftSideName);
if (leftDataSource == null) {
throw new KsqlException(format("Resource %s does not exist.", leftSideName));
}

String rightSideName = ((Table) right.getRelation()).getName().getSuffix();
StructuredDataSource rightDataSource = metaStore.getSource(rightSideName);
final String rightSideName = ((Table) right.getRelation()).getName().getSuffix();
final StructuredDataSource rightDataSource = metaStore.getSource(rightSideName);
if (rightDataSource == null) {
throw new KsqlException(format("Resource %s does not exist.", rightSideName));
}

final String leftAlias = left.getAlias();
final String rightAlias = right.getAlias();

String leftAlias = left.getAlias();
String rightAlias = right.getAlias();

JoinNode.JoinType joinType = getJoinType(node);
final JoinNode.JoinType joinType = getJoinType(node);

if (!node.getCriteria().isPresent()) {
throw new KsqlException(String.format(
Expand All @@ -246,43 +245,43 @@ protected Node visitJoin(final Join node, final AnalysisContext context) {
: ""
));
}
JoinOn joinOn = (JoinOn) (node.getCriteria().get());
ComparisonExpression comparisonExpression = (ComparisonExpression) joinOn.getExpression();
final JoinOn joinOn = (JoinOn) (node.getCriteria().get());
final ComparisonExpression comparisonExpression = (ComparisonExpression) joinOn.getExpression();

Pair<String, String> leftSide = fetchKeyFieldName(
final Pair<String, String> leftSide = fetchKeyFieldName(
comparisonExpression,
leftAlias,
leftDataSource.getSchema()
);
Pair<String, String> rightSide = fetchKeyFieldName(
final Pair<String, String> rightSide = fetchKeyFieldName(
comparisonExpression,
rightAlias,
rightDataSource.getSchema()
);

String leftKeyFieldName = leftSide.getRight();
String rightKeyFieldName = rightSide.getRight();
final String leftKeyFieldName = leftSide.getRight();
final String rightKeyFieldName = rightSide.getRight();

if (comparisonExpression.getType() != ComparisonExpression.Type.EQUAL) {
throw new KsqlException("Only equality join criteria is supported.");
}

StructuredDataSourceNode
final StructuredDataSourceNode
leftSourceKafkaTopicNode =
new StructuredDataSourceNode(
new PlanNodeId("KafkaTopic_Left"),
leftDataSource,
leftDataSource.getSchema()
);
StructuredDataSourceNode
final StructuredDataSourceNode
rightSourceKafkaTopicNode =
new StructuredDataSourceNode(
new PlanNodeId("KafkaTopic_Right"),
rightDataSource,
rightDataSource.getSchema()
);

JoinNode joinNode =
final JoinNode joinNode =
new JoinNode(
new PlanNodeId("Join"),
joinType,
Expand Down Expand Up @@ -320,21 +319,21 @@ private JoinNode.JoinType getJoinType(Join node) {
}

/**
* From the join criteria expression fetch the key field corresponding to the given source
* From the join criteria expression fetch the field corresponding to the given source
* alias.
*/
private Pair<String, String> fetchKeyFieldName(
ComparisonExpression comparisonExpression,
String sourceAlias,
Schema sourceSchema
) {
Pair<String, String> keyInfo = fetchKeyFieldNameFromExpr(
Pair<String, String> keyInfo = fetchFieldNameFromExpr(
comparisonExpression.getLeft(),
sourceAlias,
sourceSchema
);
if (keyInfo == null) {
keyInfo = fetchKeyFieldNameFromExpr(
keyInfo = fetchFieldNameFromExpr(
comparisonExpression.getRight(),
sourceAlias,
sourceSchema
Expand All @@ -343,7 +342,7 @@ private Pair<String, String> fetchKeyFieldName(
if (keyInfo == null) {
throw new KsqlException(
String.format(
"%s : Invalid join criteria %s. Key for %s is not set correctly. ",
"%s : Invalid join criteria %s. Could not find a join criteria operand for %s. ",
comparisonExpression.getLocation().isPresent()
? comparisonExpression.getLocation().get().toString()
: "", comparisonExpression, sourceAlias
Expand All @@ -358,7 +357,7 @@ private Pair<String, String> fetchKeyFieldName(
* DereferenceExpression
* or QualifiedNameReference and if the variable prefix matches the source Alias.
*/
private Pair<String, String> fetchKeyFieldNameFromExpr(
private Pair<String, String> fetchFieldNameFromExpr(
Expression expression, String sourceAlias,
Schema sourceSchema
) {
Expand All @@ -367,17 +366,17 @@ private Pair<String, String> fetchKeyFieldNameFromExpr(
(DereferenceExpression) expression;
String sourceAliasVal = dereferenceExpression.getBase().toString();
if (sourceAliasVal.equalsIgnoreCase(sourceAlias)) {
String keyFieldName = dereferenceExpression.getFieldName();
if (SchemaUtil.getFieldByName(sourceSchema, keyFieldName).isPresent()) {
return new Pair<>(sourceAliasVal, keyFieldName);
String fieldName = dereferenceExpression.getFieldName();
if (SchemaUtil.getFieldByName(sourceSchema, fieldName).isPresent()) {
return new Pair<>(sourceAliasVal, fieldName);
}
}
} else if (expression instanceof QualifiedNameReference) {
QualifiedNameReference qualifiedNameReference =
(QualifiedNameReference) expression;
String keyFieldName = qualifiedNameReference.getName().getSuffix();
if (SchemaUtil.getFieldByName(sourceSchema, keyFieldName).isPresent()) {
return new Pair<>(sourceAlias, keyFieldName);
String fieldName = qualifiedNameReference.getName().getSuffix();
if (SchemaUtil.getFieldByName(sourceSchema, fieldName).isPresent()) {
return new Pair<>(sourceAlias, fieldName);
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,18 @@ public JoinNode(@JsonProperty("id") final PlanNodeId id,

private Schema buildSchema(final PlanNode left, final PlanNode right) {

Schema leftSchema = left.getSchema();
Schema rightSchema = right.getSchema();
final Schema leftSchema = left.getSchema();
final Schema rightSchema = right.getSchema();

SchemaBuilder schemaBuilder = SchemaBuilder.struct();
final SchemaBuilder schemaBuilder = SchemaBuilder.struct();

for (Field field : leftSchema.fields()) {
String fieldName = leftAlias + "." + field.name();
for (final Field field : leftSchema.fields()) {
final String fieldName = leftAlias + "." + field.name();
schemaBuilder.field(fieldName, field.schema());
}

for (Field field : rightSchema.fields()) {
String fieldName = rightAlias + "." + field.name();
for (final Field field : rightSchema.fields()) {
final String fieldName = rightAlias + "." + field.name();
schemaBuilder.field(fieldName, field.schema());
}
return schemaBuilder.build();
Expand Down Expand Up @@ -282,7 +282,9 @@ protected SchemaKStream buildStream(final PlanNode node, final String keyFieldNa
}


protected SchemaKTable buildTable(final PlanNode node) {
protected SchemaKTable buildTable(final PlanNode node,
final String keyFieldName,
final String tableName) {

final Map<String, Object> joinTableProps = new HashMap<>(props);
joinTableProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Expand All @@ -298,6 +300,20 @@ protected SchemaKTable buildTable(final PlanNode node) {
throw new RuntimeException("Expected to find a Table, found a stream instead.");
}

if (schemaKStream.getKeyField() != null
&& !keyFieldName.equals(SchemaUtil.ROWKEY_NAME)
&& !SchemaUtil.matchFieldName(schemaKStream.getKeyField(), keyFieldName)) {
throw new KsqlException(
String.format(
"Source table (%s) key column (%s) "
+ "is not the column used in the join criteria (%s).",
tableName,
schemaKStream.getKeyField().name(),
keyFieldName
)
);
}

return (SchemaKTable) schemaKStream;
}

Expand Down Expand Up @@ -414,7 +430,9 @@ public SchemaKStream join() {
+ " the WITHIN clause) and try to execute your join again.");
}

final SchemaKTable rightTable = buildTable(joinNode.getRight());
final SchemaKTable rightTable = buildTable(joinNode.getRight(),
joinNode.getRightKeyFieldName(),
joinNode.getRightAlias());
final SchemaKStream leftStream = buildStream(joinNode.getLeft(),
joinNode.getLeftKeyFieldName());

Expand Down Expand Up @@ -464,8 +482,12 @@ public SchemaKTable join() {
+ "join again.");
}

final SchemaKTable leftTable = buildTable(joinNode.getLeft());
final SchemaKTable rightTable = buildTable(joinNode.getRight());
final SchemaKTable leftTable = buildTable(joinNode.getLeft(),
joinNode.getLeftKeyFieldName(),
joinNode.getLeftAlias());
final SchemaKTable rightTable = buildTable(joinNode.getRight(),
joinNode.getRightKeyFieldName(),
joinNode.getRightAlias());

switch (joinNode.joinType) {
case LEFT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,11 @@ public void shouldFailWithIncorrectJoinCriteria() {
try {
queryAnalyzer.analyze("sqlExpression", query);
} catch (KsqlException ex) {
assertThat(ex.getMessage().trim(), equalTo("Line: 1, Col: 46 : Invalid join criteria (TEST1.COL1 = TEST2.COLL). Key for TEST2 is not set correctly."));
assertThat(
ex.getMessage().trim(),
equalTo(
"Line: 1, Col: 46 : Invalid join criteria (TEST1.COL1 = TEST2.COLL). "
+ "Could not find a join criteria operand for TEST2." ));
}
}

Expand Down
Loading