From 931fb07e1676d159376164617e140fe0ba3b3165 Mon Sep 17 00:00:00 2001 From: Alan Sheinberg Date: Fri, 4 Mar 2022 10:00:44 -0800 Subject: [PATCH] fix: Ensures response end handler is invoked just once --- .../ksql/api/server/QueryStreamHandler.java | 18 +++++++++ .../api/server/QueryStreamHandlerTest.java | 38 +++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java index 60699e8433d1..7257c930c87c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,6 +170,11 @@ private void handleQueryPublisher( final QueryResponseMetadata metadata; Optional completionMessage = Optional.empty(); Optional limitMessage = Optional.of("Limit Reached"); + // The end handler can be called twice if the connection is closed by the client. The + // call to response.end() resulting from queryPublisher.close() may result in a second + // call to the end handler, which will mess up metrics, so we ensure that this called just + // once by keeping track of the calls. + final AtomicBoolean endedResponse = new AtomicBoolean(false); if (queryPublisher.isPullQuery()) { metadata = new QueryResponseMetadata( @@ -180,6 +186,10 @@ private void handleQueryPublisher( // When response is complete, publisher should be closed routingContext.response().endHandler(v -> { + if (endedResponse.getAndSet(true)) { + log.warn("Connection already closed so just returning"); + return; + } queryPublisher.close(); metricsCallbackHolder.reportMetrics( routingContext.response().getStatusCode(), @@ -195,6 +205,10 @@ private void handleQueryPublisher( preparePushProjectionSchema(queryPublisher.geLogicalSchema())); routingContext.response().endHandler(v -> { + if (endedResponse.getAndSet(true)) { + log.warn("Connection already closed so just returning"); + return; + } queryPublisher.close(); metricsCallbackHolder.reportMetrics( routingContext.response().getStatusCode(), @@ -215,6 +229,10 @@ private void handleQueryPublisher( // When response is complete, publisher should be closed and query unregistered routingContext.response().endHandler(v -> { + if (endedResponse.getAndSet(true)) { + log.warn("Connection already closed so just returning"); + return; + } query.close(); metricsCallbackHolder.reportMetrics( routingContext.response().getStatusCode(), diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/QueryStreamHandlerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/QueryStreamHandlerTest.java index f889c55b1ef2..d810c917c61c 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/QueryStreamHandlerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/QueryStreamHandlerTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -155,4 +156,41 @@ public void shouldSucceed_pushQuery() { assertThat(subscriber.getValue(), notNullValue()); verify(pushQueryHolder).close(); } + + @Test + public void shouldSucceed_scalablePushQuery() { + // Given: + when(queryPublisher.isPullQuery()).thenReturn(false); + when(queryPublisher.isScalablePushQuery()).thenReturn(true); + final QueryStreamArgs req = new QueryStreamArgs("select * from foo emit changes;", + Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + givenRequest(req); + + // When: + handler.handle(routingContext); + endHandler.getValue().handle(null); + + // Then: + assertThat(subscriber.getValue(), notNullValue()); + verify(queryPublisher).close(); + } + + @Test + public void verifyEndHandlerNotCalledTwice_scalablePushQuery() { + // Given: + when(queryPublisher.isPullQuery()).thenReturn(false); + when(queryPublisher.isScalablePushQuery()).thenReturn(true); + final QueryStreamArgs req = new QueryStreamArgs("select * from foo emit changes;", + Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + givenRequest(req); + + // When: + handler.handle(routingContext); + endHandler.getValue().handle(null); + endHandler.getValue().handle(null); + + // Then: + assertThat(subscriber.getValue(), notNullValue()); + verify(queryPublisher, times(1)).close(); + } }