diff --git a/ksqldb-api-client/pom.xml b/ksqldb-api-client/pom.xml
new file mode 100644
index 000000000000..8096a02b1a36
--- /dev/null
+++ b/ksqldb-api-client/pom.xml
@@ -0,0 +1,97 @@
+
+
+
+
+ 4.0.0
+
+
+
+ io.confluent.ksql
+ ksqldb-parent
+ 6.0.0-SNAPSHOT
+
+
+ ksqldb-api-client
+
+
+
+ io.confluent.ksql
+ ksqldb-rest-app
+
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jdk8
+ ${jackson.version}
+
+
+
+ io.vertx
+ vertx-core
+ ${vertx.version}
+
+
+
+ org.reactivestreams
+ reactive-streams
+ ${reactive-streams.version}
+
+
+
+
+
+ io.confluent.ksql
+ ksqldb-rest-app
+ ${project.parent.version}
+ test-jar
+ test
+
+
+
+ io.confluent.ksql
+ ksqldb-test-util
+ ${project.version}
+ test
+
+
+
+
+ org.eclipse.jetty
+ jetty-jaas
+ ${jetty.version}
+ test
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+
+ test-jar
+
+
+
+
+
+
+
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java
new file mode 100644
index 000000000000..5cacefd7ca47
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client;
+
+import io.confluent.ksql.api.client.impl.ClientImpl;
+import io.vertx.core.Vertx;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.reactivestreams.Publisher;
+
+public interface Client {
+
+ /**
+ * Execute a query (push or pull) and receive the results one row at a time.
+ *
+ * @param sql statement of query to execute.
+ * @return query result.
+ */
+ CompletableFuture streamQuery(String sql);
+
+ /**
+ * Execute a query (push or pull) and receive the results one row at a time.
+ *
+ * @param sql statement of query to execute.
+ * @param properties query properties.
+ * @return query result.
+ */
+ CompletableFuture streamQuery(String sql, Map properties);
+
+ /**
+ * Execute a query (push or pull) and receive all result rows together, once the query has
+ * completed.
+ *
+ * @param sql statement of query to execute.
+ * @return query result.
+ */
+ CompletableFuture> executeQuery(String sql);
+
+ /**
+ * Execute a query (push or pull) and receive all result rows together, once the query has
+ * completed.
+ *
+ * @param sql statement of query to execute.
+ * @param properties query properties.
+ * @return query result.
+ */
+ CompletableFuture> executeQuery(String sql, Map properties);
+
+ CompletableFuture insertInto(String streamName, Map row);
+
+ Publisher streamInserts(String streamName, Publisher> insertsPublisher);
+
+ void close();
+
+ static Client create(ClientOptions clientOptions) {
+ return new ClientImpl(clientOptions);
+ }
+
+ static Client create(ClientOptions clientOptions, Vertx vertx) {
+ return new ClientImpl(clientOptions, vertx);
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java
new file mode 100644
index 000000000000..93c9d80ad289
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/ClientOptions.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client;
+
+import io.confluent.ksql.api.client.impl.ClientOptionsImpl;
+
+public interface ClientOptions {
+
+ ClientOptions setHost(String host);
+
+ ClientOptions setPort(int port);
+
+ ClientOptions setUseTls(boolean useTls);
+
+ ClientOptions setUseClientAuth(boolean useClientAuth);
+
+ ClientOptions setVerifyHost(boolean verifyHost);
+
+ ClientOptions setTrustAll(boolean trustAll);
+
+ ClientOptions setTrustStore(String trustStorePath);
+
+ ClientOptions setTrustStorePassword(String trustStorePassword);
+
+ ClientOptions setKeyStore(String keyStorePath);
+
+ ClientOptions setKeyStorePassword(String keyStorePassword);
+
+ ClientOptions setBasicAuthCredentials(String username, String password);
+
+ ClientOptions setExecuteQueryMaxResultRows(int maxRows);
+
+ String getHost();
+
+ int getPort();
+
+ boolean isUseTls();
+
+ boolean isUseClientAuth();
+
+ boolean isVerifyHost();
+
+ boolean isTrustAll();
+
+ boolean isUseBasicAuth();
+
+ String getTrustStore();
+
+ String getTrustStorePassword();
+
+ String getKeyStore();
+
+ String getKeyStorePassword();
+
+ String getBasicAuthUsername();
+
+ String getBasicAuthPassword();
+
+ int getExecuteQueryMaxResultRows();
+
+ ClientOptions copy();
+
+ static ClientOptions create() {
+ return new ClientOptionsImpl();
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertAck.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertAck.java
new file mode 100644
index 000000000000..8beca33e9e79
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertAck.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client;
+
+public interface InsertAck {
+
+ int getNum();
+
+}
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/QueryResult.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/QueryResult.java
new file mode 100644
index 000000000000..6c231ce004cc
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/QueryResult.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.reactivestreams.Publisher;
+
+/**
+ * The result of a query (push or pull), streamed one row at time. Records may be consumed by either
+ * subscribing to the publisher or polling (blocking) for one record at a time. These two methods of
+ * consumption are mutually exclusive; only one method may be used (per QueryResult).
+ */
+public interface QueryResult extends Publisher {
+
+ List columnNames();
+
+ List columnTypes();
+
+ String queryID();
+
+ /**
+ * Block until a row becomes available.
+ *
+ * @return the row.
+ */
+ Row poll();
+
+ /**
+ * Block until a row becomes available or the timeout has elapsed.
+ *
+ * @param timeout amount of to wait for a row. Non-positive values are interpreted as no timeout.
+ * @param timeUnit unit for timeout param.
+ * @return the row, if available; else, null.
+ */
+ Row poll(long timeout, TimeUnit timeUnit);
+
+ boolean isComplete();
+
+ void close();
+
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Row.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Row.java
new file mode 100644
index 000000000000..f1cedf07d38f
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Row.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client;
+
+import java.util.List;
+
+/**
+ * A single record, returned as part of a query result.
+ */
+public interface Row {
+
+ List columnNames();
+
+ List columnTypes();
+
+ List values();
+
+ /**
+ * Get the value for a particular column of the Row as an Object.
+ *
+ * @param columnIndex index of column (1-indexed).
+ * @return column value.
+ */
+ Object getObject(int columnIndex);
+
+ /**
+ * Get the value for a particular column of the Row as an Object.
+ *
+ * @param columnName name of column.
+ * @return column value.
+ */
+ Object getObject(String columnName);
+
+ /**
+ * Get the value for a particular column of the Row as a string.
+ *
+ * @param columnIndex index of column (1-indexed).
+ * @return column value.
+ */
+ String getString(int columnIndex);
+
+ /**
+ * Get the value for a particular column of the Row as a string.
+ *
+ * @param columnName name of column.
+ * @return column value.
+ */
+ String getString(String columnName);
+
+ /**
+ * Get the value for a particular column of the Row as an integer.
+ *
+ * @param columnIndex index of column (1-indexed).
+ * @return column value.
+ */
+ Integer getInt(int columnIndex);
+
+ /**
+ * Get the value for a particular column of the Row as an integer.
+ *
+ * @param columnName name of column.
+ * @return column value.
+ */
+ Integer getInt(String columnName);
+
+ /**
+ * Get the value for a particular column of the Row as a long.
+ *
+ * @param columnIndex index of column (1-indexed).
+ * @return column value.
+ */
+ Long getLong(int columnIndex);
+
+ /**
+ * Get the value for a particular column of the Row as a long.
+ *
+ * @param columnName name of column.
+ * @return column value.
+ */
+ Long getLong(String columnName);
+
+ /**
+ * Get the value for a particular column of the Row as a double.
+ *
+ * @param columnIndex index of column (1-indexed).
+ * @return column value.
+ */
+ Double getDouble(int columnIndex);
+
+ /**
+ * Get the value for a particular column of the Row as a double.
+ *
+ * @param columnName name of column.
+ * @return column value.
+ */
+ Double getDouble(String columnName);
+
+ /**
+ * Get the value for a particular column of the Row as a boolean.
+ *
+ * @param columnIndex index of column (1-indexed).
+ * @return column value.
+ */
+ Boolean getBoolean(int columnIndex);
+
+ /**
+ * Get the value for a particular column of the Row as a boolean.
+ *
+ * @param columnName name of column.
+ * @return column value.
+ */
+ Boolean getBoolean(String columnName);
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java
new file mode 100644
index 000000000000..5a7b0ceb3de8
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client.impl;
+
+import static io.netty.handler.codec.http.HttpHeaderNames.AUTHORIZATION;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+
+import io.confluent.ksql.api.client.Client;
+import io.confluent.ksql.api.client.ClientOptions;
+import io.confluent.ksql.api.client.InsertAck;
+import io.confluent.ksql.api.client.QueryResult;
+import io.confluent.ksql.api.client.Row;
+import io.confluent.ksql.rest.client.KsqlRestClientException;
+import io.vertx.core.Context;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpClient;
+import io.vertx.core.http.HttpClientOptions;
+import io.vertx.core.http.HttpClientRequest;
+import io.vertx.core.http.HttpClientResponse;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.core.http.HttpVersion;
+import io.vertx.core.json.JsonObject;
+import io.vertx.core.net.JksOptions;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.core.parsetools.RecordParser;
+import java.nio.charset.Charset;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.reactivestreams.Publisher;
+
+public class ClientImpl implements Client {
+
+ private final ClientOptions clientOptions;
+ private final Vertx vertx;
+ private final HttpClient httpClient;
+ private final SocketAddress serverSocketAddress;
+ private final String basicAuthHeader;
+ private final boolean ownedVertx;
+
+ public ClientImpl(final ClientOptions clientOptions) {
+ this(clientOptions, Vertx.vertx(), true);
+ }
+
+ public ClientImpl(final ClientOptions clientOptions, final Vertx vertx) {
+ this(clientOptions, vertx, false);
+ }
+
+ private ClientImpl(final ClientOptions clientOptions, final Vertx vertx,
+ final boolean ownedVertx) {
+ this.clientOptions = clientOptions.copy();
+ this.vertx = vertx;
+ this.ownedVertx = ownedVertx;
+ this.httpClient = createHttpClient(vertx, clientOptions);
+ this.basicAuthHeader = createBasicAuthHeader(clientOptions);
+ this.serverSocketAddress =
+ SocketAddress.inetSocketAddress(clientOptions.getPort(), clientOptions.getHost());
+ }
+
+ @Override
+ public CompletableFuture streamQuery(final String sql) {
+ return streamQuery(sql, Collections.emptyMap());
+ }
+
+ @Override
+ public CompletableFuture streamQuery(
+ final String sql,
+ final Map properties
+ ) {
+ return makeQueryRequest(sql, properties, StreamQueryResponseHandler::new);
+ }
+
+ @Override
+ public CompletableFuture> executeQuery(final String sql) {
+ return executeQuery(sql, Collections.emptyMap());
+ }
+
+ @Override
+ public CompletableFuture> executeQuery(
+ final String sql,
+ final Map properties
+ ) {
+ return makeQueryRequest(
+ sql,
+ properties,
+ (context, recordParser, cf) -> new ExecuteQueryResponseHandler(
+ context, recordParser, cf, clientOptions.getExecuteQueryMaxResultRows())
+ );
+ }
+
+ @Override
+ public CompletableFuture insertInto(
+ final String streamName, final Map row) {
+ return null; // not yet implemented
+ }
+
+ @Override
+ public Publisher streamInserts(
+ final String streamName, final Publisher> insertsPublisher) {
+ return null; // not yet implemented
+ }
+
+ @Override
+ public void close() {
+ httpClient.close();
+ if (ownedVertx) {
+ vertx.close();
+ }
+ }
+
+ @FunctionalInterface
+ private interface ResponseHandlerSupplier {
+ QueryResponseHandler get(Context ctx, RecordParser recordParser, CompletableFuture cf);
+ }
+
+ private CompletableFuture makeQueryRequest(
+ final String sql,
+ final Map properties,
+ final ResponseHandlerSupplier responseHandlerSupplier
+ ) {
+
+ final JsonObject requestBody = new JsonObject().put("sql", sql).put("properties", properties);
+
+ final CompletableFuture cf = new CompletableFuture<>();
+
+ HttpClientRequest request = httpClient.request(HttpMethod.POST,
+ serverSocketAddress, clientOptions.getPort(), clientOptions.getHost(),
+ "/query-stream",
+ response -> handleResponse(response, cf, responseHandlerSupplier))
+ .exceptionHandler(cf::completeExceptionally);
+ if (clientOptions.isUseBasicAuth()) {
+ request = configureBasicAuth(request);
+ }
+ request.end(requestBody.toBuffer());
+
+ return cf;
+ }
+
+ private HttpClientRequest configureBasicAuth(final HttpClientRequest request) {
+ return request.putHeader(AUTHORIZATION.toString(), basicAuthHeader);
+ }
+
+ private static void handleResponse(
+ final HttpClientResponse response,
+ final CompletableFuture cf,
+ final ResponseHandlerSupplier responseHandlerSupplier) {
+ if (response.statusCode() == OK.code()) {
+ final RecordParser recordParser = RecordParser.newDelimited("\n", response);
+ final QueryResponseHandler responseHandler =
+ responseHandlerSupplier.get(Vertx.currentContext(), recordParser, cf);
+
+ recordParser.handler(responseHandler::handleBodyBuffer);
+ recordParser.endHandler(responseHandler::handleBodyEnd);
+ recordParser.exceptionHandler(responseHandler::handleException);
+ } else {
+ response.bodyHandler(buffer -> {
+ final JsonObject errorResponse = buffer.toJsonObject();
+ cf.completeExceptionally(new KsqlRestClientException(String.format(
+ "Received %d response from server: %s. Error code: %d",
+ response.statusCode(),
+ errorResponse.getString("message"),
+ errorResponse.getInteger("errorCode")
+ )));
+ });
+ }
+ }
+
+ private static HttpClient createHttpClient(final Vertx vertx, final ClientOptions clientOptions) {
+ HttpClientOptions options = new HttpClientOptions()
+ .setSsl(clientOptions.isUseTls())
+ .setUseAlpn(true)
+ .setProtocolVersion(HttpVersion.HTTP_2)
+ .setVerifyHost(clientOptions.isVerifyHost())
+ .setTrustAll(clientOptions.isTrustAll())
+ .setDefaultHost(clientOptions.getHost())
+ .setDefaultPort(clientOptions.getPort());
+ if (clientOptions.isUseTls() && !clientOptions.getTrustStore().isEmpty()) {
+ options = options.setTrustStoreOptions(
+ new JksOptions()
+ .setPath(clientOptions.getTrustStore())
+ .setPassword(clientOptions.getTrustStorePassword())
+ );
+ }
+ if (clientOptions.isUseClientAuth()) {
+ options = options.setKeyStoreOptions(
+ new JksOptions()
+ .setPath(clientOptions.getKeyStore())
+ .setPassword(clientOptions.getKeyStorePassword())
+ );
+ }
+ return vertx.createHttpClient(options);
+ }
+
+ private static String createBasicAuthHeader(final ClientOptions clientOptions) {
+ if (!clientOptions.isUseBasicAuth()) {
+ return "";
+ }
+
+ final String creds = clientOptions.getBasicAuthUsername()
+ + ":"
+ + clientOptions.getBasicAuthPassword();
+ final String base64creds =
+ Base64.getEncoder().encodeToString(creds.getBytes(Charset.defaultCharset()));
+ return "Basic " + base64creds;
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java
new file mode 100644
index 000000000000..393a3e6d73e6
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientOptionsImpl.java
@@ -0,0 +1,224 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client.impl;
+
+import io.confluent.ksql.api.client.ClientOptions;
+import java.util.Objects;
+
+public class ClientOptionsImpl implements ClientOptions {
+
+ private String host = "localhost";
+ private int port = 8088;
+ private boolean useTls = false;
+ private boolean useClientAuth = false;
+ private boolean verifyHost = true;
+ private boolean trustAll = false;
+ private boolean useBasicAuth = false;
+ private String trustStorePath;
+ private String trustStorePassword;
+ private String keyStorePath;
+ private String keyStorePassword;
+ private String basicAuthUsername;
+ private String basicAuthPassword;
+ private int executeQueryMaxResultRows = 10000;
+
+ public ClientOptionsImpl() {
+ }
+
+ // CHECKSTYLE_RULES.OFF: ParameterNumberCheck
+ private ClientOptionsImpl(
+ // CHECKSTYLE_RULES.ON: ParameterNumberCheck
+ final String host, final int port,
+ final boolean useTls, final boolean useClientAuth,
+ final boolean verifyHost, final boolean trustAll,
+ final boolean useBasicAuth,
+ final String trustStorePath, final String trustStorePassword,
+ final String keyStorePath, final String keyStorePassword,
+ final String basicAuthUsername, final String basicAuthPassword,
+ final int executeQueryMaxResultRows) {
+ this.host = Objects.requireNonNull(host);
+ this.port = port;
+ this.useTls = useTls;
+ this.useClientAuth = useClientAuth;
+ this.verifyHost = verifyHost;
+ this.trustAll = trustAll;
+ this.useBasicAuth = useBasicAuth;
+ this.trustStorePath = trustStorePath;
+ this.trustStorePassword = trustStorePassword;
+ this.keyStorePath = keyStorePath;
+ this.keyStorePassword = keyStorePassword;
+ this.basicAuthUsername = basicAuthUsername;
+ this.basicAuthPassword = basicAuthPassword;
+ this.executeQueryMaxResultRows = executeQueryMaxResultRows;
+ }
+
+ @Override
+ public ClientOptions setHost(final String host) {
+ this.host = host;
+ return this;
+ }
+
+ @Override
+ public ClientOptions setPort(final int port) {
+ this.port = port;
+ return this;
+ }
+
+ @Override
+ public ClientOptions setUseTls(final boolean useTls) {
+ this.useTls = useTls;
+ return this;
+ }
+
+ @Override
+ public ClientOptions setUseClientAuth(final boolean useClientAuth) {
+ this.useClientAuth = useClientAuth;
+ return this;
+ }
+
+ @Override
+ public ClientOptions setVerifyHost(final boolean verifyHost) {
+ this.verifyHost = verifyHost;
+ return this;
+ }
+
+ @Override
+ public ClientOptions setTrustAll(final boolean trustAll) {
+ this.trustAll = trustAll;
+ return this;
+ }
+
+ @Override
+ public ClientOptions setTrustStore(final String trustStorePath) {
+ this.trustStorePath = trustStorePath;
+ return this;
+ }
+
+ @Override
+ public ClientOptions setTrustStorePassword(final String trustStorePassword) {
+ this.trustStorePassword = trustStorePassword;
+ return this;
+ }
+
+ @Override
+ public ClientOptions setKeyStore(final String keyStorePath) {
+ this.keyStorePath = keyStorePath;
+ return this;
+ }
+
+ @Override
+ public ClientOptions setKeyStorePassword(final String keyStorePassword) {
+ this.keyStorePassword = keyStorePassword;
+ return this;
+ }
+
+ @Override
+ public ClientOptions setBasicAuthCredentials(final String username, final String password) {
+ this.useBasicAuth = username != null || password != null;
+ this.basicAuthUsername = username;
+ this.basicAuthPassword = password;
+ return this;
+ }
+
+ @Override
+ public ClientOptions setExecuteQueryMaxResultRows(final int maxRows) {
+ this.executeQueryMaxResultRows = maxRows;
+ return this;
+ }
+
+ @Override
+ public String getHost() {
+ return host == null ? "" : host;
+ }
+
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public boolean isUseTls() {
+ return useTls;
+ }
+
+ @Override
+ public boolean isUseClientAuth() {
+ return useClientAuth;
+ }
+
+ @Override
+ public boolean isVerifyHost() {
+ return verifyHost;
+ }
+
+ @Override
+ public boolean isTrustAll() {
+ return trustAll;
+ }
+
+ @Override
+ public boolean isUseBasicAuth() {
+ return useBasicAuth;
+ }
+
+ @Override
+ public String getTrustStore() {
+ return trustStorePath == null ? "" : trustStorePath;
+ }
+
+ @Override
+ public String getTrustStorePassword() {
+ return trustStorePassword == null ? "" : trustStorePassword;
+ }
+
+ @Override
+ public String getKeyStore() {
+ return keyStorePath == null ? "" : keyStorePath;
+ }
+
+ @Override
+ public String getKeyStorePassword() {
+ return keyStorePassword == null ? "" : keyStorePassword;
+ }
+
+ @Override
+ public String getBasicAuthUsername() {
+ return basicAuthUsername == null ? "" : basicAuthUsername;
+ }
+
+ @Override
+ public String getBasicAuthPassword() {
+ return basicAuthPassword == null ? "" : basicAuthPassword;
+ }
+
+ @Override
+ public int getExecuteQueryMaxResultRows() {
+ return executeQueryMaxResultRows;
+ }
+
+ @Override
+ public ClientOptions copy() {
+ return new ClientOptionsImpl(
+ host, port,
+ useTls, useClientAuth,
+ verifyHost, trustAll,
+ useBasicAuth,
+ trustStorePath, trustStorePassword,
+ keyStorePath, keyStorePassword,
+ basicAuthUsername, basicAuthPassword,
+ executeQueryMaxResultRows);
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ExecuteQueryResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ExecuteQueryResponseHandler.java
new file mode 100644
index 000000000000..0d0ef6c3f096
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ExecuteQueryResponseHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client.impl;
+
+import io.confluent.ksql.api.client.Row;
+import io.confluent.ksql.api.client.util.RowUtil;
+import io.confluent.ksql.api.server.protocol.QueryResponseMetadata;
+import io.confluent.ksql.rest.client.KsqlRestClientException;
+import io.vertx.core.Context;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.parsetools.RecordParser;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ExecuteQueryResponseHandler extends QueryResponseHandler> {
+
+ private static final Logger log = LoggerFactory.getLogger(ExecuteQueryResponseHandler.class);
+
+ private final List rows;
+ private final int maxRows;
+ private List columnNames;
+ private List columnTypes;
+ private Map columnNameToIndex;
+
+ ExecuteQueryResponseHandler(
+ final Context context,
+ final RecordParser recordParser,
+ final CompletableFuture> cf,
+ final int maxRows) {
+ super(context, recordParser, cf);
+ this.maxRows = maxRows;
+ this.rows = new ArrayList<>();
+ }
+
+ @Override
+ protected void handleMetadata(final QueryResponseMetadata queryResponseMetadata) {
+ columnNames = queryResponseMetadata.columnNames;
+ columnTypes = queryResponseMetadata.columnTypes;
+ columnNameToIndex = RowUtil.valueToIndexMap(columnNames);
+ }
+
+ @Override
+ protected void handleRow(final Buffer buff) {
+ final JsonArray values = new JsonArray(buff);
+ if (rows.size() < maxRows) {
+ rows.add(new RowImpl(columnNames, columnTypes, values, columnNameToIndex));
+ } else {
+ throw new KsqlRestClientException(
+ "Reached max number of rows that may be returned by executeQuery(). "
+ + "Increase the limit via ClientOptions#setExecuteQueryMaxResultRows(). "
+ + "Current limit: " + maxRows);
+ }
+ }
+
+ @Override
+ protected void handleBodyEnd() {
+ if (!hasReadArguments) {
+ throw new IllegalStateException("Body ended before metadata received");
+ }
+
+ cf.complete(rows);
+ }
+
+ @Override
+ public void handleExceptionAfterFutureCompleted(final Throwable t) {
+ // This should not happen
+ log.error("Exceptions should not occur after the future has been completed", t);
+ }
+}
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/PollableSubscriber.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/PollableSubscriber.java
new file mode 100644
index 000000000000..dd4c0e48431a
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/PollableSubscriber.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client.impl;
+
+import io.confluent.ksql.api.client.Row;
+import io.confluent.ksql.reactive.BaseSubscriber;
+import io.vertx.core.Context;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.reactivestreams.Subscription;
+
+public class PollableSubscriber extends BaseSubscriber {
+
+ private static final int REQUEST_BATCH_SIZE = 100;
+ // 100ms in ns
+ private static final long MAX_POLL_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
+
+ private final BlockingQueue queue = new LinkedBlockingQueue<>();
+ private final Consumer errorHandler;
+ private int tokens;
+ private volatile boolean closed;
+
+ public PollableSubscriber(final Context context, final Consumer errorHandler) {
+ super(context);
+
+ this.errorHandler = Objects.requireNonNull(errorHandler);
+ }
+
+ @Override
+ protected void afterSubscribe(final Subscription subscription) {
+ checkRequestTokens();
+ }
+
+ @Override
+ protected void handleValue(final Row row) {
+ queue.add(row);
+ }
+
+ @Override
+ protected void handleError(final Throwable t) {
+ errorHandler.accept(new Exception(t));
+ }
+
+ @Override
+ protected void handleComplete() {
+ close();
+ }
+
+ public synchronized Row poll(final long timeout, final TimeUnit timeUnit) {
+ if (closed) {
+ return null;
+ }
+ final long timeoutNs = timeUnit.toNanos(timeout);
+ final long end;
+ long remainingTime;
+ if (timeoutNs > 0) {
+ end = System.nanoTime() + timeoutNs;
+ remainingTime = timeoutNs;
+ } else {
+ end = Long.MAX_VALUE;
+ remainingTime = Long.MAX_VALUE;
+ }
+ do {
+ // Poll in smaller units so we can exit on close
+ final long pollTime = Math.min(remainingTime, MAX_POLL_NANOS);
+ try {
+ final Row row = queue.poll(pollTime, TimeUnit.NANOSECONDS);
+ if (row != null) {
+ tokens--;
+ checkRequestTokens();
+ return row;
+ }
+ } catch (InterruptedException e) {
+ return null;
+ }
+ remainingTime = end - System.nanoTime();
+ } while (!closed && remainingTime > 0);
+ return null;
+ }
+
+ public void close() {
+ closed = true;
+ }
+
+ synchronized boolean isClosed() {
+ return closed;
+ }
+
+ private void checkRequestTokens() {
+ if (tokens == 0) {
+ tokens += REQUEST_BATCH_SIZE;
+ makeRequest(REQUEST_BATCH_SIZE);
+ }
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/QueryResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/QueryResponseHandler.java
new file mode 100644
index 000000000000..179f6c0dce3c
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/QueryResponseHandler.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client.impl;
+
+import io.confluent.ksql.api.client.util.JsonMapper;
+import io.confluent.ksql.api.server.protocol.QueryResponseMetadata;
+import io.confluent.ksql.util.VertxUtils;
+import io.vertx.core.Context;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.parsetools.RecordParser;
+import java.util.concurrent.CompletableFuture;
+
+abstract class QueryResponseHandler {
+
+ protected final Context context;
+ protected final RecordParser recordParser;
+ protected final CompletableFuture cf;
+ protected boolean hasReadArguments;
+
+ QueryResponseHandler(final Context context, final RecordParser recordParser,
+ final CompletableFuture cf) {
+ this.context = context;
+ this.recordParser = recordParser;
+ this.cf = cf;
+ }
+
+ public void handleBodyBuffer(final Buffer buff) {
+ checkContext();
+ if (!hasReadArguments) {
+ handleArgs(buff);
+ } else {
+ handleRow(buff);
+ }
+ }
+
+ public void handleException(final Throwable t) {
+ checkContext();
+ if (!cf.isDone()) {
+ cf.completeExceptionally(t);
+ } else {
+ handleExceptionAfterFutureCompleted(t);
+ }
+ }
+
+ public void handleBodyEnd(final Void v) {
+ checkContext();
+ handleBodyEnd();
+ }
+
+ protected abstract void handleBodyEnd();
+
+ protected abstract void handleMetadata(QueryResponseMetadata queryResponseMetadata);
+
+ protected abstract void handleRow(Buffer buff);
+
+ protected abstract void handleExceptionAfterFutureCompleted(Throwable t);
+
+ protected void checkContext() {
+ VertxUtils.checkContext(context);
+ }
+
+ private void handleArgs(final Buffer buff) {
+ hasReadArguments = true;
+
+ final QueryResponseMetadata queryResponseMetadata;
+ try {
+ queryResponseMetadata = JsonMapper.get()
+ .readValue(buff.getBytes(), QueryResponseMetadata.class);
+ } catch (Exception e) {
+ cf.completeExceptionally(e);
+ return;
+ }
+
+ handleMetadata(queryResponseMetadata);
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/QueryResultImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/QueryResultImpl.java
new file mode 100644
index 000000000000..4bb6c7c5fd2e
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/QueryResultImpl.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client.impl;
+
+import io.confluent.ksql.api.client.QueryResult;
+import io.confluent.ksql.api.client.Row;
+import io.confluent.ksql.reactive.BufferedPublisher;
+import io.vertx.core.Context;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.reactivestreams.Subscriber;
+
+class QueryResultImpl extends BufferedPublisher implements QueryResult {
+
+ private final String queryId;
+ private final List columnNames;
+ private final List columnTypes;
+ private final PollableSubscriber pollableSubscriber;
+ private volatile boolean polling;
+ private boolean subscribing;
+
+ QueryResultImpl(final Context context, final String queryId, final List columnNames,
+ final List columnTypes) {
+ super(context);
+ this.queryId = queryId;
+ this.columnNames = columnNames;
+ this.columnTypes = columnTypes;
+ this.pollableSubscriber = new PollableSubscriber(ctx, this::sendError);
+ }
+
+ @Override
+ public List columnNames() {
+ return columnNames;
+ }
+
+ @Override
+ public List columnTypes() {
+ return columnTypes;
+ }
+
+ @Override
+ public String queryID() {
+ return queryId;
+ }
+
+ @Override
+ public void subscribe(final Subscriber super Row> subscriber) {
+ if (polling) {
+ throw new IllegalStateException("Cannot set subscriber if polling");
+ }
+ synchronized (this) {
+ subscribing = true;
+ super.subscribe(subscriber);
+ }
+ }
+
+ @Override
+ public Row poll() {
+ return poll(0, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public synchronized Row poll(final long timeout, final TimeUnit timeUnit) {
+ if (subscribing) {
+ throw new IllegalStateException("Cannot poll if subscriber has been set");
+ }
+ if (!polling) {
+ subscribe(pollableSubscriber);
+ subscribing = false;
+ polling = true;
+ }
+ return pollableSubscriber.poll(timeout, timeUnit);
+ }
+
+ @Override
+ public boolean isComplete() {
+ return super.isComplete();
+ }
+
+ public void handleError(final Exception e) {
+ sendError(e);
+ }
+
+ @Override
+ public void close() {
+ pollableSubscriber.close();
+ }
+
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/RowImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/RowImpl.java
new file mode 100644
index 000000000000..81213cd94e5c
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/RowImpl.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client.impl;
+
+import io.confluent.ksql.api.client.Row;
+import io.vertx.core.json.JsonArray;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class RowImpl implements Row {
+
+ private final List columnNames;
+ private final List columnTypes;
+ private final List values;
+ private final Map columnNameToIndex;
+
+ @SuppressWarnings("unchecked")
+ public RowImpl(
+ final List columnNames,
+ final List columnTypes,
+ final JsonArray values,
+ final Map columnNameToIndex) {
+ this.columnNames = Objects.requireNonNull(columnNames);
+ this.columnTypes = Objects.requireNonNull(columnTypes);
+ this.values = Objects.requireNonNull(values).getList();
+ this.columnNameToIndex = Objects.requireNonNull(columnNameToIndex);
+ }
+
+ @Override
+ public List columnNames() {
+ return columnNames;
+ }
+
+ @Override
+ public List columnTypes() {
+ return columnTypes;
+ }
+
+ @Override
+ public List values() {
+ return values;
+ }
+
+ @Override
+ public Object getObject(final int columnIndex) {
+ return values.get(columnIndex - 1);
+ }
+
+ @Override
+ public Object getObject(final String columnName) {
+ return getObject(indexFromName(columnName));
+ }
+
+ @Override
+ public String getString(final int columnIndex) {
+ return (String)getObject(columnIndex);
+ }
+
+ @Override
+ public String getString(final String columnName) {
+ return getString(indexFromName(columnName));
+ }
+
+ @Override
+ public Integer getInt(final int columnIndex) {
+ final Number number = (Number)getObject(columnIndex);
+ if (number == null) {
+ return null;
+ } else {
+ return number instanceof Integer ? (Integer)number : number.intValue();
+ }
+ }
+
+ @Override
+ public Integer getInt(final String columnName) {
+ return getInt(indexFromName(columnName));
+ }
+
+ @Override
+ public Long getLong(final int columnIndex) {
+ final Number number = (Number)getObject(columnIndex);
+ if (number == null) {
+ return null;
+ } else {
+ return number instanceof Long ? (Long)number : number.longValue();
+ }
+ }
+
+ @Override
+ public Long getLong(final String columnName) {
+ return getLong(indexFromName(columnName));
+ }
+
+ @Override
+ public Double getDouble(final int columnIndex) {
+ final Number number = (Number)getObject(columnIndex);
+ if (number == null) {
+ return null;
+ } else {
+ return number instanceof Double ? (Double)number : number.doubleValue();
+ }
+ }
+
+ @Override
+ public Double getDouble(final String columnName) {
+ return getDouble(indexFromName(columnName));
+ }
+
+ @Override
+ public Boolean getBoolean(final int columnIndex) {
+ return (Boolean)getObject(columnIndex);
+ }
+
+ @Override
+ public Boolean getBoolean(final String columnName) {
+ return getBoolean(indexFromName(columnName));
+ }
+
+ private int indexFromName(final String columnName) {
+ final Integer index = columnNameToIndex.get(columnName);
+ if (index == null) {
+ throw new IllegalArgumentException("No column exists with name: " + columnName);
+ }
+ return index;
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamQueryResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamQueryResponseHandler.java
new file mode 100644
index 000000000000..4623f560ab36
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamQueryResponseHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client.impl;
+
+import io.confluent.ksql.api.client.QueryResult;
+import io.confluent.ksql.api.client.Row;
+import io.confluent.ksql.api.client.util.RowUtil;
+import io.confluent.ksql.api.server.protocol.QueryResponseMetadata;
+import io.vertx.core.Context;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.parsetools.RecordParser;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+public class StreamQueryResponseHandler extends QueryResponseHandler {
+
+ private QueryResultImpl queryResult;
+ private Map columnNameToIndex;
+ private boolean paused;
+
+ StreamQueryResponseHandler(final Context context, final RecordParser recordParser,
+ final CompletableFuture cf) {
+ super(context, recordParser, cf);
+ }
+
+ @Override
+ protected void handleMetadata(final QueryResponseMetadata queryResponseMetadata) {
+ this.queryResult = new QueryResultImpl(context, queryResponseMetadata.queryId,
+ Collections.unmodifiableList(queryResponseMetadata.columnNames),
+ Collections.unmodifiableList(queryResponseMetadata.columnTypes));
+ this.columnNameToIndex = RowUtil.valueToIndexMap(queryResponseMetadata.columnNames);
+ cf.complete(queryResult);
+ }
+
+ @Override
+ protected void handleRow(final Buffer buff) {
+ if (queryResult == null) {
+ throw new IllegalStateException("handleRow called before metadata processed");
+ }
+
+ final JsonArray values = new JsonArray(buff);
+ final Row row = new RowImpl(
+ queryResult.columnNames(),
+ queryResult.columnTypes(),
+ values,
+ columnNameToIndex
+ );
+ final boolean full = queryResult.accept(row);
+ if (full && !paused) {
+ recordParser.pause();
+ queryResult.drainHandler(this::publisherReceptive);
+ paused = true;
+ }
+ }
+
+ @Override
+ protected void handleBodyEnd() {
+ }
+
+ @Override
+ public void handleExceptionAfterFutureCompleted(final Throwable t) {
+ queryResult.handleError(new Exception(t));
+ }
+
+ private void publisherReceptive() {
+ checkContext();
+
+ paused = false;
+ recordParser.resume();
+ }
+}
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/util/JsonMapper.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/util/JsonMapper.java
new file mode 100644
index 000000000000..b98621d7098a
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/util/JsonMapper.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.guava.GuavaModule;
+import com.google.common.collect.ImmutableList;
+import io.vertx.core.json.jackson.DatabindCodec;
+
+public final class JsonMapper {
+
+ private static final ObjectMapper MAPPER = DatabindCodec.mapper();
+
+ static {
+ MAPPER.registerModule(new GuavaModule());
+ MAPPER.registerSubtypes(ImmutableList.class);
+ }
+
+ private JsonMapper() {
+ }
+
+ public static ObjectMapper get() {
+ return MAPPER;
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/util/RowUtil.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/util/RowUtil.java
new file mode 100644
index 000000000000..ee963c7701bf
--- /dev/null
+++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/util/RowUtil.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client.util;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public final class RowUtil {
+
+ private RowUtil() {
+ }
+
+ // Given list of values, return map of value to index in list. Returned indices are 1-indexed.
+ public static Map valueToIndexMap(final List values) {
+ final Map valueToIndex = new HashMap<>();
+ for (int i = 0; i < values.size(); i++) {
+ valueToIndex.put(values.get(i), i + 1);
+ }
+ return valueToIndex;
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientBasicAuthTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientBasicAuthTest.java
new file mode 100644
index 000000000000..8aa146b0f2ae
--- /dev/null
+++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientBasicAuthTest.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client;
+
+import io.confluent.ksql.rest.server.KsqlRestConfig;
+import io.confluent.ksql.test.util.TestBasicJaasConfig;
+import java.util.Map;
+import org.junit.ClassRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClientBasicAuthTest extends ClientTest {
+
+ protected static final Logger log = LoggerFactory.getLogger(ClientBasicAuthTest.class);
+
+ private static final String PROPS_JAAS_REALM = "KsqlServer-Props";
+ private static final String KSQL_RESOURCE = "ksql-user";
+ private static final String USER_WITH_ACCESS = "harry";
+ private static final String USER_WITH_ACCESS_PWD = "changeme";
+
+ @ClassRule
+ public static final TestBasicJaasConfig JAAS_CONFIG = TestBasicJaasConfig
+ .builder(PROPS_JAAS_REALM)
+ .addUser(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD, KSQL_RESOURCE)
+ .build();
+
+ @Override
+ protected KsqlRestConfig createServerConfig() {
+ KsqlRestConfig config = super.createServerConfig();
+ Map origs = config.originals();
+ origs.put(
+ KsqlRestConfig.AUTHENTICATION_METHOD_CONFIG,
+ KsqlRestConfig.AUTHENTICATION_METHOD_BASIC);
+ origs.put(
+ KsqlRestConfig.AUTHENTICATION_REALM_CONFIG,
+ PROPS_JAAS_REALM
+ );
+ origs.put(
+ KsqlRestConfig.AUTHENTICATION_ROLES_CONFIG,
+ KSQL_RESOURCE
+ );
+ return new KsqlRestConfig(origs);
+ }
+
+ @Override
+ protected ClientOptions createJavaClientOptions() {
+ return ClientOptions.create()
+ .setHost("localhost")
+ .setPort(server.getListeners().get(0).getPort())
+ .setUseTls(false)
+ .setBasicAuthCredentials(USER_WITH_ACCESS, USER_WITH_ACCESS_PWD);
+ }
+
+}
diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java
new file mode 100644
index 000000000000..add22a3a0c57
--- /dev/null
+++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java
@@ -0,0 +1,318 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client;
+
+import static io.confluent.ksql.test.util.AssertEventually.assertThatEventually;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThrows;
+
+import io.confluent.ksql.api.BaseApiTest;
+import io.confluent.ksql.api.server.PushQueryId;
+import io.confluent.ksql.parser.exception.ParseFailedException;
+import io.vertx.ext.web.client.WebClient;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.junit.Test;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClientTest extends BaseApiTest {
+
+ protected static final Logger log = LoggerFactory.getLogger(ClientTest.class);
+
+ @SuppressWarnings("unchecked")
+ protected static final List DEFAULT_COLUMN_NAMES = BaseApiTest.DEFAULT_COLUMN_NAMES.getList();
+ @SuppressWarnings("unchecked")
+ protected static final List DEFAULT_COLUMN_TYPES = BaseApiTest.DEFAULT_COLUMN_TYPES.getList();
+ protected static final Map DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES =
+ BaseApiTest.DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES.getMap();
+ protected static final String DEFAULT_PUSH_QUERY_WITH_LIMIT = "select * from foo emit changes limit 10;";
+
+ protected Client javaClient;
+
+ @Override
+ public void setUp() {
+ super.setUp();
+
+ this.javaClient = createJavaClient();
+ }
+
+ @Override
+ protected WebClient createClient() {
+ // Ensure these tests use Java client rather than WebClient (as in BaseApiTest)
+ return null;
+ }
+
+ @Override
+ protected void stopClient() {
+ if (javaClient != null) {
+ try {
+ javaClient.close();
+ } catch (Exception e) {
+ log.error("Failed to close client", e);
+ }
+ }
+ }
+
+ @Test
+ public void shouldStreamPushQueryAsync() throws Exception {
+ // When
+ final QueryResult queryResult =
+ javaClient.streamQuery(DEFAULT_PUSH_QUERY, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
+
+ // Then
+ assertThat(queryResult.columnNames(), is(DEFAULT_COLUMN_NAMES));
+ assertThat(queryResult.columnTypes(), is(DEFAULT_COLUMN_TYPES));
+
+ shouldDeliver(queryResult, DEFAULT_ROWS.size());
+
+ String queryId = queryResult.queryID();
+ assertThat(queryId, is(notNullValue()));
+ verifyPushQueryServerState(DEFAULT_PUSH_QUERY, queryId);
+ }
+
+ @Test
+ public void shouldStreamPushQuerySync() throws Exception {
+ // When
+ final QueryResult queryResult =
+ javaClient.streamQuery(DEFAULT_PUSH_QUERY, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
+
+ // Then
+ assertThat(queryResult.columnNames(), is(DEFAULT_COLUMN_NAMES));
+ assertThat(queryResult.columnTypes(), is(DEFAULT_COLUMN_TYPES));
+
+ for (int i = 0; i < DEFAULT_ROWS.size(); i++) {
+ final Row row = queryResult.poll();
+ assertThat(row.values(), equalTo(rowWithIndex(i).getList()));
+ assertThat(row.columnNames(), equalTo(DEFAULT_COLUMN_NAMES));
+ assertThat(row.columnTypes(), equalTo(DEFAULT_COLUMN_TYPES));
+ }
+
+ String queryId = queryResult.queryID();
+ assertThat(queryId, is(notNullValue()));
+ verifyPushQueryServerState(DEFAULT_PUSH_QUERY, queryId);
+ }
+
+ @Test
+ public void shouldStreamPullQueryAsync() throws Exception {
+ // When
+ final QueryResult queryResult =
+ javaClient.streamQuery(DEFAULT_PULL_QUERY).get();
+
+ // Then
+ assertThat(queryResult.columnNames(), is(DEFAULT_COLUMN_NAMES));
+ assertThat(queryResult.columnTypes(), is(DEFAULT_COLUMN_TYPES));
+
+ shouldDeliver(queryResult, DEFAULT_ROWS.size());
+
+ verifyPullQueryServerState();
+ }
+
+ @Test
+ public void shouldStreamPullQuerySync() throws Exception {
+ // When
+ final QueryResult queryResult =
+ javaClient.streamQuery(DEFAULT_PULL_QUERY).get();
+
+ // Then
+ assertThat(queryResult.columnNames(), is(DEFAULT_COLUMN_NAMES));
+ assertThat(queryResult.columnTypes(), is(DEFAULT_COLUMN_TYPES));
+
+ for (int i = 0; i < DEFAULT_ROWS.size(); i++) {
+ final Row row = queryResult.poll();
+ assertThat(row.values(), equalTo(rowWithIndex(i).getList()));
+ assertThat(row.columnNames(), equalTo(DEFAULT_COLUMN_NAMES));
+ assertThat(row.columnTypes(), equalTo(DEFAULT_COLUMN_TYPES));
+ }
+
+ verifyPullQueryServerState();
+ }
+
+ @Test
+ public void shouldHandleErrorResponseFromStreamQuery() {
+ // Given
+ ParseFailedException pfe = new ParseFailedException("invalid query blah");
+ testEndpoints.setCreateQueryPublisherException(pfe);
+
+ // When
+ final Exception e = assertThrows(
+ ExecutionException.class, // thrown from .get() when the future completes exceptionally
+ () -> javaClient.streamQuery("bad query", DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get()
+ );
+
+ // Then
+ assertThat(e.getCause().getMessage(), containsString("Received 400 response from server"));
+ assertThat(e.getCause().getMessage(), containsString("invalid query blah"));
+ }
+
+ @Test
+ public void shouldExecutePullQuery() throws Exception {
+ // When
+ final List rows = javaClient.executeQuery(DEFAULT_PULL_QUERY).get();
+
+ // Then
+ assertThat(rows, hasSize(DEFAULT_ROWS.size()));
+ for (int i = 0; i < DEFAULT_ROWS.size(); i++) {
+ assertThat(rows.get(i).values(), equalTo(rowWithIndex(i).getList()));
+ assertThat(rows.get(i).columnNames(), equalTo(DEFAULT_COLUMN_NAMES));
+ assertThat(rows.get(i).columnTypes(), equalTo(DEFAULT_COLUMN_TYPES));
+ }
+
+ verifyPullQueryServerState();
+ }
+
+ @Test
+ public void shouldExecutePushQuery() throws Exception {
+ // When
+ final List rows =
+ javaClient.executeQuery(DEFAULT_PUSH_QUERY_WITH_LIMIT, DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES).get();
+
+ // Then
+ assertThat(rows, hasSize(DEFAULT_ROWS.size()));
+ for (int i = 0; i < DEFAULT_ROWS.size(); i++) {
+ assertThat(rows.get(i).values(), equalTo(rowWithIndex(i).getList()));
+ assertThat(rows.get(i).columnNames(), equalTo(DEFAULT_COLUMN_NAMES));
+ assertThat(rows.get(i).columnTypes(), equalTo(DEFAULT_COLUMN_TYPES));
+ }
+
+ verifyPushQueryServerState(DEFAULT_PUSH_QUERY_WITH_LIMIT);
+ }
+
+ @Test
+ public void shouldHandleErrorResponseFromExecuteQuery() {
+ // Given
+ ParseFailedException pfe = new ParseFailedException("invalid query blah");
+ testEndpoints.setCreateQueryPublisherException(pfe);
+
+ // When
+ final Exception e = assertThrows(
+ ExecutionException.class, // thrown from .get() when the future completes exceptionally
+ () -> javaClient.executeQuery("bad query").get()
+ );
+
+ // Then
+ assertThat(e.getCause().getMessage(), containsString("Received 400 response from server"));
+ assertThat(e.getCause().getMessage(), containsString("invalid query blah"));
+ }
+
+ protected Client createJavaClient() {
+ return Client.create(createJavaClientOptions(), vertx);
+ }
+
+ protected ClientOptions createJavaClientOptions() {
+ return ClientOptions.create()
+ .setHost("localhost")
+ .setPort(server.getListeners().get(0).getPort())
+ .setUseTls(false);
+ }
+
+ private void verifyPushQueryServerState(final String sql) {
+ verifyPushQueryServerState(sql, null);
+ }
+
+ private void verifyPushQueryServerState(final String sql, final String queryId) {
+ assertThat(testEndpoints.getLastSql(), is(sql));
+ assertThat(testEndpoints.getLastProperties(), is(BaseApiTest.DEFAULT_PUSH_QUERY_REQUEST_PROPERTIES));
+
+ if (queryId != null) {
+ assertThat(server.getQueryIDs(), hasSize(1));
+ assertThat(server.getQueryIDs().contains(new PushQueryId(queryId)), is(true));
+ }
+ }
+
+ private void verifyPullQueryServerState() {
+ assertThat(testEndpoints.getLastSql(), is(DEFAULT_PULL_QUERY));
+ assertThat(testEndpoints.getLastProperties().getMap(), is(Collections.emptyMap()));
+
+ assertThat(server.getQueryIDs(), hasSize(0));
+ }
+
+ private void shouldDeliver(final Publisher publisher, final int numRows) {
+ TestSubscriber subscriber = new TestSubscriber() {
+ @Override
+ public synchronized void onSubscribe(final Subscription sub) {
+ super.onSubscribe(sub);
+ sub.request(numRows);
+ }
+ };
+ publisher.subscribe(subscriber);
+ assertThatEventually(subscriber::getValues, hasSize(numRows));
+ for (int i = 0; i < numRows; i++) {
+ assertThat(subscriber.getValues().get(i).values(), equalTo(rowWithIndex(i).getList()));
+ }
+ assertThat(subscriber.isCompleted(), equalTo(false));
+ assertThat(subscriber.getError(), is(nullValue()));
+ }
+
+ private static class TestSubscriber implements Subscriber {
+
+ private Subscription sub;
+ private boolean completed;
+ private Throwable error;
+ private final List values = new ArrayList<>();
+
+ public TestSubscriber() {
+ }
+
+ @Override
+ public synchronized void onSubscribe(final Subscription sub) {
+ this.sub = sub;
+ }
+
+ @Override
+ public synchronized void onNext(final T value) {
+ values.add(value);
+ }
+
+ @Override
+ public synchronized void onError(final Throwable t) {
+ this.error = t;
+ }
+
+ @Override
+ public synchronized void onComplete() {
+ this.completed = true;
+ }
+
+ public synchronized boolean isCompleted() {
+ return completed;
+ }
+
+ public synchronized Throwable getError() {
+ return error;
+ }
+
+ public synchronized List getValues() {
+ return values;
+ }
+
+ public synchronized Subscription getSub() {
+ return sub;
+ }
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTlsTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTlsTest.java
new file mode 100644
index 000000000000..120f86c2e88d
--- /dev/null
+++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTlsTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client;
+
+import io.confluent.ksql.rest.server.KsqlRestConfig;
+import io.confluent.ksql.test.util.secure.ServerKeyStore;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.config.SslConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClientTlsTest extends ClientTest {
+
+ protected static final Logger log = LoggerFactory.getLogger(ClientTlsTest.class);
+
+ @Override
+ protected KsqlRestConfig createServerConfig() {
+
+ String keyStorePath = ServerKeyStore.keyStoreProps()
+ .get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG);
+ String keyStorePassword = ServerKeyStore.keyStoreProps()
+ .get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG);
+ String trustStorePath = ServerKeyStore.keyStoreProps()
+ .get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
+ String trustStorePassword = ServerKeyStore.keyStoreProps()
+ .get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
+
+ Map config = new HashMap<>();
+ config.put(KsqlRestConfig.LISTENERS_CONFIG, "https://localhost:0");
+ config.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStorePath);
+ config.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, keyStorePassword);
+ config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStorePath);
+ config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
+ config.put(KsqlRestConfig.VERTICLE_INSTANCES, 4);
+
+ return new KsqlRestConfig(config);
+ }
+
+ @Override
+ protected ClientOptions createJavaClientOptions() {
+ return ClientOptions.create()
+ .setHost("localhost")
+ .setPort(server.getListeners().get(0).getPort())
+ .setUseTls(true)
+ .setVerifyHost(false)
+ .setTrustAll(true);
+ }
+
+}
diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/QueryResultImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/QueryResultImplTest.java
new file mode 100644
index 000000000000..683111649eec
--- /dev/null
+++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/QueryResultImplTest.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client.impl;
+
+import static org.junit.Assert.assertThrows;
+
+import io.confluent.ksql.api.client.Row;
+import io.vertx.core.Context;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.reactivestreams.Subscriber;
+
+@RunWith(MockitoJUnitRunner.class)
+public class QueryResultImplTest {
+
+ @Mock
+ private Context context;
+ @Mock
+ private Subscriber subscriber;
+
+ private QueryResultImpl queryResult;
+
+ @Before
+ public void setUp() {
+ queryResult = new QueryResultImpl(context, "queryId", Collections.emptyList(), Collections.emptyList());
+ }
+
+ @Test
+ public void shouldNotSubscribeIfPolling() {
+ // Given
+ queryResult.poll(1, TimeUnit.NANOSECONDS);
+
+ // When / Then
+ assertThrows(IllegalStateException.class, () -> queryResult.subscribe(subscriber));
+ }
+
+ @Test
+ public void shouldNotPollIfSubscribed() {
+ // Given
+ queryResult.subscribe(subscriber);
+
+ // When / Then
+ assertThrows(IllegalStateException.class, () -> queryResult.poll());
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/RowImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/RowImplTest.java
new file mode 100644
index 000000000000..171b8278ac61
--- /dev/null
+++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/RowImplTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.api.client.impl;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import com.google.common.collect.ImmutableList;
+import io.confluent.ksql.api.client.util.RowUtil;
+import io.vertx.core.json.JsonArray;
+import java.util.List;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RowImplTest {
+
+ private static final List COLUMN_NAMES = ImmutableList.of("f_str", "f_int", "f_long", "f_double", "f_bool");
+ private static final List COLUMN_TYPES = ImmutableList.of("STRING", "INTEGER", "BIGINT", "DOUBLE", "BOOLEAN");
+ private static final Map COLUMN_NAME_TO_INDEX = RowUtil.valueToIndexMap(COLUMN_NAMES);
+ private static final JsonArray VALUES = new JsonArray(ImmutableList.of("foo", 2, 1234L, 34.43, false));
+
+ private RowImpl row;
+
+ @Before
+ public void setUp() {
+ row = new RowImpl(COLUMN_NAMES, COLUMN_TYPES, VALUES, COLUMN_NAME_TO_INDEX);
+ }
+
+ @Test
+ public void shouldOneIndexColumnNames() {
+ assertThat(row.getObject(1), is("foo"));
+ assertThat(row.getObject(2), is(2));
+ assertThat(row.getObject(3), is(1234L));
+ assertThat(row.getObject(4), is(34.43));
+ assertThat(row.getObject(5), is(false));
+ }
+
+ @Test
+ public void shouldGetString() {
+ assertThat(row.getString("f_str"), is("foo"));
+ }
+
+ @Test
+ public void shouldGetInt() {
+ assertThat(row.getInt("f_int"), is(2));
+ }
+
+ @Test
+ public void shouldGetLong() {
+ assertThat(row.getLong("f_long"), is(1234L));
+ assertThat(row.getLong("f_int"), is(2L));
+ }
+
+ @Test
+ public void shouldGetDouble() {
+ assertThat(row.getDouble("f_double"), is(34.43));
+ }
+
+ @Test
+ public void shouldGetBoolean() {
+ assertThat(row.getBoolean("f_bool"), is(false));
+ }
+}
\ No newline at end of file
diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/reactive/BufferedPublisher.java b/ksqldb-common/src/main/java/io/confluent/ksql/reactive/BufferedPublisher.java
index da283b93a444..afe089271f83 100644
--- a/ksqldb-common/src/main/java/io/confluent/ksql/reactive/BufferedPublisher.java
+++ b/ksqldb-common/src/main/java/io/confluent/ksql/reactive/BufferedPublisher.java
@@ -39,8 +39,8 @@ public class BufferedPublisher extends BasePublisher {
private final Queue buffer = new ArrayDeque<>();
private final int bufferMaxSize;
private Runnable drainHandler;
+ private boolean shouldSendComplete;
private boolean complete;
- private boolean completing;
/**
* Construct a BufferedPublisher
@@ -63,6 +63,7 @@ public BufferedPublisher(final Context ctx, final Collection initialBuffer) {
this(ctx);
this.buffer.addAll(initialBuffer);
complete = true;
+ shouldSendComplete = true;
}
/**
@@ -89,7 +90,7 @@ public BufferedPublisher(final Context ctx, final int bufferMaxSize) {
*/
public boolean accept(final T t) {
checkContext();
- if (completing) {
+ if (complete) {
throw new IllegalStateException("Cannot call accept after complete is called");
}
if (!isCancelled()) {
@@ -124,17 +125,21 @@ public void drainHandler(final Runnable handler) {
*/
public void complete() {
checkContext();
- if (isCancelled() || completing) {
+ if (isCancelled() || complete) {
return;
}
- completing = true;
+ complete = true;
if (buffer.isEmpty() && getSubscriber() != null) {
sendComplete();
} else {
- complete = true;
+ shouldSendComplete = true;
}
}
+ protected boolean isComplete() {
+ return complete;
+ }
+
protected void maybeSend() {
int numSent = 0;
while (!isCancelled() && getDemand() > 0 && !buffer.isEmpty()) {
@@ -150,8 +155,9 @@ protected void maybeSend() {
}
if (buffer.isEmpty() && !isCancelled()) {
- if (complete) {
+ if (shouldSendComplete) {
sendComplete();
+ shouldSendComplete = false;
} else if (getDemand() > 0 && drainHandler != null) {
final Runnable handler = drainHandler;
ctx.runOnContext(v -> handler.run());
diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/protocol/QueryResponseMetadata.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/protocol/QueryResponseMetadata.java
index 504aec7b14cd..2c781e61d46b 100644
--- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/protocol/QueryResponseMetadata.java
+++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/protocol/QueryResponseMetadata.java
@@ -15,6 +15,7 @@
package io.confluent.ksql.api.server.protocol;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.Immutable;
import java.util.List;
@@ -30,8 +31,10 @@ public class QueryResponseMetadata extends SerializableObject {
public final ImmutableList columnNames;
public final ImmutableList columnTypes;
- public QueryResponseMetadata(final String queryId, final List columnNames,
- final List columnTypes) {
+ public QueryResponseMetadata(
+ final @JsonProperty(value = "queryId") String queryId,
+ final @JsonProperty(value = "columnNames") List columnNames,
+ final @JsonProperty(value = "columnTypes") List columnTypes) {
this.queryId = queryId;
this.columnNames = ImmutableList.copyOf(Objects.requireNonNull(columnNames));
this.columnTypes = ImmutableList.copyOf(Objects.requireNonNull(columnTypes));
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/BaseApiTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/BaseApiTest.java
index a80895e57836..5cf1bcd1e312 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/BaseApiTest.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/BaseApiTest.java
@@ -213,6 +213,19 @@ protected static void validateErrorCommon(final int errorCode, final String mess
assertThat(error.getString("message"), is(message));
}
+ protected static JsonArray rowWithIndex(final int index) {
+ return new JsonArray().add("foo" + index).add(index).add(index % 2 == 0);
+ }
+
+ private static List generateRows() {
+ List rows = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ JsonArray row = rowWithIndex(i);
+ rows.add(row);
+ }
+ return rows;
+ }
+
@SuppressWarnings("unchecked")
protected void setDefaultRowGenerator() {
List rows = new ArrayList<>();
@@ -225,14 +238,4 @@ protected void setDefaultRowGenerator() {
DEFAULT_COLUMN_TYPES.getList(),
rows));
}
-
- private static List generateRows() {
- List rows = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- JsonArray row = new JsonArray().add("foo" + i).add(i).add(i % 2 == 0);
- rows.add(row);
- }
- return rows;
- }
-
}
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java
index 0a2bd6adf97c..7bf263a2d6f1 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestEndpoints.java
@@ -66,11 +66,13 @@ public synchronized CompletableFuture createQueryPublisher(final
this.lastSql = sql;
this.lastProperties = properties;
this.lastApiSecurityContext = apiSecurityContext;
- boolean push = sql.toLowerCase().contains("emit changes");
- TestQueryPublisher queryPublisher = new TestQueryPublisher(context,
+ final boolean push = sql.toLowerCase().contains("emit changes");
+ final int limit = extractLimit(sql);
+ final TestQueryPublisher queryPublisher = new TestQueryPublisher(context,
rowGeneratorFactory.get(),
rowsBeforePublisherError,
- push);
+ push,
+ limit);
queryPublishers.add(queryPublisher);
completableFuture.complete(queryPublisher);
}
@@ -223,5 +225,16 @@ public synchronized void setRowsBeforePublisherError(final int rowsBeforePublish
public synchronized void setCreateQueryPublisherException(final RuntimeException exception) {
this.createQueryPublisherException = exception;
}
+
+ private static int extractLimit(final String sql) {
+ final int ind = sql.toLowerCase().indexOf("limit");
+ if (ind == -1) {
+ return -1;
+ }
+
+ // extract the string between "limit" and the following semicolon
+ final String limit = sql.substring(ind + 5, ind + sql.substring(ind).indexOf(";")).trim();
+ return Integer.parseInt(limit);
+ }
}
diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestQueryPublisher.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestQueryPublisher.java
index ad224d7454d0..e257e710de2c 100644
--- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestQueryPublisher.java
+++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/TestQueryPublisher.java
@@ -27,14 +27,16 @@ public class TestQueryPublisher extends BasePublisher implements Que
private final RowGenerator rowGenerator;
private final int rowsBeforePublisherError;
private final boolean push;
+ private final int limit;
private int rowsSent;
public TestQueryPublisher(final Context ctx, final RowGenerator rowGenerator,
- final int rowsBeforePublisherError, final boolean push) {
+ final int rowsBeforePublisherError, final boolean push, final int limit) {
super(ctx);
this.rowGenerator = rowGenerator;
this.rowsBeforePublisherError = rowsBeforePublisherError;
this.push = push;
+ this.limit = limit;
}
synchronized boolean hasSubscriber() {
@@ -64,6 +66,9 @@ private void doSend(long amount) {
} else {
rowsSent++;
getSubscriber().onNext(row);
+ if (rowsSent == limit) {
+ sendComplete();
+ }
}
}
}
diff --git a/pom.xml b/pom.xml
index 84bea610f9f6..e1b01e62d9d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,7 @@
ksqldb-rocksdb-config-setter
ksqldb-docker
ksqldb-api-reactive-streams-tck
+ ksqldb-api-client