Skip to content

Commit

Permalink
fix: make api client recognize ddl warnings better (#9341)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zara Lim authored Jul 28, 2022
1 parent 63f2014 commit c565ac5
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ private static boolean isCommandStatusEntity(final JsonObject ksqlEntity) {
}

private static boolean isIfNotExistsWarning(final JsonObject ksqlEntity) {
return ksqlEntity.getString("statementText") != null
&& ksqlEntity.getString("statementText").contains("IF NOT EXISTS")
return ksqlEntity.getString("message") != null
&& ksqlEntity.getString("message").startsWith("Cannot add")
&& ksqlEntity.getString("@type") != null
&& ksqlEntity.getString("@type").equals("warning_entity");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;

import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -1191,6 +1192,40 @@ public void shouldFailToDropConnectorViaExecuteStatement() {
containsString("Use the dropConnector() method instead"));
}

@Test
public void shouldHandleWarningsForDdlStatements()
throws ExecutionException, InterruptedException {
// Given
final WarningEntity entity = new WarningEntity("create stream if not exists;", "Cannot add stream foo: A stream with the same name already exists.");
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

// When
final ExecuteStatementResult result = javaClient.executeStatement("create stream if not exists;").get();

// Then
assertFalse(result.queryId().isPresent());
}

@Test
public void shouldRejectWarningsFromNonDdlStatementsViaExecuteStatement() {
// Given
final WarningEntity entity = new WarningEntity("drop connector if exits;", "Connector does not exist.");
testEndpoints.setKsqlEndpointResponse(Collections.singletonList(entity));

// When
final Exception e = assertThrows(
ExecutionException.class, // thrown from .get() when the future completes exceptionally
() -> javaClient.executeStatement("drop connector if exits;").get()
);

// Then
assertThat(e.getCause(), instanceOf(KsqlClientException.class));
assertThat(e.getCause().getMessage(), containsString(EXECUTE_STATEMENT_REQUEST_ACCEPTED_DOC));
assertThat(e.getCause().getMessage(), containsString(EXECUTE_STATEMENT_USAGE_DOC));
assertThat(e.getCause().getMessage(),
containsString("Use the dropConnector() method instead"));
}

@Test
public void shouldListStreams() throws Exception {
// Given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -829,6 +830,34 @@ public void shouldHandleErrorResponseFromExecuteStatement() {
assertThat(e.getCause().getMessage(), containsString("Error code: 40001"));
}

@Test
public void shouldHandleWarningResponseFromExecuteStatement()
throws ExecutionException, InterruptedException {
// Given:
client.executeStatement("create table if not exists tasks (taskId varchar primary key) with (kafka_topic='tasks', value_format='json', partitions=1);").get();

// When:
final ExecuteStatementResult result =
client.executeStatement("create table if not exists tasks (taskId varchar primary key) with (kafka_topic='tasks', value_format='json', partitions=1);").get();

// Then
assertFalse(result.queryId().isPresent());
}

@Test
public void shouldRejectWarningsFromConnectorRequestsInExecuteStatement() throws Exception {
// When
final Exception e = assertThrows(
ExecutionException.class, // thrown from .get() when the future completes exceptionally
() -> client.executeStatement("DROP CONNECTOR IF EXISTS foo;").get()
);

// Then
assertThat(e.getCause(), instanceOf(KsqlClientException.class));
assertThat(e.getCause().getMessage(), containsString("The ksqlDB server accepted the statement issued via executeStatement(), but the response received is of an unexpected format."));
assertThat(e.getCause().getMessage(), containsString("Use the dropConnector() method instead."));
}

@Test
public void shouldRejectMultipleRequestsFromExecuteStatement() {
// When
Expand Down

0 comments on commit c565ac5

Please sign in to comment.