Skip to content

Commit

Permalink
fix: fix flaky test (confluentinc#8457)
Browse files Browse the repository at this point in the history
* remove logs from test
  • Loading branch information
vpapavas committed Dec 10, 2021
1 parent adaf1bd commit cf7ac08
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@ public void shouldRoundTripCVWhenPullQueryOnTableAsync() throws Exception {
true
);

assertThatEventually(streamedQueryResult::isComplete, is(true));
assertThat(((ClientImpl)client).getSerializedConsistencyVector(), is(notNullValue()));
final String serializedCV = ((ClientImpl)client).getSerializedConsistencyVector();
verifyConsistencyVector(serializedCV);
assertThatEventually(streamedQueryResult::isComplete, is(true));
}

@Test
Expand All @@ -212,7 +212,8 @@ public void shouldRoundTripCVWhenExecutePullQuery() throws Exception {

// Then
assertThat(batchedQueryResult.queryID().get(), is(nullValue()));
assertThat(((ClientImpl)client).getSerializedConsistencyVector(), is(notNullValue()));
assertThatEventually(() -> ((ClientImpl)client).getSerializedConsistencyVector(),
is(notNullValue()));
final String serializedCV = ((ClientImpl)client).getSerializedConsistencyVector();
verifyConsistencyVector(serializedCV);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.vertx.core.WorkerExecutor;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -59,13 +58,11 @@ public class BlockingQueryPublisher extends BasePublisher<KeyValueMetadata<List<
private QueryId queryId;
private boolean complete;
private volatile boolean closed;
private AtomicBoolean addedCT;

public BlockingQueryPublisher(final Context ctx,
final WorkerExecutor workerExecutor) {
super(ctx);
this.workerExecutor = Objects.requireNonNull(workerExecutor);
this.addedCT = new AtomicBoolean(false);
}

public void setQueryHandle(final QueryHandle queryHandle, final boolean isPullQuery,
Expand All @@ -78,6 +75,11 @@ public void setQueryHandle(final QueryHandle queryHandle, final boolean isPullQu
this.queryId = queryHandle.getQueryId();
this.queue.setQueuedCallback(this::maybeSend);
this.queue.setLimitHandler(() -> {
if (isPullQuery) {
queryHandle.getConsistencyOffsetVector().ifPresent(
((PullQueryQueue) queue)::putConsistencyVector);
maybeSend();
}
complete = true;
// This allows us to hit the limit without having to queue one last row
if (queue.isEmpty()) {
Expand All @@ -90,6 +92,11 @@ public void setQueryHandle(final QueryHandle queryHandle, final boolean isPullQu
// we should be returning a "Limit Reached" message as we do in the HTTP/1 endpoint when
// we hit the limit, but for query completion, we should just end the response stream.
this.queue.setCompletionHandler(() -> {
if (isPullQuery) {
queryHandle.getConsistencyOffsetVector().ifPresent(
((PullQueryQueue) queue)::putConsistencyVector);
maybeSend();
}
complete = true;
// This allows us to finish the query immediately if the query is already fully streamed.
if (queue.isEmpty()) {
Expand Down Expand Up @@ -148,7 +155,6 @@ protected void afterSubscribe() {
executeOnWorker(queryHandle::start);
}


private void executeOnWorker(final Runnable runnable) {
workerExecutor.executeBlocking(p -> runnable.run(), false, ar -> {
if (ar.failed()) {
Expand All @@ -166,11 +172,6 @@ private void doSend() {
while (getDemand() > 0 && !queue.isEmpty()) {
if (num < SEND_MAX_BATCH_SIZE) {
doOnNext(queue.poll());
if (complete && isPullQuery && !addedCT.get()) {
queryHandle.getConsistencyOffsetVector().ifPresent(
((PullQueryQueue)queue)::putConsistencyVector);
addedCT.set(true);
}
if (complete && queue.isEmpty()) {
ctx.runOnContext(v -> sendComplete());
}
Expand Down

0 comments on commit cf7ac08

Please sign in to comment.