diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java index 374760829db8..140f1dd11ce8 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java @@ -17,7 +17,6 @@ import io.confluent.ksql.api.client.impl.ClientImpl; import io.vertx.core.Vertx; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import org.reactivestreams.Publisher; @@ -71,9 +70,19 @@ public interface Client { */ BatchedQueryResult executeQuery(String sql, Map properties); - CompletableFuture insertInto(String streamName, Map row); + /** + * Inserts a row into a ksqlDB stream. + * + *

The {@code CompletableFuture} will be failed if a non-200 response is received from the + * server, or if the server encounters an error while processing the insertion. + * + * @param streamName name of the target stream + * @param row the row to insert. Keys are column names and values are column values. + * @return a future that completes once the server response is received + */ + CompletableFuture insertInto(String streamName, KsqlObject row); - Publisher streamInserts(String streamName, Publisher> insertsPublisher); + Publisher streamInserts(String streamName, Publisher insertsPublisher); /** * Terminates a push query with the specified query ID. diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/KsqlArray.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/KsqlArray.java index 57b08efedc7e..ad7622defb05 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/KsqlArray.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/KsqlArray.java @@ -282,7 +282,7 @@ public KsqlArray add(final Boolean value) { * @return a reference to this */ public KsqlArray add(final BigDecimal value) { - delegate.add(value); + delegate.add(value.doubleValue()); return this; } @@ -293,7 +293,7 @@ public KsqlArray add(final BigDecimal value) { * @return a reference to this */ public KsqlArray add(final KsqlArray value) { - delegate.add(value); + delegate.add(KsqlArray.toJsonArray(value)); return this; } @@ -304,7 +304,7 @@ public KsqlArray add(final KsqlArray value) { * @return a reference to this */ public KsqlArray add(final KsqlObject value) { - delegate.add(value); + delegate.add(KsqlObject.toJsonObject(value)); return this; } @@ -385,7 +385,7 @@ public int hashCode() { return delegate.hashCode(); } - private static JsonArray toJsonArray(final KsqlArray ksqlArray) { + static JsonArray toJsonArray(final KsqlArray ksqlArray) { return new JsonArray(ksqlArray.getList()); } } diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/KsqlObject.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/KsqlObject.java index 7d2da060bb2f..50e868f90468 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/KsqlObject.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/KsqlObject.java @@ -300,7 +300,7 @@ public KsqlObject put(final String key, final Boolean value) { * @return a reference to this */ public KsqlObject put(final String key, final BigDecimal value) { - delegate.put(key, value); + delegate.put(key, value.doubleValue()); return this; } @@ -312,7 +312,7 @@ public KsqlObject put(final String key, final BigDecimal value) { * @return a reference to this */ public KsqlObject put(final String key, final KsqlArray value) { - delegate.put(key, value); + delegate.put(key, KsqlArray.toJsonArray(value)); return this; } @@ -324,7 +324,7 @@ public KsqlObject put(final String key, final KsqlArray value) { * @return a reference to this */ public KsqlObject put(final String key, final KsqlObject value) { - delegate.put(key, value); + delegate.put(key, KsqlObject.toJsonObject(value)); return this; } @@ -422,7 +422,7 @@ public static KsqlObject fromArray(final List keys, final KsqlArray valu return ret; } - private static JsonObject toJsonObject(final KsqlObject ksqlObject) { + static JsonObject toJsonObject(final KsqlObject ksqlObject) { return new JsonObject(ksqlObject.getMap()); } } diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index f5d426748750..20f0864366e2 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -23,10 +23,12 @@ import io.confluent.ksql.api.client.ClientOptions; import io.confluent.ksql.api.client.InsertAck; import io.confluent.ksql.api.client.KsqlClientException; +import io.confluent.ksql.api.client.KsqlObject; import io.confluent.ksql.api.client.StreamedQueryResult; import io.vertx.core.Context; import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpClientRequest; @@ -40,7 +42,6 @@ import java.nio.charset.Charset; import java.util.Base64; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import org.reactivestreams.Publisher; @@ -118,20 +119,42 @@ public BatchedQueryResult executeQuery( } @Override - public CompletableFuture insertInto( - final String streamName, final Map row) { - return null; // not yet implemented + public CompletableFuture insertInto(final String streamName, final KsqlObject row) { + final CompletableFuture cf = new CompletableFuture<>(); + + final Buffer requestBody = Buffer.buffer(); + final JsonObject params = new JsonObject().put("target", streamName); + requestBody.appendBuffer(params.toBuffer()).appendString("\n"); + requestBody.appendString(row.toJsonString()).appendString("\n"); + + makeRequest( + "/inserts-stream", + requestBody, + cf, + response -> handleResponse(response, cf, InsertsResponseHandler::new) + ); + + return cf; } @Override public Publisher streamInserts( - final String streamName, final Publisher> insertsPublisher) { + final String streamName, final Publisher insertsPublisher) { return null; // not yet implemented } @Override public CompletableFuture terminatePushQuery(final String queryId) { - return makeCloseQueryRequest(queryId); + final CompletableFuture cf = new CompletableFuture<>(); + + makeRequest( + "/close-query", + new JsonObject().put("queryId", queryId), + cf, + response -> handleCloseQueryResponse(response, cf) + ); + + return cf; } @Override @@ -144,7 +167,7 @@ public void close() { @FunctionalInterface private interface ResponseHandlerSupplier> { - QueryResponseHandler get(Context ctx, RecordParser recordParser, T cf); + ResponseHandler get(Context ctx, RecordParser recordParser, T cf); } private > void makeQueryRequest( @@ -159,26 +182,21 @@ private > void makeQueryRequest( "/query-stream", requestBody, cf, - response -> handleQueryResponse(response, cf, responseHandlerSupplier) + response -> handleResponse(response, cf, responseHandlerSupplier) ); } - private CompletableFuture makeCloseQueryRequest(final String queryId) { - final CompletableFuture cf = new CompletableFuture<>(); - - makeRequest( - "/close-query", - new JsonObject().put("queryId", queryId), - cf, - response -> handleCloseQueryResponse(response, cf) - ); - - return cf; + private > void makeRequest( + final String path, + final JsonObject requestBody, + final T cf, + final Handler responseHandler) { + makeRequest(path, requestBody.toBuffer(), cf, responseHandler); } private > void makeRequest( final String path, - final JsonObject requestBody, + final Buffer requestBody, final T cf, final Handler responseHandler) { HttpClientRequest request = httpClient.request(HttpMethod.POST, @@ -189,20 +207,20 @@ private > void makeRequest( if (clientOptions.isUseBasicAuth()) { request = configureBasicAuth(request); } - request.end(requestBody.toBuffer()); + request.end(requestBody); } private HttpClientRequest configureBasicAuth(final HttpClientRequest request) { return request.putHeader(AUTHORIZATION.toString(), basicAuthHeader); } - private static > void handleQueryResponse( + private static > void handleResponse( final HttpClientResponse response, final T cf, final ResponseHandlerSupplier responseHandlerSupplier) { if (response.statusCode() == OK.code()) { final RecordParser recordParser = RecordParser.newDelimited("\n", response); - final QueryResponseHandler responseHandler = + final ResponseHandler responseHandler = responseHandlerSupplier.get(Vertx.currentContext(), recordParser, cf); recordParser.handler(responseHandler::handleBodyBuffer); diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ExecuteQueryResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ExecuteQueryResponseHandler.java index 90c4407c156c..245cb6d9a891 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ExecuteQueryResponseHandler.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ExecuteQueryResponseHandler.java @@ -73,7 +73,7 @@ protected void handleRow(final Buffer buff) { } @Override - protected void handleBodyEnd() { + protected void doHandleBodyEnd() { if (!hasReadArguments) { throw new IllegalStateException("Body ended before metadata received"); } diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertsResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertsResponseHandler.java new file mode 100644 index 000000000000..f4711445bee7 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertsResponseHandler.java @@ -0,0 +1,71 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import io.confluent.ksql.api.client.KsqlClientException; +import io.vertx.core.Context; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.JsonObject; +import io.vertx.core.parsetools.RecordParser; +import java.util.concurrent.CompletableFuture; + +public class InsertsResponseHandler extends ResponseHandler> { + + private int numAcks; + + InsertsResponseHandler( + final Context context, final RecordParser recordParser, final CompletableFuture cf) { + super(context, recordParser, cf); + } + + @Override + protected void doHandleBodyBuffer(final Buffer buff) { + final JsonObject jsonObject = new JsonObject(buff); + final String status = jsonObject.getString("status"); + if ("ok".equals(status)) { + numAcks++; + } else if ("error".equals(status)) { + cf.completeExceptionally(new KsqlClientException(String.format( + "Received error from /inserts-stream. Error code: %d. Message: %s", + jsonObject.getInteger("error_code"), + jsonObject.getString("message") + ))); + } else { + throw new IllegalStateException( + "Unrecognized status response from /inserts-stream: " + status); + } + } + + @Override + protected void doHandleException(final Throwable t) { + if (!cf.isDone()) { + cf.completeExceptionally(t); + } + } + + @Override + protected void doHandleBodyEnd() { + if (numAcks != 1) { + throw new IllegalStateException( + "Received unexpected number of acks from /inserts-stream. " + + "Expected: 1. Got: " + numAcks); + } + + if (!cf.isDone()) { + cf.complete(null); + } + } +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/QueryResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/QueryResponseHandler.java index 4054c43daa4b..21739a2e5522 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/QueryResponseHandler.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/QueryResponseHandler.java @@ -18,29 +18,23 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.confluent.ksql.api.client.util.JsonMapper; import io.confluent.ksql.rest.entity.QueryResponseMetadata; -import io.confluent.ksql.util.VertxUtils; import io.vertx.core.Context; import io.vertx.core.buffer.Buffer; import io.vertx.core.parsetools.RecordParser; import java.util.concurrent.CompletableFuture; -abstract class QueryResponseHandler> { +abstract class QueryResponseHandler> extends ResponseHandler { private static ObjectMapper JSON_MAPPER = JsonMapper.get(); - protected final Context context; - protected final RecordParser recordParser; - protected final T cf; protected boolean hasReadArguments; QueryResponseHandler(final Context context, final RecordParser recordParser, final T cf) { - this.context = context; - this.recordParser = recordParser; - this.cf = cf; + super(context, recordParser, cf); } - public void handleBodyBuffer(final Buffer buff) { - checkContext(); + @Override + protected void doHandleBodyBuffer(final Buffer buff) { if (!hasReadArguments) { handleArgs(buff); } else { @@ -48,8 +42,8 @@ public void handleBodyBuffer(final Buffer buff) { } } - public void handleException(final Throwable t) { - checkContext(); + @Override + protected void doHandleException(final Throwable t) { if (!cf.isDone()) { cf.completeExceptionally(t); } else { @@ -57,23 +51,12 @@ public void handleException(final Throwable t) { } } - public void handleBodyEnd(final Void v) { - checkContext(); - handleBodyEnd(); - } - - protected abstract void handleBodyEnd(); - protected abstract void handleMetadata(QueryResponseMetadata queryResponseMetadata); protected abstract void handleRow(Buffer buff); protected abstract void handleExceptionAfterFutureCompleted(Throwable t); - protected void checkContext() { - VertxUtils.checkContext(context); - } - private void handleArgs(final Buffer buff) { hasReadArguments = true; diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ResponseHandler.java new file mode 100644 index 000000000000..659a251d29c2 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ResponseHandler.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import io.confluent.ksql.util.VertxUtils; +import io.vertx.core.Context; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.parsetools.RecordParser; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +abstract class ResponseHandler> { + + protected final Context context; + protected final RecordParser recordParser; + protected final T cf; + + ResponseHandler(final Context context, final RecordParser recordParser, final T cf) { + this.context = Objects.requireNonNull(context); + this.recordParser = Objects.requireNonNull(recordParser); + this.cf = Objects.requireNonNull(cf); + } + + public void handleBodyBuffer(final Buffer buff) { + checkContext(); + doHandleBodyBuffer(buff); + } + + public void handleException(final Throwable t) { + checkContext(); + doHandleException(t); + } + + public void handleBodyEnd(final Void v) { + checkContext(); + doHandleBodyEnd(); + } + + protected abstract void doHandleBodyBuffer(Buffer buff); + + protected abstract void doHandleException(Throwable t); + + protected abstract void doHandleBodyEnd(); + + protected void checkContext() { + VertxUtils.checkContext(context); + } +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamQueryResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamQueryResponseHandler.java index b2ca0f6f81f9..45a75fffaaa8 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamQueryResponseHandler.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamQueryResponseHandler.java @@ -86,7 +86,7 @@ protected void handleRow(final Buffer buff) { } @Override - protected void handleBodyEnd() { + protected void doHandleBodyEnd() { queryResult.complete(); } diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index ff337d3892ec..31ed489754ed 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -36,6 +36,7 @@ import io.confluent.ksql.api.client.util.ClientTestUtil; import io.confluent.ksql.api.client.util.ClientTestUtil.TestSubscriber; import io.confluent.ksql.api.client.util.RowUtil; +import io.confluent.ksql.api.server.KsqlApiException; import io.confluent.ksql.parser.exception.ParseFailedException; import io.confluent.ksql.rest.entity.PushQueryId; import io.vertx.core.json.JsonArray; @@ -43,6 +44,7 @@ import io.vertx.ext.web.client.WebClient; import java.math.BigDecimal; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -69,6 +71,9 @@ public class ClientTest extends BaseApiTest { protected static final String DEFAULT_PUSH_QUERY_WITH_LIMIT = "select * from foo emit changes limit 10;"; protected static final List EXPECTED_ROWS = convertToClientRows(DEFAULT_JSON_ROWS); + protected static final List INSERT_ROWS = generateInsertRows(); + protected static final List EXPECTED_INSERT_ROWS = convertToJsonRows(INSERT_ROWS); + protected Client javaClient; @Override @@ -502,6 +507,55 @@ public void shouldHandleErrorResponseFromTerminatePushQuery() { assertThat(e.getCause().getMessage(), containsString("Error code: " + ERROR_CODE_BAD_REQUEST)); } + @Test + public void shouldInsertInto() throws Exception { + // When + javaClient.insertInto("test-stream", INSERT_ROWS.get(0)).get(); + + // Then + assertThatEventually(() -> testEndpoints.getInsertsSubscriber().getRowsInserted(), hasSize(1)); + assertThat(testEndpoints.getInsertsSubscriber().getRowsInserted().get(0), is(EXPECTED_INSERT_ROWS.get(0))); + assertThatEventually(() -> testEndpoints.getInsertsSubscriber().isCompleted(), is(true)); + assertThatEventually(() -> testEndpoints.getInsertsSubscriber().isClosed(), is(true)); + assertThat(testEndpoints.getLastTarget(), is("test-stream")); + } + + @Test + public void shouldHandleErrorResponseFromInsertInto() { + // Given + KsqlApiException exception = new KsqlApiException("Cannot insert into a table", ERROR_CODE_BAD_REQUEST); + testEndpoints.setCreateInsertsSubscriberException(exception); + + // When + final Exception e = assertThrows( + ExecutionException.class, // thrown from .get() when the future completes exceptionally + () -> javaClient.insertInto("a-table", INSERT_ROWS.get(0)).get() + ); + + // Then + assertThat(e.getCause(), instanceOf(KsqlClientException.class)); + assertThat(e.getCause().getMessage(), containsString("Received 400 response from server")); + assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table")); + } + + @Test + public void shouldHandleErrorFromInsertInto() { + // Given + testEndpoints.setAcksBeforePublisherError(0); + + // When + final Exception e = assertThrows( + ExecutionException.class, // thrown from .get() when the future completes exceptionally + () -> javaClient.insertInto("test-stream", INSERT_ROWS.get(0)).get() + ); + + // Then + assertThat(e.getCause(), instanceOf(KsqlClientException.class)); + assertThat(e.getCause().getMessage(), containsString("Received error from /inserts-stream")); + assertThat(e.getCause().getMessage(), containsString("Error code: 50000")); + assertThat(e.getCause().getMessage(), containsString("Message: Error in processing inserts")); + } + protected Client createJavaClient() { return Client.create(createJavaClientOptions(), vertx); } @@ -638,4 +692,29 @@ private static List convertToClientRows(final List rows) { .map(row -> new KsqlArray(row.getList())) .collect(Collectors.toList()); } + + private static List convertToJsonRows(final List rows) { + return rows.stream() + .map(row -> new JsonObject(row.getMap())) + .collect(Collectors.toList()); + } + + private static List generateInsertRows() { + List rows = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + KsqlObject row = new KsqlObject() + .put("f_str", "foo" + i) + .put("f_int", i) + .put("f_bool", i % 2 == 0) + .put("f_long", i * i) + .put("f_double", i + 0.1111) + .put("f_decimal", new BigDecimal(i + 0.1)) + .put("f_array", new KsqlArray().add("s" + i).add("t" + i)) + .put("f_map", new KsqlObject().put("k" + i, "v" + i)) + .put("f_struct", new KsqlObject().put("F1", "v" + i).put("F2", i)) + .putNull("f_null"); + rows.add(row); + } + return rows; + } } \ No newline at end of file diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index 407134fe419e..a85070c9076f 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -127,9 +127,7 @@ public class ClientIntegrationTest { @BeforeClass public static void setUpClass() { TEST_HARNESS.ensureTopics(PAGE_VIEW_TOPIC); - TEST_HARNESS.produceRows(PAGE_VIEW_TOPIC, PAGE_VIEWS_PROVIDER, FormatFactory.JSON); - RestIntegrationTestUtil.createStream(REST_APP, PAGE_VIEWS_PROVIDER); makeKsqlRequest("CREATE TABLE " + AGG_TABLE + " AS " @@ -407,6 +405,47 @@ public void shouldHandleErrorResponseFromTerminatePushQuery() { assertThat(e.getCause().getMessage(), containsString("No query with id NONEXISTENT")); } + @Test + public void shouldInsertInto() throws Exception { + // Given + final KsqlObject insertRow = new KsqlObject() + .put("VIEWTIME", 1000L) + .put("USERID", "User_1") + .put("PAGEID", "Page_28"); + + // When + client.insertInto(PAGE_VIEW_STREAM, insertRow).get(); + + // Then: should receive new row + final String query = "SELECT * FROM " + PAGE_VIEW_STREAM + " EMIT CHANGES LIMIT " + (PAGE_VIEW_NUM_ROWS + 1) + ";"; + final List rows = client.executeQuery(query).get(); + + // Verify last row is as expected + final Row row = rows.get(rows.size() - 1); + assertThat(row.getLong("VIEWTIME"), is(1000L)); + assertThat(row.getString("USERID"), is("User_1")); + assertThat(row.getString("PAGEID"), is("Page_28")); + } + + @Test + public void shouldHandleErrorResponseFromInsertInto() { + // Given + final KsqlObject insertRow = new KsqlObject() + .put("USERID", "User_11") + .put("COUNT", 11L); + + // When + final Exception e = assertThrows( + ExecutionException.class, // thrown from .get() when the future completes exceptionally + () -> client.insertInto(AGG_TABLE, insertRow).get() + ); + + // Then + assertThat(e.getCause(), instanceOf(KsqlClientException.class)); + assertThat(e.getCause().getMessage(), containsString("Received 400 response from server")); + assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table")); + } + private Client createClient() { final ClientOptions clientOptions = ClientOptions.create() .setHost("localhost") diff --git a/ksqldb-engine/src/test/java/io/confluent/ksql/util/TestDataProvider.java b/ksqldb-engine/src/test/java/io/confluent/ksql/util/TestDataProvider.java index c2dc500da371..651daf053139 100644 --- a/ksqldb-engine/src/test/java/io/confluent/ksql/util/TestDataProvider.java +++ b/ksqldb-engine/src/test/java/io/confluent/ksql/util/TestDataProvider.java @@ -26,7 +26,7 @@ import java.util.stream.Collectors; import org.apache.kafka.streams.KeyValue; -public abstract class TestDataProvider { +public class TestDataProvider { private final String topicName; private final PhysicalSchema schema; diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/InsertsStreamEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/InsertsStreamEndpoint.java index e9caa87c90db..5162adb832e1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/InsertsStreamEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/InsertsStreamEndpoint.java @@ -15,8 +15,11 @@ package io.confluent.ksql.api.impl; +import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_STATEMENT; + import io.confluent.ksql.api.server.InsertResult; import io.confluent.ksql.api.server.InsertsStreamSubscriber; +import io.confluent.ksql.api.server.KsqlApiException; import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; @@ -24,7 +27,6 @@ import io.confluent.ksql.name.SourceName; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.ReservedInternalTopics; import io.confluent.ksql.util.VertxUtils; import io.vertx.core.Context; @@ -54,7 +56,7 @@ public InsertsStreamSubscriber createInsertsSubscriber(final String target, final DataSource dataSource = getDataSource(ksqlEngine.getMetaStore(), SourceName.of(target)); if (dataSource.getDataSourceType() == DataSourceType.KTABLE) { - throw new KsqlException("Cannot insert into a table"); + throw new KsqlApiException("Cannot insert into a table", ERROR_CODE_BAD_STATEMENT); } return InsertsSubscriber.createInsertsSubscriber(serviceContext, properties, dataSource, ksqlConfig, context, acksSubscriber, workerExecutor); @@ -66,17 +68,20 @@ private DataSource getDataSource( ) { final DataSource dataSource = metaStore.getSource(sourceName); if (dataSource == null) { - throw new KsqlException("Cannot insert values into an unknown stream: " - + sourceName); + throw new KsqlApiException( + "Cannot insert values into an unknown stream: " + sourceName, ERROR_CODE_BAD_STATEMENT); } if (dataSource.getKsqlTopic().getKeyFormat().isWindowed()) { - throw new KsqlException("Cannot insert values into windowed stream"); + throw new KsqlApiException( + "Cannot insert values into windowed stream", ERROR_CODE_BAD_STATEMENT); } if (reservedInternalTopics.isReadOnly(dataSource.getKafkaTopicName())) { - throw new KsqlException("Cannot insert values into read-only topic: " - + dataSource.getKafkaTopicName()); + throw new KsqlApiException( + "Cannot insert values into read-only topic: " + dataSource.getKafkaTopicName(), + ERROR_CODE_BAD_STATEMENT + ); } return dataSource; diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/InsertsStreamHandler.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/InsertsStreamHandler.java index 98cd56d57291..2840aa227e9d 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/InsertsStreamHandler.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/InsertsStreamHandler.java @@ -18,8 +18,6 @@ import static io.confluent.ksql.api.server.QueryStreamHandler.DELIMITED_CONTENT_TYPE; import static io.confluent.ksql.api.server.ServerUtils.checkHttp2; import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_REQUEST; -import static io.confluent.ksql.rest.Errors.ERROR_CODE_SERVER_ERROR; -import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import io.confluent.ksql.api.auth.DefaultApiSecurityContext; import io.confluent.ksql.api.spi.Endpoints; @@ -36,7 +34,6 @@ import io.vertx.ext.web.RoutingContext; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,22 +151,8 @@ private void handleArgs(final Buffer buff) { this.insertsSubscriber = insertsSubscriber; }) - .exceptionally(t -> handleInsertSubscriberException(t, routingContext)); - } - - private Void handleInsertSubscriberException(final Throwable t, - final RoutingContext routingContext) { - Throwable toLog = t; - if (t instanceof CompletionException) { - toLog = t.getCause(); - } - log.error("Failed to execute inserts", toLog); - // We don't expose internal error message via public API - routingContext.fail(INTERNAL_SERVER_ERROR.code(), - new KsqlApiException("The server encountered an internal error when processing inserts." - + " Please consult the server logs for more information.", - ERROR_CODE_SERVER_ERROR)); - return null; + .exceptionally(t -> + ServerUtils.handleEndpointException(t, routingContext, "Failed to execute inserts")); } private void handleRow(final Buffer buff) { diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java index 52cf519bc933..50eb3eb33e37 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java @@ -16,22 +16,16 @@ package io.confluent.ksql.api.server; import static io.confluent.ksql.api.server.ServerUtils.checkHttp2; -import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_STATEMENT; -import static io.confluent.ksql.rest.Errors.ERROR_CODE_SERVER_ERROR; -import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; -import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import io.confluent.ksql.api.auth.DefaultApiSecurityContext; import io.confluent.ksql.api.spi.Endpoints; import io.confluent.ksql.rest.entity.QueryResponseMetadata; import io.confluent.ksql.rest.entity.QueryStreamArgs; -import io.confluent.ksql.util.KsqlStatementException; import io.vertx.core.Context; import io.vertx.core.Handler; import io.vertx.ext.web.RoutingContext; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.CompletionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,29 +108,8 @@ public void handle(final RoutingContext routingContext) { queryPublisher.subscribe(querySubscriber); }) - .exceptionally(t -> handleQueryPublisherException(t, routingContext)); - } - - private Void handleQueryPublisherException(final Throwable t, - final RoutingContext routingContext) { - log.error("Failed to execute query", t); - if (t instanceof CompletionException) { - final Throwable actual = t.getCause(); - if (actual instanceof KsqlStatementException) { - routingContext.fail(BAD_REQUEST.code(), - new KsqlApiException(actual.getMessage(), ERROR_CODE_BAD_STATEMENT)); - return null; - } else if (actual instanceof KsqlApiException) { - routingContext.fail(BAD_REQUEST.code(), actual); - return null; - } - } - // We don't expose internal error message via public API - routingContext.fail(INTERNAL_SERVER_ERROR.code(), new KsqlApiException( - "The server encountered an internal error when processing the query." - + " Please consult the server logs for more information.", - ERROR_CODE_SERVER_ERROR)); - return null; + .exceptionally(t -> + ServerUtils.handleEndpointException(t, routingContext, "Failed to execute query")); } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerUtils.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerUtils.java index 709c17a33f21..b880492b7983 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerUtils.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/ServerUtils.java @@ -16,19 +16,24 @@ package io.confluent.ksql.api.server; import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_REQUEST; +import static io.confluent.ksql.rest.Errors.ERROR_CODE_BAD_STATEMENT; import static io.confluent.ksql.rest.Errors.ERROR_CODE_HTTP2_ONLY; +import static io.confluent.ksql.rest.Errors.ERROR_CODE_SERVER_ERROR; import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.exc.MismatchedInputException; import io.confluent.ksql.rest.ApiJsonMapper; +import io.confluent.ksql.util.KsqlStatementException; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpVersion; import io.vertx.ext.web.RoutingContext; import java.io.IOException; import java.util.Optional; +import java.util.concurrent.CompletionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,4 +108,27 @@ public static boolean checkHttp2(final RoutingContext routingContext) { } } + static Void handleEndpointException( + final Throwable t, final RoutingContext routingContext, final String logMsg) { + if (t instanceof CompletionException) { + final Throwable actual = t.getCause(); + log.error(logMsg, actual); + if (actual instanceof KsqlStatementException) { + routingContext.fail(BAD_REQUEST.code(), + new KsqlApiException(actual.getMessage(), ERROR_CODE_BAD_STATEMENT)); + return null; + } else if (actual instanceof KsqlApiException) { + routingContext.fail(BAD_REQUEST.code(), actual); + return null; + } + } else { + log.error(logMsg, t); + } + // We don't expose internal error message via public API + routingContext.fail(INTERNAL_SERVER_ERROR.code(), new KsqlApiException( + "The server encountered an internal error when processing the query." + + " Please consult the server logs for more information.", + ERROR_CODE_SERVER_ERROR)); + return null; + } } diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java index ed97dc46ec21..ac189eda7808 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java @@ -53,6 +53,7 @@ public class TestEndpoints implements Endpoints { private int acksBeforePublisherError = -1; private int rowsBeforePublisherError = -1; private RuntimeException createQueryPublisherException; + private RuntimeException createInsertsSubscriberException; private ApiSecurityContext lastApiSecurityContext; @Override @@ -88,14 +89,22 @@ public synchronized CompletableFuture createInsertsSubs final Context context, final WorkerExecutor workerExecutor, final ApiSecurityContext apiSecurityContext) { - this.lastTarget = target; - this.lastProperties = properties; - this.lastApiSecurityContext = apiSecurityContext; - BufferedPublisher acksPublisher = new BufferedPublisher<>(Vertx.currentContext()); - acksPublisher.subscribe(acksSubscriber); - this.insertsSubscriber = new TestInsertsSubscriber(Vertx.currentContext(), acksPublisher, - acksBeforePublisherError); - return CompletableFuture.completedFuture(insertsSubscriber); + CompletableFuture completableFuture = new CompletableFuture<>(); + if (createInsertsSubscriberException != null) { + createInsertsSubscriberException.fillInStackTrace(); + completableFuture.completeExceptionally(createInsertsSubscriberException); + } else { + this.lastTarget = target; + this.lastProperties = properties; + this.lastApiSecurityContext = apiSecurityContext; + BufferedPublisher acksPublisher = new BufferedPublisher<>( + Vertx.currentContext()); + acksPublisher.subscribe(acksSubscriber); + this.insertsSubscriber = new TestInsertsSubscriber(Vertx.currentContext(), acksPublisher, + acksBeforePublisherError); + completableFuture.complete(insertsSubscriber); + } + return completableFuture; } @Override @@ -227,6 +236,10 @@ public synchronized void setCreateQueryPublisherException(final RuntimeException this.createQueryPublisherException = exception; } + public synchronized void setCreateInsertsSubscriberException(final RuntimeException exception) { + this.createInsertsSubscriberException = exception; + } + private static int extractLimit(final String sql) { final int ind = sql.toLowerCase().indexOf("limit"); if (ind == -1) { diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/ApiIntegrationTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/ApiIntegrationTest.java index a3b613a221d2..49333598fba3 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/ApiIntegrationTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/integration/ApiIntegrationTest.java @@ -327,7 +327,6 @@ public void shouldExecuteInserts() { for (int i = 0; i < numRows; i++) { JsonObject row = new JsonObject() - .put("ROWKEY", 10 + i) .put("VIEWTIME", 1000 + i) .put("USERID", "User" + i % 3) .put("PAGEID", "PAGE" + (numRows - i)); @@ -386,7 +385,6 @@ public void shouldFailToInsertWithNonMatchingValueType() { // Given: JsonObject row = new JsonObject() - .put("ROWKEY", 10) .put("VIEWTIME", 1000) .put("USERID", 123) .put("PAGEID", "PAGE23");