diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java index 4fcf4149d012..487a871cd2c6 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/QueryStreamHandler.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,7 @@ /** * Handles requests to the query-stream endpoint */ +@SuppressWarnings({"ClassDataAbstractionCoupling"}) public class QueryStreamHandler implements Handler { private static final Logger log = LoggerFactory.getLogger(QueryStreamHandler.class); @@ -172,6 +174,11 @@ private void handleQueryPublisher( Optional completionMessage = Optional.empty(); Optional limitMessage = Optional.of("Limit Reached"); boolean bufferOutput = false; + // The end handler can be called twice if the connection is closed by the client. The + // call to response.end() resulting from queryPublisher.close() may result in a second + // call to the end handler, which will mess up metrics, so we ensure that this called just + // once by keeping track of the calls. + final AtomicBoolean endedResponse = new AtomicBoolean(false); if (queryPublisher.isPullQuery()) { metadata = new QueryResponseMetadata( @@ -184,6 +191,10 @@ private void handleQueryPublisher( // When response is complete, publisher should be closed routingContext.response().endHandler(v -> { + if (endedResponse.getAndSet(true)) { + log.warn("Connection already closed so just returning"); + return; + } queryPublisher.close(); metricsCallbackHolder.reportMetrics( routingContext.response().getStatusCode(), @@ -199,6 +210,10 @@ private void handleQueryPublisher( preparePushProjectionSchema(queryPublisher.geLogicalSchema())); routingContext.response().endHandler(v -> { + if (endedResponse.getAndSet(true)) { + log.warn("Connection already closed so just returning"); + return; + } queryPublisher.close(); metricsCallbackHolder.reportMetrics( routingContext.response().getStatusCode(), @@ -219,6 +234,10 @@ private void handleQueryPublisher( // When response is complete, publisher should be closed and query unregistered routingContext.response().endHandler(v -> { + if (endedResponse.getAndSet(true)) { + log.warn("Connection already closed so just returning"); + return; + } query.close(); metricsCallbackHolder.reportMetrics( routingContext.response().getStatusCode(), diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/QueryStreamHandlerTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/QueryStreamHandlerTest.java index f889c55b1ef2..d810c917c61c 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/QueryStreamHandlerTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/QueryStreamHandlerTest.java @@ -19,6 +19,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -155,4 +156,41 @@ public void shouldSucceed_pushQuery() { assertThat(subscriber.getValue(), notNullValue()); verify(pushQueryHolder).close(); } + + @Test + public void shouldSucceed_scalablePushQuery() { + // Given: + when(queryPublisher.isPullQuery()).thenReturn(false); + when(queryPublisher.isScalablePushQuery()).thenReturn(true); + final QueryStreamArgs req = new QueryStreamArgs("select * from foo emit changes;", + Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + givenRequest(req); + + // When: + handler.handle(routingContext); + endHandler.getValue().handle(null); + + // Then: + assertThat(subscriber.getValue(), notNullValue()); + verify(queryPublisher).close(); + } + + @Test + public void verifyEndHandlerNotCalledTwice_scalablePushQuery() { + // Given: + when(queryPublisher.isPullQuery()).thenReturn(false); + when(queryPublisher.isScalablePushQuery()).thenReturn(true); + final QueryStreamArgs req = new QueryStreamArgs("select * from foo emit changes;", + Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + givenRequest(req); + + // When: + handler.handle(routingContext); + endHandler.getValue().handle(null); + endHandler.getValue().handle(null); + + // Then: + assertThat(subscriber.getValue(), notNullValue()); + verify(queryPublisher, times(1)).close(); + } }