From 3ea65b68046cea34dbf7e282b14b9869e3dc176e Mon Sep 17 00:00:00 2001 From: Tomasz Nguyen Date: Tue, 26 Jan 2021 21:43:04 +0000 Subject: [PATCH] fix: remove mutable subscriber field on an endpoint This is actually a pretty scary bug. WSQueryEndpoint is a singleton, but each call to executeStreamQuery handles a new websocket connection. What ended up happening is whenever we handled a query, we updated the subscription on the singleton. As a result whenever a websocket closed, we only closed the query that was currently attached to WSQueryEndpoint and lexically closed in the lambda passed into ServerWebsocket.handleClose --- .../resources/streaming/WSQueryEndpoint.java | 39 +++++++++---------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java index 8e46239fb1ec..09f34b3e9229 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/WSQueryEndpoint.java @@ -81,8 +81,6 @@ public class WSQueryEndpoint { private final HARouting routing; private final Optional localCommands; - private WebSocketSubscriber subscriber; - // CHECKSTYLE_RULES.OFF: ParameterNumberCheck public WSQueryEndpoint( // CHECKSTYLE_RULES.ON: ParameterNumberCheck @@ -220,11 +218,6 @@ public void executeStreamQuery(final ServerWebSocket webSocket, final MultiMap r throw new IllegalArgumentException("Unexpected statement type " + statement); } - webSocket.closeHandler(v -> { - if (subscriber != null) { - subscriber.close(); - } - }); } catch (final TopicAuthorizationException e) { log.debug("Error processing request", e); SessionUtil.closeSilently( @@ -279,16 +272,29 @@ private PreparedStatement parseStatement(final KsqlRequest request) { } } + private void attachCloseHandler(final ServerWebSocket websocket, + final WebSocketSubscriber subscriber) { + websocket.closeHandler(v -> { + if (subscriber != null) { + subscriber.close(); + log.debug("Websocket {} closed, reason: {}, code: {}", + websocket.textHandlerID(), + websocket.closeReason(), + websocket.closeStatusCode()); + } + }); + } + private void handleQuery(final RequestContext info, final Query query, final long startTimeNanos) { final Map clientLocalProperties = info.request.getConfigOverrides(); final WebSocketSubscriber streamSubscriber = new WebSocketSubscriber<>(info.websocket); - this.subscriber = streamSubscriber; - final PreparedStatement statement = - PreparedStatement.of(info.request.getKsql(), query); + attachCloseHandler(info.websocket, streamSubscriber); + + final PreparedStatement statement = PreparedStatement.of(info.request.getKsql(), query); final ConfiguredStatement configured = ConfiguredStatement .of(statement, SessionConfig.of(ksqlConfig, clientLocalProperties)); @@ -328,7 +334,8 @@ private void handlePrintTopic(final RequestContext info, final PrintTopic printT final WebSocketSubscriber topicSubscriber = new WebSocketSubscriber<>(info.websocket); - this.subscriber = topicSubscriber; + + attachCloseHandler(info.websocket, topicSubscriber); topicPublisher.start( exec, @@ -339,16 +346,6 @@ private void handlePrintTopic(final RequestContext info, final PrintTopic printT ); } - private void handleUnsupportedStatement( - final RequestContext ignored, - final Statement statement - ) { - throw new IllegalArgumentException(String.format( - "Statement type `%s' not supported for this resource", - statement.getClass().getName() - )); - } - private static void startPushQueryPublisher( final KsqlEngine ksqlEngine, final ServiceContext serviceContext,