Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: better error message on self-join #4248

Merged
merged 1 commit into from
Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,14 @@ protected AstNode visitJoin(final Join node, final Void context) {
throw new KsqlException("Only equality join criteria is supported.");
}

if (left.getDataSource().getName().equals(right.getDataSource().getName())) {
throw new KsqlException(
"Can not join '" + left.getDataSource().getName().toString(FormatOptions.noEscape())
+ "' to '" + right.getDataSource().getName().toString(FormatOptions.noEscape())
+ "': self joins are not yet supported."
);
}

final ColumnRef leftJoinField = getJoinFieldName(
comparisonExpression,
left.getAlias(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class AnalyzerFunctionalTest {
public final ExpectedException expectedException = ExpectedException.none();

@Mock
private SerdeOptionsSupplier serdeOptiponsSupplier;
private SerdeOptionsSupplier serdeOptionsSupplier;
@Mock
private Sink sink;

Expand All @@ -127,7 +127,7 @@ public void init() {
jsonMetaStore,
"",
DEFAULT_SERDE_OPTIONS,
serdeOptiponsSupplier
serdeOptionsSupplier
);

when(sink.getName()).thenReturn(SourceName.of("TEST0"));
Expand Down Expand Up @@ -388,7 +388,7 @@ public void shouldFailIfExplicitNamespaceIsProvidedButEmpty() {
public void shouldGetSerdeOptions() {
// Given:
final Set<SerdeOption> serdeOptions = ImmutableSet.of(SerdeOption.UNWRAP_SINGLE_VALUES);
when(serdeOptiponsSupplier.build(any(), any(), any(), any())).thenReturn(serdeOptions);
when(serdeOptionsSupplier.build(any(), any(), any(), any())).thenReturn(serdeOptions);

givenSinkValueFormat(Format.AVRO);
givenWrapSingleValues(true);
Expand All @@ -397,7 +397,7 @@ public void shouldGetSerdeOptions() {
final Analysis result = analyzer.analyze(query, Optional.of(sink));

// Then:
verify(serdeOptiponsSupplier).build(
verify(serdeOptionsSupplier).build(
ImmutableList.of("COL0", "COL1").stream().map(ColumnName::of).collect(Collectors.toList()),
Format.AVRO,
Optional.of(true),
Expand Down Expand Up @@ -484,6 +484,26 @@ public void shouldNotIncludeMetaColumnsForSelectStartOnStaticQueries() {
)));
}

@Test
public void shouldThrowOnSelfJoin() {
// Given:
final CreateStreamAsSelect createStreamAsSelect = parseSingle(
"CREATE STREAM FOO AS "
+ "SELECT * FROM test1 t1 JOIN test1 t2 ON t1.rowkey = t2.rowkey;"
);

final Query query = createStreamAsSelect.getQuery();

final Analyzer analyzer = new Analyzer(jsonMetaStore, "", DEFAULT_SERDE_OPTIONS);

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectMessage("Can not join 'TEST1' to 'TEST1': self joins are not yet supported.");

// When:
analyzer.analyze(query, Optional.of(createStreamAsSelect.getSink()));
}

@SuppressWarnings("unchecked")
private <T extends Statement> T parseSingle(final String simpleQuery) {
return (T) Iterables.getOnlyElement(parse(simpleQuery, jsonMetaStore));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1922,6 +1922,17 @@
{"name": "OUTPUT", "type": "stream", "schema": "ROWKEY STRING KEY, L_ROWKEY STRING, L1 INT, R1 INT"}
]
}
},
{
"name": "self join",
"statements": [
"CREATE STREAM INPUT (ID bigint) WITH (kafka_topic='left_topic', value_format='JSON');",
"CREATE STREAM OUTPUT as SELECT * FROM INPUT s1 JOIN INPUT s2 WITHIN 1 HOUR ON s1.id = s2.id;"
],
"expectedException": {
"type": "io.confluent.ksql.util.KsqlStatementException",
"message": "Can not join 'INPUT' to 'INPUT': self joins are not yet supported."
}
}
]
}