Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ksql 5716/close transient queries when network splits #6905

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ public class WSQueryEndpoint {
private final HARouting routing;
private final Optional<LocalCommands> localCommands;

private WebSocketSubscriber<?> subscriber;

// CHECKSTYLE_RULES.OFF: ParameterNumberCheck
public WSQueryEndpoint(
// CHECKSTYLE_RULES.ON: ParameterNumberCheck
Expand Down Expand Up @@ -220,11 +218,6 @@ public void executeStreamQuery(final ServerWebSocket webSocket, final MultiMap r
throw new IllegalArgumentException("Unexpected statement type " + statement);
}

webSocket.closeHandler(v -> {
if (subscriber != null) {
subscriber.close();
}
});
colinhicks marked this conversation as resolved.
Show resolved Hide resolved
} catch (final TopicAuthorizationException e) {
log.debug("Error processing request", e);
SessionUtil.closeSilently(
Expand Down Expand Up @@ -279,16 +272,29 @@ private PreparedStatement<?> parseStatement(final KsqlRequest request) {
}
}

private void attachCloseHandler(final ServerWebSocket websocket,
final WebSocketSubscriber<?> subscriber) {
websocket.closeHandler(v -> {
if (subscriber != null) {
subscriber.close();
log.debug("Websocket {} closed, reason: {}, code: {}",
websocket.textHandlerID(),
websocket.closeReason(),
websocket.closeStatusCode());
}
});
}

private void handleQuery(final RequestContext info, final Query query,
final long startTimeNanos) {
final Map<String, Object> clientLocalProperties = info.request.getConfigOverrides();

final WebSocketSubscriber<StreamedRow> streamSubscriber =
new WebSocketSubscriber<>(info.websocket);
this.subscriber = streamSubscriber;

final PreparedStatement<Query> statement =
PreparedStatement.of(info.request.getKsql(), query);
attachCloseHandler(info.websocket, streamSubscriber);

final PreparedStatement<Query> statement = PreparedStatement.of(info.request.getKsql(), query);

final ConfiguredStatement<Query> configured = ConfiguredStatement
.of(statement, SessionConfig.of(ksqlConfig, clientLocalProperties));
Expand Down Expand Up @@ -328,7 +334,8 @@ private void handlePrintTopic(final RequestContext info, final PrintTopic printT

final WebSocketSubscriber<String> topicSubscriber =
new WebSocketSubscriber<>(info.websocket);
this.subscriber = topicSubscriber;

attachCloseHandler(info.websocket, topicSubscriber);

topicPublisher.start(
exec,
Expand All @@ -339,16 +346,6 @@ private void handlePrintTopic(final RequestContext info, final PrintTopic printT
);
}

private void handleUnsupportedStatement(
final RequestContext ignored,
final Statement statement
) {
throw new IllegalArgumentException(String.format(
"Statement type `%s' not supported for this resource",
statement.getClass().getName()
));
}

private static void startPushQueryPublisher(
final KsqlEngine ksqlEngine,
final ServiceContext serviceContext,
Expand Down