From 23955f6b4c07c8fb166ee788b8648c303f38b686 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Mon, 8 Jun 2020 15:55:39 -0700 Subject: [PATCH 01/12] feat(client): support streaming inserts (wip) --- .../ksql/api/client/AcksPublisher.java | 48 +++++++++ .../io/confluent/ksql/api/client/Client.java | 4 + .../confluent/ksql/api/client/InsertAck.java | 22 ++++ .../api/client/impl/AcksPublisherImpl.java | 42 ++++++++ .../ksql/api/client/impl/ClientImpl.java | 52 +++++++-- .../ksql/api/client/impl/InsertAckImpl.java | 32 ++++++ ...er.java => InsertIntoResponseHandler.java} | 4 +- .../impl/StreamInsertsResponseHandler.java | 101 ++++++++++++++++++ .../client/impl/StreamInsertsSubscriber.java | 85 +++++++++++++++ .../confluent/ksql/api/client/ClientTest.java | 15 +++ 10 files changed, 397 insertions(+), 8 deletions(-) create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/AcksPublisher.java create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertAck.java create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/AcksPublisherImpl.java create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertAckImpl.java rename ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/{InsertsResponseHandler.java => InsertIntoResponseHandler.java} (94%) create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsResponseHandler.java create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.java diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/AcksPublisher.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/AcksPublisher.java new file mode 100644 index 000000000000..75194282fbf7 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/AcksPublisher.java @@ -0,0 +1,48 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client; + +import org.reactivestreams.Publisher; + +/** + * TODO + */ +public interface AcksPublisher extends Publisher { + + /** + * Returns whether the {@code AcksPublisher} is complete. + * + *

An {@code AcksPublisher} is complete if the HTTP connection associated with this + * {@code insertsStream()} request has been ended gracefully. Once complete, the + * {@code AcksPublisher} will continue to deliver any remaining rows, then call + * {@code onComplete()} on the subscriber, if present. + * + * @return whether the {@code AcksPublisher} is complete. + */ + boolean isComplete(); + + /** + * Returns whether the {@code AcksPublisher} is failed. + * + *

An {@code AcksPublisher} is failed if an error is received from the server. Once + * failed, {@code onError()} is called on the subscriber, if present, and new calls to + * {@code subscribe()} will be rejected. + * + * @return whether the {@code AcksPublisher} is failed. + */ + boolean isFailed(); + +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java index 993b3e9355ec..6a36ff8a462d 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java @@ -19,6 +19,7 @@ import io.vertx.core.Vertx; import java.util.Map; import java.util.concurrent.CompletableFuture; +import org.reactivestreams.Publisher; /** * A client that connects to a specific ksqlDB server. @@ -90,6 +91,9 @@ public interface Client { */ CompletableFuture insertInto(String streamName, KsqlObject row); + CompletableFuture + streamInserts(String streamName, Publisher insertsPublisher); + /** * Terminates a push query with the specified query ID. * diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertAck.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertAck.java new file mode 100644 index 000000000000..76af57980c8b --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertAck.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client; + +public interface InsertAck { + + long seqNum(); + +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/AcksPublisherImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/AcksPublisherImpl.java new file mode 100644 index 000000000000..d6a1824b7715 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/AcksPublisherImpl.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import io.confluent.ksql.api.client.AcksPublisher; +import io.confluent.ksql.api.client.InsertAck; +import io.confluent.ksql.reactive.BufferedPublisher; +import io.vertx.core.Context; + +public class AcksPublisherImpl extends BufferedPublisher implements AcksPublisher { + + public AcksPublisherImpl(final Context context) { + super(context); + } + + @Override + public boolean isComplete() { + return super.isComplete(); + } + + @Override + public boolean isFailed() { + return super.isFailed(); + } + + public void handleError(final Exception e) { + sendError(e); + } +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index a839fe5fac8c..28244420d27b 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -18,6 +18,7 @@ import static io.netty.handler.codec.http.HttpHeaderNames.AUTHORIZATION; import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import io.confluent.ksql.api.client.AcksPublisher; import io.confluent.ksql.api.client.BatchedQueryResult; import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.ClientOptions; @@ -43,6 +44,7 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; +import org.reactivestreams.Publisher; public class ClientImpl implements Client { @@ -91,7 +93,8 @@ public CompletableFuture streamQuery( final Map properties ) { final CompletableFuture cf = new CompletableFuture<>(); - makeQueryRequest(sql, properties, cf, StreamQueryResponseHandler::new); + makeQueryRequest(sql, properties, cf, + (ctx, rp, fut, req) -> new StreamQueryResponseHandler(ctx, rp, fut)); return cf; } @@ -110,7 +113,7 @@ public BatchedQueryResult executeQuery( sql, properties, result, - (context, recordParser, cf) -> new ExecuteQueryResponseHandler( + (context, recordParser, cf, request) -> new ExecuteQueryResponseHandler( context, recordParser, cf, clientOptions.getExecuteQueryMaxResultRows()) ); return result; @@ -129,7 +132,31 @@ public CompletableFuture insertInto(final String streamName, final KsqlObj "/inserts-stream", requestBody, cf, - response -> handleResponse(response, cf, InsertsResponseHandler::new) + response -> handleResponse(response, cf, + (ctx, rp, fut, req) -> new InsertIntoResponseHandler(ctx, rp, fut)) + ); + + return cf; + } + + @Override + public CompletableFuture streamInserts( + final String streamName, + final Publisher insertsPublisher) { + final CompletableFuture cf = new CompletableFuture<>(); + + final Buffer requestBody = Buffer.buffer(); + final JsonObject params = new JsonObject().put("target", streamName); + requestBody.appendBuffer(params.toBuffer()).appendString("\n"); // TODO: is this extra newline needed? + + makeRequest( + "/inserts-stream", + requestBody, + cf, + response -> handleResponse(response, cf, + (ctx, rp, fut, req) -> + new StreamInsertsResponseHandler(ctx, rp, fut, req, insertsPublisher)), + false ); return cf; @@ -159,7 +186,7 @@ public void close() { @FunctionalInterface private interface ResponseHandlerSupplier> { - ResponseHandler get(Context ctx, RecordParser recordParser, T cf); + ResponseHandler get(Context ctx, RecordParser recordParser, T cf, HttpClientRequest request); } private > void makeQueryRequest( @@ -191,6 +218,15 @@ private > void makeRequest( final Buffer requestBody, final T cf, final Handler responseHandler) { + makeRequest(path, requestBody, cf, responseHandler, true); + } + + private > void makeRequest( + final String path, + final Buffer requestBody, + final T cf, + final Handler responseHandler, + final boolean endRequest) { HttpClientRequest request = httpClient.request(HttpMethod.POST, serverSocketAddress, clientOptions.getPort(), clientOptions.getHost(), path, @@ -199,7 +235,11 @@ private > void makeRequest( if (clientOptions.isUseBasicAuth()) { request = configureBasicAuth(request); } - request.end(requestBody); + if (endRequest) { + request.end(requestBody); + } else { + request.write(requestBody); + } } private HttpClientRequest configureBasicAuth(final HttpClientRequest request) { @@ -213,7 +253,7 @@ private static > void handleResponse( if (response.statusCode() == OK.code()) { final RecordParser recordParser = RecordParser.newDelimited("\n", response); final ResponseHandler responseHandler = - responseHandlerSupplier.get(Vertx.currentContext(), recordParser, cf); + responseHandlerSupplier.get(Vertx.currentContext(), recordParser, cf, response.request()); recordParser.handler(responseHandler::handleBodyBuffer); recordParser.endHandler(responseHandler::handleBodyEnd); diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertAckImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertAckImpl.java new file mode 100644 index 000000000000..77368d99c1c6 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertAckImpl.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import io.confluent.ksql.api.client.InsertAck; + +public class InsertAckImpl implements InsertAck { + + private final long num; + + InsertAckImpl(final long num) { + this.num = num; + } + + @Override + public long seqNum() { + return num; + } +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertsResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertIntoResponseHandler.java similarity index 94% rename from ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertsResponseHandler.java rename to ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertIntoResponseHandler.java index f4711445bee7..9f3b7002748a 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertsResponseHandler.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertIntoResponseHandler.java @@ -22,11 +22,11 @@ import io.vertx.core.parsetools.RecordParser; import java.util.concurrent.CompletableFuture; -public class InsertsResponseHandler extends ResponseHandler> { +public class InsertIntoResponseHandler extends ResponseHandler> { private int numAcks; - InsertsResponseHandler( + InsertIntoResponseHandler( final Context context, final RecordParser recordParser, final CompletableFuture cf) { super(context, recordParser, cf); } diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsResponseHandler.java new file mode 100644 index 000000000000..063158f64782 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsResponseHandler.java @@ -0,0 +1,101 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import io.confluent.ksql.api.client.AcksPublisher; +import io.confluent.ksql.api.client.InsertAck; +import io.confluent.ksql.api.client.KsqlClientException; +import io.confluent.ksql.api.client.KsqlObject; +import io.vertx.core.Context; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClientRequest; +import io.vertx.core.json.JsonObject; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; +import io.vertx.core.parsetools.RecordParser; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import org.reactivestreams.Publisher; + +public class StreamInsertsResponseHandler + extends ResponseHandler> { + + private static final Logger log = LoggerFactory.getLogger(StreamInsertsResponseHandler.class); + + private final AcksPublisherImpl acksPublisher; + private boolean paused; + + StreamInsertsResponseHandler( + final Context context, + final RecordParser recordParser, + final CompletableFuture cf, + final HttpClientRequest request, + final Publisher insertsPublisher + ) { + super(context, recordParser, cf); + + Objects.requireNonNull(request); + insertsPublisher.subscribe(new StreamInsertsSubscriber(context, request)); + + this.acksPublisher = new AcksPublisherImpl(context); + cf.complete(acksPublisher); + } + + @Override + protected void doHandleBodyBuffer(final Buffer buff) { + final JsonObject jsonObject = new JsonObject(buff); + final long seqNum = jsonObject.getLong("seq"); + final String status = jsonObject.getString("status"); + if ("ok".equals(status)) { + final InsertAck ack = new InsertAckImpl(seqNum); + final boolean full = acksPublisher.accept(ack); + if (full && !paused) { + recordParser.pause(); + acksPublisher.drainHandler(this::publisherReceptive); + paused = true; + } + } else if ("error".equals(status)) { + acksPublisher.handleError(new KsqlClientException(String.format( + "Received error from /inserts-stream. Inserts sequence number: %d. " + + "Error code: %d. Message: %s", + seqNum, + jsonObject.getInteger("error_code"), + jsonObject.getString("message") + ))); + } else { + throw new IllegalStateException( + "Unrecognized status response from /inserts-stream: " + status); + } + } + + @Override + protected void doHandleException(final Throwable t) { + log.error(t); + acksPublisher.handleError(new Exception(t)); + } + + @Override + protected void doHandleBodyEnd() { + acksPublisher.complete(); + } + + private void publisherReceptive() { + checkContext(); + + paused = false; + recordParser.resume(); + } +} diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.java new file mode 100644 index 000000000000..b79e2a7d2360 --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import io.confluent.ksql.api.client.KsqlObject; +import io.confluent.ksql.reactive.BaseSubscriber; +import io.vertx.core.Context; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.http.HttpClientRequest; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; +import java.util.Objects; +import org.reactivestreams.Subscription; + +public class StreamInsertsSubscriber extends BaseSubscriber { + + private static final Logger log = LoggerFactory.getLogger(StreamInsertsSubscriber.class); + + private static final int REQUEST_BATCH_SIZE = 200; + + private final HttpClientRequest httpRequest; + private int outstandingTokens; + private boolean drainHandlerSet; + + public StreamInsertsSubscriber(final Context context, final HttpClientRequest httpRequest) { + super(context); + this.httpRequest = Objects.requireNonNull(httpRequest); + } + + @Override + protected void afterSubscribe(final Subscription subscription) { + checkRequest(); + } + + @Override + protected void handleValue(final KsqlObject row) { + httpRequest.write(Buffer.buffer().appendString(row.toJsonString()).appendString("\n")); // TODO: is this extra newline needed? + outstandingTokens--; + + if (httpRequest.writeQueueFull()) { + if (!drainHandlerSet) { + httpRequest.drainHandler(this::httpRequestReceptive); + drainHandlerSet = true; + } else { + checkRequest(); + } + } + } + + @Override + protected void handleComplete() { + httpRequest.end(); + } + + @Override + protected void handleError(final Throwable t) { + log.error("Received error from streamInserts() publisher. Ending connection.", t); + httpRequest.end(); + } + + private void checkRequest() { + if (outstandingTokens == 0) { + outstandingTokens = REQUEST_BATCH_SIZE; + makeRequest(REQUEST_BATCH_SIZE); + } + } + + private void httpRequestReceptive(final Void v) { + drainHandlerSet = false; + checkRequest(); + } +} diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index 1fd18540475f..ffa2a8fa55ae 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -566,6 +566,21 @@ public void shouldHandleErrorFromInsertInto() { assertThat(e.getCause().getMessage(), containsString("Message: Error in processing inserts")); } + @Test + public void shouldStreamInserts() { + + } + + @Test + public void shouldHandleErrorResponseFromStreamInserts() { + + } + + @Test + public void shouldHandleErrorFromStreamInserts() { + + } + protected Client createJavaClient() { return Client.create(createJavaClientOptions(), vertx); } From ec060cb236925ec23c8b81bc94fd056114aab98e Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 17 Jun 2020 13:21:54 -0700 Subject: [PATCH 02/12] chore: add example publisher --- .../ksql/api/client/InsertsPublisher.java | 184 ++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java new file mode 100644 index 000000000000..220269deeb9c --- /dev/null +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java @@ -0,0 +1,184 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client; + +import java.util.ArrayDeque; +import java.util.Objects; +import java.util.Queue; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +public class InsertsPublisher implements Publisher { + + public static final int DEFAULT_BUFFER_MAX_SIZE = 200; + + private Subscriber subscriber; + private final Queue buffer = new ArrayDeque<>(); + private final int bufferMaxSize; + private long demand; + private Runnable drainHandler; + private volatile boolean cancelled; + private boolean complete; + private boolean shouldSendComplete; + private boolean sentComplete; + + /** + * Construct an InsertsPublisher + */ + public InsertsPublisher() { + this(DEFAULT_BUFFER_MAX_SIZE); + } + + /** + * Construct an InsertsPublisher + * + * @param bufferMaxSize Indicative max number of elements to store in the buffer. Note that this + * is not enforced, but it used to determine what to return from the accept + * method so the caller can stop sending more and set a drainHandler to be + * notified when the buffer is cleared + */ + public InsertsPublisher(final int bufferMaxSize) { + this.bufferMaxSize = bufferMaxSize; + } + + /** + * Provide a new row for insertion. The publisher will attempt to deliver it to server endpoint, + * assuming the streamInserts() request has been made. The publisher will buffer it internally + * if it can't deliver it immediately. + * + * @param row The element + * @return true if the internal buffer is 'full'. I.e. if number of elements is >= bufferMaxSize. + */ + public synchronized boolean accept(final KsqlObject row) { + if (complete || sentComplete) { + throw new IllegalStateException("Cannot call accept after complete is called"); + } + if (!cancelled && !isFailed()) { + if (demand == 0) { + buffer.add(row); + } else { + doOnNext(row); + } + } + return buffer.size() >= bufferMaxSize; + } + + /** + * If you set a drain handler. It will be called if, after delivery is attempted there are zero + * elements buffered internally and there is demand from the subscriber for more elements. Drain + * handlers are one shot handlers, after being called it will never be called more than once. + * + * @param handler The handler + */ + public synchronized void drainHandler(final Runnable handler) { + if (drainHandler != null) { + throw new IllegalStateException("drainHandler already set"); + } + this.drainHandler = Objects.requireNonNull(handler); + } + + /** + * Mark the incoming stream of elements as complete. This means onComplete will be called on any + * subscriber after any buffered messages have been delivered. Once complete has been called no + * further elements will be accepted + */ + public synchronized void complete() { + if (complete || isFailed()) { + return; + } + complete = true; + if (buffer.isEmpty() && subscriber != null) { + sendComplete(); + } else { + shouldSendComplete = true; + } + } + + @Override + public synchronized void subscribe(final Subscriber subscriber) { + if (this.subscriber != null) { + throw new IllegalStateException("Cannot subscribe a new subscriber as one is already present."); + } + + this.subscriber = subscriber; + subscriber.onSubscribe(new Subscription() { + @Override + public void request(final long l) { + doRequest(l); + } + + @Override + public void cancel() { + doCancel(); + } + }); + } + + private synchronized void doRequest(final long n) { + if (n <= 0) { + subscriber.onError((new IllegalArgumentException("Amount requested must be > 0"))); + } else if (demand + n < 1) { + // Catch overflow and set to "infinite" + demand = Long.MAX_VALUE; + maybeSend(); + } else { + demand += n; + maybeSend(); + } + } + + private synchronized void doCancel() { + cancelled = true; + subscriber = null; + } + + private void maybeSend() { + while (demand > 0 && !buffer.isEmpty()) { + final KsqlObject val = buffer.poll(); + doOnNext(val); + } + + if (buffer.isEmpty() && !isFailed()) { + if (shouldSendComplete) { + sendComplete(); + shouldSendComplete = false; + } else if (demand > 0 && drainHandler != null) { + drainHandler.run(); + drainHandler = null; + } + } + } + + private void doOnNext(final KsqlObject row) { + subscriber.onNext(row); + + // If demand == Long.MAX_VALUE this means "infinite demand" + if (demand != Long.MAX_VALUE) { + demand--; + } + } + + private void sendComplete() { + sentComplete = true; + subscriber.onComplete(); + } + + // TODO: is this necessary? probably not + private boolean isFailed() { + return false; + } +} From 309e874c5c1328b81dc30eea67a68d0d7a5a20ae Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 17 Jun 2020 13:22:32 -0700 Subject: [PATCH 03/12] chore: clean up isFailed() from example publisher --- .../confluent/ksql/api/client/InsertsPublisher.java | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java index 220269deeb9c..801128ff186b 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java @@ -67,7 +67,7 @@ public synchronized boolean accept(final KsqlObject row) { if (complete || sentComplete) { throw new IllegalStateException("Cannot call accept after complete is called"); } - if (!cancelled && !isFailed()) { + if (!cancelled) { if (demand == 0) { buffer.add(row); } else { @@ -97,7 +97,7 @@ public synchronized void drainHandler(final Runnable handler) { * further elements will be accepted */ public synchronized void complete() { - if (complete || isFailed()) { + if (complete) { return; } complete = true; @@ -152,7 +152,7 @@ private void maybeSend() { doOnNext(val); } - if (buffer.isEmpty() && !isFailed()) { + if (buffer.isEmpty()) { if (shouldSendComplete) { sendComplete(); shouldSendComplete = false; @@ -176,9 +176,4 @@ private void sendComplete() { sentComplete = true; subscriber.onComplete(); } - - // TODO: is this necessary? probably not - private boolean isFailed() { - return false; - } } From f56d070064651256681ab855f1c7f9d6613cf640 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 17 Jun 2020 15:24:04 -0700 Subject: [PATCH 04/12] test: add unit test --- .../confluent/ksql/api/client/ClientTest.java | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index ffa2a8fa55ae..f4c6a171a325 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -54,6 +54,7 @@ import java.util.stream.Collectors; import org.junit.Test; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -567,8 +568,41 @@ public void shouldHandleErrorFromInsertInto() { } @Test - public void shouldStreamInserts() { + public void shouldStreamInserts() throws Exception { + // Given: + final InsertsPublisher insertsPublisher = new InsertsPublisher(); + + // When: + final AcksPublisher acksPublisher = javaClient.streamInserts("test-stream", insertsPublisher).get(); + for (final KsqlObject row : INSERT_ROWS) { + insertsPublisher.accept(row); + } + + TestSubscriber acksSubscriber = subscribeAndWait(acksPublisher); + acksSubscriber.getSub().request(INSERT_ROWS.size()); + + // Then: + assertThatEventually(() -> testEndpoints.getInsertsSubscriber().getRowsInserted(), hasSize(INSERT_ROWS.size())); + for (int i = 0; i < INSERT_ROWS.size(); i++) { + assertThat(testEndpoints.getInsertsSubscriber().getRowsInserted().get(i), is(EXPECTED_INSERT_ROWS.get(i))); + } + assertThat(testEndpoints.getLastTarget(), is("test-stream")); + assertThat(testEndpoints.getInsertsSubscriber().isCompleted(), is(false)); + assertThat(testEndpoints.getInsertsSubscriber().isClosed(), is(false)); + assertThatEventually(acksSubscriber::getValues, hasSize(INSERT_ROWS.size())); + assertThat(acksSubscriber.getError(), is(nullValue())); + for (int i = 0; i < INSERT_ROWS.size(); i++) { + assertThat(acksSubscriber.getValues().get(i).seqNum(), is(i)); + } + + // When: + insertsPublisher.complete(); + + // Then: + assertThatEventually(() -> testEndpoints.getInsertsSubscriber().isCompleted(), is(true)); + assertThatEventually(() -> testEndpoints.getInsertsSubscriber().isClosed(), is(true)); + assertThatEventually(acksSubscriber::isCompleted, is(true)); } @Test From 1a61ed042b86ea3aa0c2cbca8377d4ce353d496b Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 17 Jun 2020 15:37:57 -0700 Subject: [PATCH 05/12] fix: switch to writing HTTP/2 frames --- .../java/io/confluent/ksql/api/client/impl/ClientImpl.java | 5 ++++- .../ksql/api/client/impl/StreamInsertsSubscriber.java | 2 +- .../test/java/io/confluent/ksql/api/client/ClientTest.java | 5 ++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index 28244420d27b..a968a576ebc8 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -238,7 +238,10 @@ private > void makeRequest( if (endRequest) { request.end(requestBody); } else { - request.write(requestBody); + final HttpClientRequest finalRequest = request; + finalRequest.sendHead(version -> { + finalRequest.writeCustomFrame(0, 0, requestBody); + }); } } diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.java index b79e2a7d2360..c0aae4d1e4e9 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.java @@ -47,7 +47,7 @@ protected void afterSubscribe(final Subscription subscription) { @Override protected void handleValue(final KsqlObject row) { - httpRequest.write(Buffer.buffer().appendString(row.toJsonString()).appendString("\n")); // TODO: is this extra newline needed? + httpRequest.writeCustomFrame(0, 0, Buffer.buffer().appendString(row.toJsonString()).appendString("\n")); // TODO: is this extra newline needed? outstandingTokens--; if (httpRequest.writeQueueFull()) { diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index f4c6a171a325..879ce85c48fb 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -54,7 +54,6 @@ import java.util.stream.Collectors; import org.junit.Test; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -607,12 +606,12 @@ public void shouldStreamInserts() throws Exception { @Test public void shouldHandleErrorResponseFromStreamInserts() { - + // TODO } @Test public void shouldHandleErrorFromStreamInserts() { - + // TODO } protected Client createJavaClient() { From 4fb75c27cb8b6606c89a7b3bcb406a5f0a505168 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 17 Jun 2020 15:53:42 -0700 Subject: [PATCH 06/12] fix: fix failing test by disabling http2 cleartext upgrade --- .../main/java/io/confluent/ksql/api/client/impl/ClientImpl.java | 1 + .../src/test/java/io/confluent/ksql/api/client/ClientTest.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index a968a576ebc8..9dca39c12b8c 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -297,6 +297,7 @@ private static HttpClient createHttpClient(final Vertx vertx, final ClientOption .setSsl(clientOptions.isUseTls()) .setUseAlpn(clientOptions.isUseAlpn()) .setProtocolVersion(HttpVersion.HTTP_2) + .setHttp2ClearTextUpgrade(false) .setVerifyHost(clientOptions.isVerifyHost()) .setDefaultHost(clientOptions.getHost()) .setDefaultPort(clientOptions.getPort()); diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index 879ce85c48fb..ed3c3e21c638 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -592,7 +592,7 @@ public void shouldStreamInserts() throws Exception { assertThatEventually(acksSubscriber::getValues, hasSize(INSERT_ROWS.size())); assertThat(acksSubscriber.getError(), is(nullValue())); for (int i = 0; i < INSERT_ROWS.size(); i++) { - assertThat(acksSubscriber.getValues().get(i).seqNum(), is(i)); + assertThat(acksSubscriber.getValues().get(i).seqNum(), is(Long.valueOf(i))); } // When: From 7189c4065abc5c7b3f9c63f60ad7d2544eb85f1a Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 17 Jun 2020 16:33:42 -0700 Subject: [PATCH 07/12] test: more unit tests --- .../confluent/ksql/api/client/ClientTest.java | 49 ++++++++++++++++--- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index ed3c3e21c638..d56a1179d931 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -586,32 +586,67 @@ public void shouldStreamInserts() throws Exception { assertThat(testEndpoints.getInsertsSubscriber().getRowsInserted().get(i), is(EXPECTED_INSERT_ROWS.get(i))); } assertThat(testEndpoints.getLastTarget(), is("test-stream")); - assertThat(testEndpoints.getInsertsSubscriber().isCompleted(), is(false)); - assertThat(testEndpoints.getInsertsSubscriber().isClosed(), is(false)); assertThatEventually(acksSubscriber::getValues, hasSize(INSERT_ROWS.size())); assertThat(acksSubscriber.getError(), is(nullValue())); for (int i = 0; i < INSERT_ROWS.size(); i++) { assertThat(acksSubscriber.getValues().get(i).seqNum(), is(Long.valueOf(i))); } + assertThat(acksSubscriber.isCompleted(), is(false)); // When: insertsPublisher.complete(); // Then: - assertThatEventually(() -> testEndpoints.getInsertsSubscriber().isCompleted(), is(true)); - assertThatEventually(() -> testEndpoints.getInsertsSubscriber().isClosed(), is(true)); assertThatEventually(acksSubscriber::isCompleted, is(true)); } @Test public void shouldHandleErrorResponseFromStreamInserts() { - // TODO + // Given + KsqlApiException exception = new KsqlApiException("Cannot insert into a table", ERROR_CODE_BAD_REQUEST); + testEndpoints.setCreateInsertsSubscriberException(exception); + + // When + final Exception e = assertThrows( + ExecutionException.class, // thrown from .get() when the future completes exceptionally + () -> javaClient.streamInserts("a-table", new InsertsPublisher()).get() + ); + + // Then + assertThat(e.getCause(), instanceOf(KsqlClientException.class)); + assertThat(e.getCause().getMessage(), containsString("Received 400 response from server")); + assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table")); } @Test - public void shouldHandleErrorFromStreamInserts() { - // TODO + public void shouldHandleErrorFromStreamInserts() throws Exception { + // Given: + testEndpoints.setAcksBeforePublisherError(INSERT_ROWS.size() - 1); + final InsertsPublisher insertsPublisher = new InsertsPublisher(); + + // When: + final AcksPublisher acksPublisher = javaClient.streamInserts("test-stream", insertsPublisher).get(); + for (int i = 0; i < INSERT_ROWS.size(); i++) { + insertsPublisher.accept(INSERT_ROWS.get(i)); + } + + TestSubscriber acksSubscriber = subscribeAndWait(acksPublisher); + acksSubscriber.getSub().request(INSERT_ROWS.size() - 1); // Error is sent even if not requested + + // Then: + // No ack is emitted for the row that generates the error, but the row still counts as having been inserted + assertThatEventually(() -> testEndpoints.getInsertsSubscriber().getRowsInserted(), hasSize(INSERT_ROWS.size())); + for (int i = 0; i < INSERT_ROWS.size(); i++) { + assertThat(testEndpoints.getInsertsSubscriber().getRowsInserted().get(i), is(EXPECTED_INSERT_ROWS.get(i))); + } + assertThat(testEndpoints.getLastTarget(), is("test-stream")); + + assertThatEventually(acksSubscriber::getValues, hasSize(INSERT_ROWS.size() - 1)); + for (int i = 0; i < INSERT_ROWS.size() - 1; i++) { + assertThat(acksSubscriber.getValues().get(i).seqNum(), is(Long.valueOf(i))); + } + assertThatEventually(acksSubscriber::getError, is(notNullValue())); } protected Client createJavaClient() { From 5de76cf9c8fea67519e9e3cb4741a0c4e2ff404f Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 17 Jun 2020 17:00:18 -0700 Subject: [PATCH 08/12] test: add integration test --- .../integration/ClientIntegrationTest.java | 81 ++++++++++++++++++- 1 file changed, 77 insertions(+), 4 deletions(-) diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index f41d62758b62..bd8a2df61cbb 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -35,10 +35,13 @@ import com.google.common.collect.Multimap; import io.confluent.common.utils.IntegrationTest; import io.confluent.ksql.GenericRow; +import io.confluent.ksql.api.client.AcksPublisher; import io.confluent.ksql.api.client.BatchedQueryResult; import io.confluent.ksql.api.client.Client; import io.confluent.ksql.api.client.ClientOptions; import io.confluent.ksql.api.client.ColumnType; +import io.confluent.ksql.api.client.InsertAck; +import io.confluent.ksql.api.client.InsertsPublisher; import io.confluent.ksql.api.client.KsqlArray; import io.confluent.ksql.api.client.KsqlClientException; import io.confluent.ksql.api.client.KsqlObject; @@ -112,6 +115,11 @@ public class ClientIntegrationTest { private static final String EMPTY_TEST_TOPIC = EMPTY_TEST_DATA_PROVIDER.topicName(); private static final String EMPTY_TEST_STREAM = EMPTY_TEST_DATA_PROVIDER.kstreamName(); + private static final TestDataProvider EMPTY_TEST_DATA_PROVIDER_2 = new TestDataProvider<>( + "EMPTY_STRUCTURED_TYPES_2", TEST_DATA_PROVIDER.schema(), ImmutableListMultimap.of()); + private static final String EMPTY_TEST_TOPIC_2 = EMPTY_TEST_DATA_PROVIDER_2.topicName(); + private static final String EMPTY_TEST_STREAM_2 = EMPTY_TEST_DATA_PROVIDER_2.kstreamName(); + private static final String PUSH_QUERY = "SELECT * FROM " + TEST_STREAM + " EMIT CHANGES;"; private static final String PULL_QUERY = "SELECT * from " + AGG_TABLE + " WHERE STR='" + AN_AGG_KEY + "';"; private static final int PUSH_QUERY_LIMIT_NUM_ROWS = 2; @@ -138,17 +146,16 @@ public class ClientIntegrationTest { @BeforeClass public static void setUpClass() { - TEST_HARNESS.ensureTopics(TEST_TOPIC); + TEST_HARNESS.ensureTopics(TEST_TOPIC, EMPTY_TEST_TOPIC, EMPTY_TEST_TOPIC_2); TEST_HARNESS.produceRows(TEST_TOPIC, TEST_DATA_PROVIDER, FormatFactory.JSON); RestIntegrationTestUtil.createStream(REST_APP, TEST_DATA_PROVIDER); + RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER); + RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER_2); makeKsqlRequest("CREATE TABLE " + AGG_TABLE + " AS " + "SELECT STR, LATEST_BY_OFFSET(LONG) AS LONG FROM " + TEST_STREAM + " GROUP BY STR;" ); - TEST_HARNESS.ensureTopics(EMPTY_TEST_TOPIC); - RestIntegrationTestUtil.createStream(REST_APP, EMPTY_TEST_DATA_PROVIDER); - TEST_HARNESS.verifyAvailableUniqueRows( AGG_TABLE, 4, // Only unique keys are counted @@ -560,6 +567,72 @@ public void shouldExecuteQueryWithProperties() { assertThat(row.getKsqlObject("MAP"), is(new KsqlObject().put("test_name", "shouldExecuteQueryWithProperties"))); } + @Test + public void shouldStreamInserts() throws Exception { + // Given + final InsertsPublisher insertsPublisher = new InsertsPublisher(); + final int numRows = 5; + + // When + final AcksPublisher acksPublisher = client.streamInserts(EMPTY_TEST_STREAM_2, insertsPublisher).get(); + + TestSubscriber acksSubscriber = subscribeAndWait(acksPublisher); + assertThat(acksSubscriber.getValues(), hasSize(0)); + acksSubscriber.getSub().request(numRows); + + for (int i = 0; i < numRows; i++) { + insertsPublisher.accept(new KsqlObject() + .put("STR", "TEST_" + i) + .put("LONG", i) + .put("DEC", new BigDecimal("13.31")) + .put("ARRAY", new KsqlArray().add("v_" + i)) + .put("MAP", new KsqlObject().put("k_" + i, "v_" + i))); + } + + // Then + assertThatEventually(acksSubscriber::getValues, hasSize(numRows)); + for (int i = 0; i < numRows; i++) { + assertThat(acksSubscriber.getValues().get(i).seqNum(), is(Long.valueOf(i))); + } + assertThat(acksSubscriber.getError(), is(nullValue())); + assertThat(acksSubscriber.isCompleted(), is(false)); + + // Then: should receive new rows + final String query = "SELECT * FROM " + EMPTY_TEST_STREAM_2 + " EMIT CHANGES LIMIT " + numRows + ";"; + final List rows = client.executeQuery(query).get(); + + // Verify inserted rows are as expected + assertThat(rows, hasSize(numRows)); + for (int i = 0; i < numRows; i++) { + assertThat(rows.get(i).getString("STR"), is("TEST_" + i)); + assertThat(rows.get(i).getLong("LONG"), is(Long.valueOf(i))); + assertThat(rows.get(i).getDecimal("DEC"), is(new BigDecimal("13.31"))); + assertThat(rows.get(i).getKsqlArray("ARRAY"), is(new KsqlArray().add("v_" + i))); + assertThat(rows.get(i).getKsqlObject("MAP"), is(new KsqlObject().put("k_" + i, "v_" + i))); + } + + // When: end connection + insertsPublisher.complete(); + + // Then + assertThatEventually(acksSubscriber::isCompleted, is(true)); + assertThat(acksSubscriber.getError(), is(nullValue())); + } + + @Test + public void shouldHandleErrorResponseFromStreamInserts() { + // When + final Exception e = assertThrows( + ExecutionException.class, // thrown from .get() when the future completes exceptionally + () -> client.streamInserts(AGG_TABLE, new InsertsPublisher()).get() + ); + + // Then + assertThat(e.getCause(), instanceOf(KsqlClientException.class)); + assertThat(e.getCause().getMessage(), containsString("Received 400 response from server")); + assertThat(e.getCause().getMessage(), containsString("Cannot insert into a table")); + } + private Client createClient() { final ClientOptions clientOptions = ClientOptions.create() .setHost("localhost") From 627e0f442d67f63040ea2a5a6d74fbc1e92c4342 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 17 Jun 2020 21:01:16 -0700 Subject: [PATCH 09/12] chore: checkstyle and javadocs --- .../ksql/api/client/AcksPublisher.java | 5 +- .../io/confluent/ksql/api/client/Client.java | 15 +++++ .../confluent/ksql/api/client/InsertAck.java | 12 ++++ .../ksql/api/client/InsertsPublisher.java | 55 +++++++++++++------ .../ksql/api/client/impl/ClientImpl.java | 4 +- .../client/impl/StreamInsertsSubscriber.java | 5 +- .../confluent/ksql/api/client/ClientTest.java | 8 +++ .../integration/ClientIntegrationTest.java | 6 ++ 8 files changed, 88 insertions(+), 22 deletions(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/AcksPublisher.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/AcksPublisher.java index 75194282fbf7..e12265b2a774 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/AcksPublisher.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/AcksPublisher.java @@ -18,7 +18,8 @@ import org.reactivestreams.Publisher; /** - * TODO + * A Reactive Streams Publisher that publishes server acknowledgments for rows inserted into + * an existing ksqlDB stream via {@link Client#streamInserts(String, Publisher)}. */ public interface AcksPublisher extends Publisher { @@ -26,7 +27,7 @@ public interface AcksPublisher extends Publisher { * Returns whether the {@code AcksPublisher} is complete. * *

An {@code AcksPublisher} is complete if the HTTP connection associated with this - * {@code insertsStream()} request has been ended gracefully. Once complete, the + * {@link Client#streamInserts} request has been ended gracefully. Once complete, the * {@code AcksPublisher} will continue to deliver any remaining rows, then call * {@code onComplete()} on the subscriber, if present. * diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java index 6a36ff8a462d..936e520db5c2 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/Client.java @@ -91,6 +91,21 @@ public interface Client { */ CompletableFuture insertInto(String streamName, KsqlObject row); + /** + * Inserts rows into a ksqlDB stream. Rows to insert are supplied by a + * {@code org.reactivestreams.Publisher} and server acknowledgments are exposed similarly. + * + *

The {@code CompletableFuture} will be failed if a non-200 response is received from the + * server. + * + *

See {@link InsertsPublisher} for an example publisher that may be passed an argument to + * this method. + * + * @param streamName name of the target stream + * @param insertsPublisher the publisher to provide rows to insert + * @return a future that completes once the initial server response is received, and contains a + * publisher that publishes server acknowledgments for inserted rows. + */ CompletableFuture streamInserts(String streamName, Publisher insertsPublisher); diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertAck.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertAck.java index 76af57980c8b..bb6326fd87fd 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertAck.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertAck.java @@ -15,8 +15,20 @@ package io.confluent.ksql.api.client; +import org.reactivestreams.Publisher; + +/** + * An acknowledgment from the ksqlDB server that a row has been successfully inserted into a + * ksqlDB stream. See {@link Client#streamInserts(String, Publisher)} for details. + */ public interface InsertAck { + /** + * Returns the corresponding sequence number for this acknowledgment. Sequence numbers start at + * zero for each new {@link Client#streamInserts} request. + * + * @return the sequence number + */ long seqNum(); } diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java index 801128ff186b..50edb82eef1a 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/InsertsPublisher.java @@ -22,8 +22,18 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +/** + * A {@code org.reactivestreams.Publisher} suitable for use with the + * {@link Client#streamInserts(String, Publisher)} method. Rows for insertion are passed to the + * publisher via the {@link #accept(KsqlObject)} method, and buffered for delivery once the + * {@link Client#streamInserts} request is made and the server-side subscriber has been subscribed. + */ public class InsertsPublisher implements Publisher { + /** + * The buffer max size indicator value used by the default constructor. See + * {@link #InsertsPublisher(int)} for how this value is used. + */ public static final int DEFAULT_BUFFER_MAX_SIZE = 200; private Subscriber subscriber; @@ -37,31 +47,34 @@ public class InsertsPublisher implements Publisher { private boolean sentComplete; /** - * Construct an InsertsPublisher + * Creates an {@code InsertsPublisher}. */ public InsertsPublisher() { this(DEFAULT_BUFFER_MAX_SIZE); } /** - * Construct an InsertsPublisher + * Creates an {@code InsertsPublisher}. * * @param bufferMaxSize Indicative max number of elements to store in the buffer. Note that this - * is not enforced, but it used to determine what to return from the accept - * method so the caller can stop sending more and set a drainHandler to be - * notified when the buffer is cleared + * value is not enforced, but it used to determine what to return from the + * {@link #accept(KsqlObject)} method so the caller can stop sending more + * rows and set a drainHandler to be notified when the buffer is cleared */ public InsertsPublisher(final int bufferMaxSize) { this.bufferMaxSize = bufferMaxSize; } /** - * Provide a new row for insertion. The publisher will attempt to deliver it to server endpoint, - * assuming the streamInserts() request has been made. The publisher will buffer it internally - * if it can't deliver it immediately. + * Provides a new row for insertion. The publisher will attempt to deliver it to server endpoint, + * once the {@link Client#streamInserts} request has been made. The publisher will buffer the row + * internally if it can't deliver it immediately. Note that the row will be buffered even if the + * buffer is 'full', i.e., if number of elements is at least {@code bufferMaxSize}, as the + * {@code bufferMaxSize} value is not a hard limit. See {@link #InsertsPublisher(int)} for more. * - * @param row The element - * @return true if the internal buffer is 'full'. I.e. if number of elements is >= bufferMaxSize. + * @param row the row to insert + * @return whether the internal buffer is 'full', i.e., if number of elements is at least + * {@code bufferMaxSize}. */ public synchronized boolean accept(final KsqlObject row) { if (complete || sentComplete) { @@ -78,11 +91,16 @@ public synchronized boolean accept(final KsqlObject row) { } /** - * If you set a drain handler. It will be called if, after delivery is attempted there are zero - * elements buffered internally and there is demand from the subscriber for more elements. Drain - * handlers are one shot handlers, after being called it will never be called more than once. + * Sets a drain handler on the publisher. The drain handler will be called if after a row is + * delivered there are zero elements buffered internally and there is demand from the subscriber + * for more elements. Drain handlers may be used in combination with the return value from + * {@link #accept(KsqlObject)} to ensure the publisher's buffer does not grow too large. + * + *

Drain handlers are one shot handlers; after a drain handler is called it + * will never be called again. Instead, the caller should set a new drain handler for subsequent + * use. * - * @param handler The handler + * @param handler the drain handler */ public synchronized void drainHandler(final Runnable handler) { if (drainHandler != null) { @@ -92,9 +110,9 @@ public synchronized void drainHandler(final Runnable handler) { } /** - * Mark the incoming stream of elements as complete. This means onComplete will be called on any - * subscriber after any buffered messages have been delivered. Once complete has been called no - * further elements will be accepted + * Marks the incoming stream of elements as complete. This means no further rows will be accepted + * by the publisher and the {@link Client#streamInserts} connection will be closed once any + * buffered rows have been delivered for insertion. */ public synchronized void complete() { if (complete) { @@ -111,7 +129,8 @@ public synchronized void complete() { @Override public synchronized void subscribe(final Subscriber subscriber) { if (this.subscriber != null) { - throw new IllegalStateException("Cannot subscribe a new subscriber as one is already present."); + throw new IllegalStateException( + "Cannot subscribe a new subscriber: A subscriber is already present."); } this.subscriber = subscriber; diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java index 9dca39c12b8c..6b7cffe2c5f6 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/ClientImpl.java @@ -46,7 +46,9 @@ import java.util.concurrent.CompletableFuture; import org.reactivestreams.Publisher; +// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public class ClientImpl implements Client { + // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling private final ClientOptions clientOptions; private final Vertx vertx; @@ -147,7 +149,7 @@ public CompletableFuture streamInserts( final Buffer requestBody = Buffer.buffer(); final JsonObject params = new JsonObject().put("target", streamName); - requestBody.appendBuffer(params.toBuffer()).appendString("\n"); // TODO: is this extra newline needed? + requestBody.appendBuffer(params.toBuffer()).appendString("\n"); makeRequest( "/inserts-stream", diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.java index c0aae4d1e4e9..4a97deba3632 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsSubscriber.java @@ -47,7 +47,10 @@ protected void afterSubscribe(final Subscription subscription) { @Override protected void handleValue(final KsqlObject row) { - httpRequest.writeCustomFrame(0, 0, Buffer.buffer().appendString(row.toJsonString()).appendString("\n")); // TODO: is this extra newline needed? + httpRequest.writeCustomFrame( + 0, 0, + Buffer.buffer().appendString(row.toJsonString()).appendString("\n") + ); outstandingTokens--; if (httpRequest.writeQueueFull()) { diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java index b04a6717e5cc..a0231a21fd63 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/ClientTest.java @@ -594,10 +594,15 @@ public void shouldStreamInserts() throws Exception { } assertThat(acksSubscriber.isCompleted(), is(false)); + assertThat(acksPublisher.isComplete(), is(false)); + assertThat(acksPublisher.isFailed(), is(false)); + // When: insertsPublisher.complete(); // Then: + assertThatEventually(acksPublisher::isComplete, is(true)); + assertThat(acksPublisher.isFailed(), is(false)); assertThatEventually(acksSubscriber::isCompleted, is(true)); } @@ -647,6 +652,9 @@ public void shouldHandleErrorFromStreamInserts() throws Exception { assertThat(acksSubscriber.getValues().get(i).seqNum(), is(Long.valueOf(i))); } assertThatEventually(acksSubscriber::getError, is(notNullValue())); + + assertThat(acksPublisher.isFailed(), is(true)); + assertThat(acksPublisher.isComplete(), is(false)); } protected Client createJavaClient() { diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java index 40e938efc064..da5d259140d5 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ClientIntegrationTest.java @@ -624,6 +624,9 @@ public void shouldStreamInserts() throws Exception { assertThat(acksSubscriber.getError(), is(nullValue())); assertThat(acksSubscriber.isCompleted(), is(false)); + assertThat(acksPublisher.isComplete(), is(false)); + assertThat(acksPublisher.isFailed(), is(false)); + // Then: should receive new rows final String query = "SELECT * FROM " + EMPTY_TEST_STREAM_2 + " EMIT CHANGES LIMIT " + numRows + ";"; final List rows = client.executeQuery(query).get(); @@ -645,6 +648,9 @@ public void shouldStreamInserts() throws Exception { // Then assertThatEventually(acksSubscriber::isCompleted, is(true)); assertThat(acksSubscriber.getError(), is(nullValue())); + + assertThat(acksPublisher.isComplete(), is(true)); + assertThat(acksPublisher.isFailed(), is(false)); } @Test From b2ec8e47f0804457548b3610f3313b0717b9de50 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 24 Jun 2020 11:54:23 -0700 Subject: [PATCH 10/12] chore: fix merge from master --- .../ksql/api/client/impl/StreamInsertsResponseHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsResponseHandler.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsResponseHandler.java index 063158f64782..50107a32d303 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsResponseHandler.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/StreamInsertsResponseHandler.java @@ -17,8 +17,8 @@ import io.confluent.ksql.api.client.AcksPublisher; import io.confluent.ksql.api.client.InsertAck; -import io.confluent.ksql.api.client.KsqlClientException; import io.confluent.ksql.api.client.KsqlObject; +import io.confluent.ksql.api.client.exception.KsqlClientException; import io.vertx.core.Context; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClientRequest; From e3cb408736d9374b42163e8ce991d7518181739a Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Wed, 24 Jun 2020 16:51:21 -0700 Subject: [PATCH 11/12] chore: add equals/hashCode/toString methods to InsertAckImpl --- .../ksql/api/client/impl/InsertAckImpl.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertAckImpl.java b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertAckImpl.java index 77368d99c1c6..d6c7f16a0f41 100644 --- a/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertAckImpl.java +++ b/ksqldb-api-client/src/main/java/io/confluent/ksql/api/client/impl/InsertAckImpl.java @@ -16,6 +16,7 @@ package io.confluent.ksql.api.client.impl; import io.confluent.ksql.api.client.InsertAck; +import java.util.Objects; public class InsertAckImpl implements InsertAck { @@ -29,4 +30,28 @@ public class InsertAckImpl implements InsertAck { public long seqNum() { return num; } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final InsertAckImpl insertAck = (InsertAckImpl) o; + return num == insertAck.num; + } + + @Override + public int hashCode() { + return Objects.hash(num); + } + + @Override + public String toString() { + return "InsertAckImpl{" + + "num=" + num + + '}'; + } } From 685d4e9cc07f297b9134b61d040b7e0323e6ec40 Mon Sep 17 00:00:00 2001 From: Victoria Xia Date: Thu, 25 Jun 2020 14:06:27 -0700 Subject: [PATCH 12/12] test: add EqualsTester test --- .../api/client/impl/InsertAckImplTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/InsertAckImplTest.java diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/InsertAckImplTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/InsertAckImplTest.java new file mode 100644 index 000000000000..aab11fd2335e --- /dev/null +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/impl/InsertAckImplTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.api.client.impl; + +import com.google.common.testing.EqualsTester; +import org.junit.Test; + +public class InsertAckImplTest { + + @Test + public void shouldImplementHashCodeAndEquals() { + new EqualsTester() + .addEqualityGroup( + new InsertAckImpl(0L), + new InsertAckImpl(0L) + ) + .addEqualityGroup( + new InsertAckImpl(1L) + ) + .testEquals(); + } + +} \ No newline at end of file