Skip to content

Commit

Permalink
fix: remove mutable subscriber field on an endpoint
Browse files Browse the repository at this point in the history
This is actually a pretty scary bug. WSQueryEndpoint is a singleton, but each call to executeStreamQuery handles a new websocket connection. What ended up happening is whenever we handled a query, we updated the subscription on the singleton. As a result whenever a websocket closed, we only closed the query that was currently attached to WSQueryEndpoint and lexically closed in the lambda passed into ServerWebsocket.handleClose
  • Loading branch information
swist committed Jan 27, 2021
1 parent f211cc9 commit 3ea65b6
Showing 1 changed file with 18 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ public class WSQueryEndpoint {
private final HARouting routing;
private final Optional<LocalCommands> localCommands;

private WebSocketSubscriber<?> subscriber;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public WSQueryEndpoint(
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
Expand Down Expand Up @@ -220,11 +218,6 @@ public void executeStreamQuery(final ServerWebSocket webSocket, final MultiMap r
throw new IllegalArgumentException("Unexpected statement type " + statement);
}

webSocket.closeHandler(v -> {
if (subscriber != null) {
subscriber.close();
}
});
} catch (final TopicAuthorizationException e) {
log.debug("Error processing request", e);
SessionUtil.closeSilently(
Expand Down Expand Up @@ -279,16 +272,29 @@ private PreparedStatement<?> parseStatement(final KsqlRequest request) {
}
}

private void attachCloseHandler(final ServerWebSocket websocket,
final WebSocketSubscriber<?> subscriber) {
websocket.closeHandler(v -> {
if (subscriber != null) {
subscriber.close();
log.debug("Websocket {} closed, reason: {}, code: {}",
websocket.textHandlerID(),
websocket.closeReason(),
websocket.closeStatusCode());
}
});
}

private void handleQuery(final RequestContext info, final Query query,
final long startTimeNanos) {
final Map<String, Object> clientLocalProperties = info.request.getConfigOverrides();

final WebSocketSubscriber<StreamedRow> streamSubscriber =
new WebSocketSubscriber<>(info.websocket);
this.subscriber = streamSubscriber;

final PreparedStatement<Query> statement =
PreparedStatement.of(info.request.getKsql(), query);
attachCloseHandler(info.websocket, streamSubscriber);

final PreparedStatement<Query> statement = PreparedStatement.of(info.request.getKsql(), query);

final ConfiguredStatement<Query> configured = ConfiguredStatement
.of(statement, SessionConfig.of(ksqlConfig, clientLocalProperties));
Expand Down Expand Up @@ -328,7 +334,8 @@ private void handlePrintTopic(final RequestContext info, final PrintTopic printT

final WebSocketSubscriber<String> topicSubscriber =
new WebSocketSubscriber<>(info.websocket);
this.subscriber = topicSubscriber;

attachCloseHandler(info.websocket, topicSubscriber);

topicPublisher.start(
exec,
Expand All @@ -339,16 +346,6 @@ private void handlePrintTopic(final RequestContext info, final PrintTopic printT
);
}

private void handleUnsupportedStatement(
final RequestContext ignored,
final Statement statement
) {
throw new IllegalArgumentException(String.format(
"Statement type `%s' not supported for this resource",
statement.getClass().getName()
));
}

private static void startPushQueryPublisher(
final KsqlEngine ksqlEngine,
final ServiceContext serviceContext,
Expand Down

0 comments on commit 3ea65b6

Please sign in to comment.