Skip to content

Commit

Permalink
fix: Specify source type in DROP referential integrity errors (#8253)
Browse files Browse the repository at this point in the history
Without this patch a DROP command for a non-existent table or
stream resulted in a generic error, e.g. "Source `BAAZ` does not
exist".

This patch specifies the source type in the error message. In the
example above, it might be "TABLE `BAAZ` does not exist" if the
command was "DROP TABLE BAAZ".
  • Loading branch information
Gerrrr authored Oct 19, 2021
1 parent ef48f26 commit 03bd713
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ public void shouldHandleErrorResponseFromExecuteStatement() {
// Then
assertThat(e.getCause(), instanceOf(KsqlClientException.class));
assertThat(e.getCause().getMessage(), containsString("Received 400 response from server"));
assertThat(e.getCause().getMessage(), containsString("Source NONEXISTENT does not exist"));
assertThat(e.getCause().getMessage(), containsString("Stream NONEXISTENT does not exist"));
assertThat(e.getCause().getMessage(), containsString("Error code: 40001"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.parser.tree.DropTable;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;

public final class DropSourceFactory {
private final MetaStore metaStore;
Expand Down Expand Up @@ -58,13 +59,14 @@ private DropSourceCommand create(
final DataSource dataSource = metaStore.getSource(sourceName);
if (dataSource == null) {
if (!ifExists) {
throw new KsqlException("Source " + sourceName.text() + " does not exist.");
throw new KsqlException(StringUtils.capitalize(dataSourceType.getKsqlType().toLowerCase())
+ " " + sourceName.text() + " does not exist.");
}
} else if (dataSource.getDataSourceType() != dataSourceType) {
throw new KsqlException(String.format(
"Incompatible data source type is %s, but statement was DROP %s",
dataSource.getDataSourceType() == DataSourceType.KSTREAM ? "STREAM" : "TABLE",
dataSourceType == DataSourceType.KSTREAM ? "STREAM" : "TABLE"
dataSource.getDataSourceType().getKsqlType().toLowerCase(),
dataSourceType.getKsqlType().toLowerCase()
));
} else if (dataSource.isSource() && deleteTopic) {
throw new KsqlException("Cannot delete topic for read-only source: " + sourceName.text());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,23 @@ public void shouldFailDropSourceOnMissingSourceWithNoIfExistsForStream() {
);

// Then:
assertThat(e.getMessage(), containsString("Source bob does not exist."));
assertThat(e.getMessage(), containsString("Stream bob does not exist."));
}

@Test
public void shouldFailDropSourceOnMissingSourceWithNoIfExistsForTable() {
// Given:
final DropTable dropTable = new DropTable(SOME_NAME, false, true);
when(metaStore.getSource(SOME_NAME)).thenReturn(null);

// When:
final Exception e = assertThrows(
KsqlException.class,
() -> dropSourceFactory.create(dropTable)
);

// Then:
assertThat(e.getMessage(), containsString("Table bob does not exist."));
}

@Test
Expand All @@ -131,7 +147,7 @@ public void shouldFailDropSourceOnDropIncompatibleSourceForStream() {
);

// Then:
assertThat(e.getMessage(), containsString("Incompatible data source type is TABLE"));
assertThat(e.getMessage(), containsString("Incompatible data source type is table"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
],
"expectedException" : {
"type": "io.confluent.ksql.util.KsqlException",
"message": "Source TEST does not exist"
"message": "Stream TEST does not exist"
}
},
{
Expand Down

0 comments on commit 03bd713

Please sign in to comment.