From cf7ac08f18399f6dcc79bd542425f9ce7db1b1fa Mon Sep 17 00:00:00 2001 From: Vicky Papavasileiou Date: Thu, 9 Dec 2021 09:31:00 +0000 Subject: [PATCH] fix: fix flaky test (#8457) * remove logs from test --- ...ConsistencyOffsetVectorFunctionalTest.java | 5 +++-- .../ksql/api/impl/BlockingQueryPublisher.java | 19 ++++++++++--------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ConsistencyOffsetVectorFunctionalTest.java b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ConsistencyOffsetVectorFunctionalTest.java index a617ed65a3dd..466afd10f33e 100644 --- a/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ConsistencyOffsetVectorFunctionalTest.java +++ b/ksqldb-api-client/src/test/java/io/confluent/ksql/api/client/integration/ConsistencyOffsetVectorFunctionalTest.java @@ -182,10 +182,10 @@ public void shouldRoundTripCVWhenPullQueryOnTableAsync() throws Exception { true ); + assertThatEventually(streamedQueryResult::isComplete, is(true)); assertThat(((ClientImpl)client).getSerializedConsistencyVector(), is(notNullValue())); final String serializedCV = ((ClientImpl)client).getSerializedConsistencyVector(); verifyConsistencyVector(serializedCV); - assertThatEventually(streamedQueryResult::isComplete, is(true)); } @Test @@ -212,7 +212,8 @@ public void shouldRoundTripCVWhenExecutePullQuery() throws Exception { // Then assertThat(batchedQueryResult.queryID().get(), is(nullValue())); - assertThat(((ClientImpl)client).getSerializedConsistencyVector(), is(notNullValue())); + assertThatEventually(() -> ((ClientImpl)client).getSerializedConsistencyVector(), + is(notNullValue())); final String serializedCV = ((ClientImpl)client).getSerializedConsistencyVector(); verifyConsistencyVector(serializedCV); } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/BlockingQueryPublisher.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/BlockingQueryPublisher.java index 153a182af073..8bdd4d29cc61 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/BlockingQueryPublisher.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/BlockingQueryPublisher.java @@ -29,7 +29,6 @@ import io.vertx.core.WorkerExecutor; import java.util.List; import java.util.Objects; -import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,13 +58,11 @@ public class BlockingQueryPublisher extends BasePublisher { + if (isPullQuery) { + queryHandle.getConsistencyOffsetVector().ifPresent( + ((PullQueryQueue) queue)::putConsistencyVector); + maybeSend(); + } complete = true; // This allows us to hit the limit without having to queue one last row if (queue.isEmpty()) { @@ -90,6 +92,11 @@ public void setQueryHandle(final QueryHandle queryHandle, final boolean isPullQu // we should be returning a "Limit Reached" message as we do in the HTTP/1 endpoint when // we hit the limit, but for query completion, we should just end the response stream. this.queue.setCompletionHandler(() -> { + if (isPullQuery) { + queryHandle.getConsistencyOffsetVector().ifPresent( + ((PullQueryQueue) queue)::putConsistencyVector); + maybeSend(); + } complete = true; // This allows us to finish the query immediately if the query is already fully streamed. if (queue.isEmpty()) { @@ -148,7 +155,6 @@ protected void afterSubscribe() { executeOnWorker(queryHandle::start); } - private void executeOnWorker(final Runnable runnable) { workerExecutor.executeBlocking(p -> runnable.run(), false, ar -> { if (ar.failed()) { @@ -166,11 +172,6 @@ private void doSend() { while (getDemand() > 0 && !queue.isEmpty()) { if (num < SEND_MAX_BATCH_SIZE) { doOnNext(queue.poll()); - if (complete && isPullQuery && !addedCT.get()) { - queryHandle.getConsistencyOffsetVector().ifPresent( - ((PullQueryQueue)queue)::putConsistencyVector); - addedCT.set(true); - } if (complete && queue.isEmpty()) { ctx.runOnContext(v -> sendComplete()); }