Skip to content

Commit

Permalink
Disable writeHandlers by default
Browse files Browse the repository at this point in the history
Closes eclipse-vertx#4625

Signed-off-by: Thomas Segismont <[email protected]>
  • Loading branch information
tsegismont committed Feb 28, 2023
1 parent b6d6aae commit 5b8b4c9
Show file tree
Hide file tree
Showing 20 changed files with 368 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, HttpSer
obj.setPerMessageWebSocketCompressionSupported((Boolean)member.getValue());
}
break;
case "registerWebSocketWriteHandlers":
if (member.getValue() instanceof Boolean) {
obj.setRegisterWebSocketWriteHandlers((Boolean)member.getValue());
}
break;
case "tracingPolicy":
if (member.getValue() instanceof String) {
obj.setTracingPolicy(io.vertx.core.tracing.TracingPolicy.valueOf((String)member.getValue()));
Expand Down Expand Up @@ -177,6 +182,7 @@ static void toJson(HttpServerOptions obj, java.util.Map<String, Object> json) {
json.put("maxWebSocketMessageSize", obj.getMaxWebSocketMessageSize());
json.put("perFrameWebSocketCompressionSupported", obj.getPerFrameWebSocketCompressionSupported());
json.put("perMessageWebSocketCompressionSupported", obj.getPerMessageWebSocketCompressionSupported());
json.put("registerWebSocketWriteHandlers", obj.isRegisterWebSocketWriteHandlers());
if (obj.getTracingPolicy() != null) {
json.put("tracingPolicy", obj.getTracingPolicy().name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setAllowOriginHeader((Boolean)member.getValue());
}
break;
case "registerWriteHandlers":
if (member.getValue() instanceof Boolean) {
obj.setRegisterWriteHandlers((Boolean)member.getValue());
}
break;
case "subProtocols":
if (member.getValue() instanceof JsonArray) {
java.util.ArrayList<java.lang.String> list = new java.util.ArrayList<>();
Expand All @@ -50,6 +55,7 @@ public static void toJson(WebSocketConnectOptions obj, JsonObject json) {

public static void toJson(WebSocketConnectOptions obj, java.util.Map<String, Object> json) {
json.put("allowOriginHeader", obj.getAllowOriginHeader());
json.put("registerWriteHandlers", obj.isRegisterWriteHandlers());
if (obj.getSubProtocols() != null) {
JsonArray array = new JsonArray();
obj.getSubProtocols().forEach(item -> array.add(item));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NetClie
obj.setReconnectInterval(((Number)member.getValue()).longValue());
}
break;
case "registerWriteHandler":
if (member.getValue() instanceof Boolean) {
obj.setRegisterWriteHandler((Boolean)member.getValue());
}
break;
}
}
}
Expand All @@ -64,5 +69,6 @@ static void toJson(NetClientOptions obj, java.util.Map<String, Object> json) {
}
json.put("reconnectAttempts", obj.getReconnectAttempts());
json.put("reconnectInterval", obj.getReconnectInterval());
json.put("registerWriteHandler", obj.isRegisterWriteHandler());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json, NetServ
obj.setProxyProtocolTimeoutUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "registerWriteHandler":
if (member.getValue() instanceof Boolean) {
obj.setRegisterWriteHandler((Boolean)member.getValue());
}
break;
case "sni":
if (member.getValue() instanceof Boolean) {
obj.setSni((Boolean)member.getValue());
Expand Down Expand Up @@ -81,6 +86,7 @@ static void toJson(NetServerOptions obj, java.util.Map<String, Object> json) {
if (obj.getProxyProtocolTimeoutUnit() != null) {
json.put("proxyProtocolTimeoutUnit", obj.getProxyProtocolTimeoutUnit().name());
}
json.put("registerWriteHandler", obj.isRegisterWriteHandler());
json.put("sni", obj.isSni());
json.put("useProxyProtocol", obj.isUseProxyProtocol());
}
Expand Down
66 changes: 63 additions & 3 deletions src/main/java/io/vertx/core/http/HttpServerOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,31 @@
package io.vertx.core.http;

import io.netty.handler.codec.compression.CompressionOptions;
import io.netty.handler.logging.ByteBufFormat;
import io.vertx.codegen.annotations.DataObject;
import io.vertx.codegen.annotations.Unstable;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.Arguments;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.*;
import io.vertx.core.net.JdkSSLEngineOptions;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.OpenSSLEngineOptions;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.SSLEngineOptions;
import io.vertx.core.net.TrustOptions;
import io.vertx.core.tracing.TracingPolicy;
import io.netty.handler.logging.ByteBufFormat;

import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -149,6 +163,11 @@ public class HttpServerOptions extends NetServerOptions {
*/
public static final TracingPolicy DEFAULT_TRACING_POLICY = TracingPolicy.ALWAYS;

/**
* Whether write-handlers for server websockets should be registered by default = false.
*/
public static final boolean DEFAULT_REGISTER_WEBSOCKET_WRITE_HANDLERS = false;

private boolean compressionSupported;
private int compressionLevel;
private List<CompressionOptions> compressors;
Expand All @@ -173,6 +192,7 @@ public class HttpServerOptions extends NetServerOptions {
private boolean webSocketPreferredClientNoContext;
private int webSocketClosingTimeout;
private TracingPolicy tracingPolicy;
private boolean registerWebSocketWriteHandlers;

/**
* Default constructor
Expand Down Expand Up @@ -214,6 +234,7 @@ public HttpServerOptions(HttpServerOptions other) {
this.webSocketAllowServerNoContext = other.webSocketAllowServerNoContext;
this.webSocketClosingTimeout = other.webSocketClosingTimeout;
this.tracingPolicy = other.tracingPolicy;
this.registerWebSocketWriteHandlers = other.registerWebSocketWriteHandlers;
}

/**
Expand Down Expand Up @@ -262,6 +283,7 @@ private void init() {
webSocketAllowServerNoContext = DEFAULT_WEBSOCKET_ALLOW_SERVER_NO_CONTEXT;
webSocketClosingTimeout = DEFAULT_WEBSOCKET_CLOSING_TIMEOUT;
tracingPolicy = DEFAULT_TRACING_POLICY;
registerWebSocketWriteHandlers = DEFAULT_REGISTER_WEBSOCKET_WRITE_HANDLERS;
}

@Override
Expand Down Expand Up @@ -1030,4 +1052,42 @@ public HttpServerOptions setTracingPolicy(TracingPolicy tracingPolicy) {
this.tracingPolicy = tracingPolicy;
return this;
}

/**
* @return {@code false}, does not apply to HTTP servers
*/
@Override
public boolean isRegisterWriteHandler() {
return false;
}

/**
* Has no effect on HTTP server options.
*/
@Override
public HttpServerOptions setRegisterWriteHandler(boolean registerWriteHandler) {
return this;
}

/**
* @return {@code true} if write-handlers for server websockets should be registered on the {@link io.vertx.core.eventbus.EventBus}, otherwise {@code false}
*/
public boolean isRegisterWebSocketWriteHandlers() {
return registerWebSocketWriteHandlers;
}

/**
* Whether write-handlers for server websockets should be registered on the {@link io.vertx.core.eventbus.EventBus}.
* <p>
* Defaults to {@code false}.
*
* @param registerWebSocketWriteHandlers true to register write-handlers
* @return a reference to this, so the API can be used fluently
* @see WebSocketBase#textHandlerID()
* @see WebSocketBase#binaryHandlerID()
*/
public HttpServerOptions setRegisterWebSocketWriteHandlers(boolean registerWebSocketWriteHandlers) {
this.registerWebSocketWriteHandlers = registerWebSocketWriteHandlers;
return this;
}
}
20 changes: 17 additions & 3 deletions src/main/java/io/vertx/core/http/WebSocketBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@

package io.vertx.core.http;

import io.vertx.codegen.annotations.*;
import io.vertx.codegen.annotations.CacheReturn;
import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
Expand Down Expand Up @@ -63,24 +67,34 @@ public interface WebSocketBase extends ReadStream<Buffer>, WriteStream<Buffer> {
WebSocketBase drainHandler(Handler<Void> handler);

/**
* When a {@code WebSocket} is created it automatically registers an event handler with the event bus - the ID of that
* When a {@code WebSocket} is created, it may register an event handler with the event bus - the ID of that
* handler is given by this method.
* <p>
* By default, no handler is registered, the feature must be enabled via {@link WebSocketConnectOptions#setRegisterWriteHandlers(boolean)} or {@link HttpServerOptions#setRegisterWebSocketWriteHandlers(boolean)}.
* <p>
* Given this ID, a different event loop can send a binary frame to that event handler using the event bus and
* that buffer will be received by this instance in its own event loop and written to the underlying connection. This
* allows you to write data to other WebSockets which are owned by different event loops.
*
* @return the binary handler id
* @see WebSocketConnectOptions#setRegisterWriteHandlers(boolean)
* @see HttpServerOptions#setRegisterWebSocketWriteHandlers(boolean)
*/
String binaryHandlerID();

/**
* When a {@code WebSocket} is created it automatically registers an event handler with the eventbus, the ID of that
* When a {@code WebSocket} is created, it may register an event handler with the eventbus, the ID of that
* handler is given by {@code textHandlerID}.
* <p>
* By default, no handler is registered, the feature must be enabled via {@link WebSocketConnectOptions#setRegisterWriteHandlers(boolean)} or {@link HttpServerOptions#setRegisterWebSocketWriteHandlers(boolean)}.
* <p>
* Given this ID, a different event loop can send a text frame to that event handler using the event bus and
* that buffer will be received by this instance in its own event loop and written to the underlying connection. This
* allows you to write data to other WebSockets which are owned by different event loops.
*
* @return the text handler id
* @see WebSocketConnectOptions#setRegisterWriteHandlers(boolean)
* @see HttpServerOptions#setRegisterWebSocketWriteHandlers(boolean)
*/
String textHandlerID();

Expand Down
30 changes: 30 additions & 0 deletions src/main/java/io/vertx/core/http/WebSocketConnectOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,23 @@ public class WebSocketConnectOptions extends RequestOptions {
*/
public static final boolean DEFAULT_ALLOW_ORIGIN_HEADER = true;

/**
* Whether write-handlers should be registered by default = false.
*/
public static final boolean DEFAULT_REGISTER_WRITE_HANDLERS = false;

private ProxyOptions proxyOptions;
private WebsocketVersion version;
private List<String> subProtocols;
private boolean allowOriginHeader;
private boolean registerWriteHandlers;

public WebSocketConnectOptions() {
proxyOptions = DEFAULT_PROXY_OPTIONS;
version = DEFAULT_VERSION;
subProtocols = DEFAULT_SUB_PROTOCOLS;
allowOriginHeader = DEFAULT_ALLOW_ORIGIN_HEADER;
registerWriteHandlers = DEFAULT_REGISTER_WRITE_HANDLERS;
}

public WebSocketConnectOptions(WebSocketConnectOptions other) {
Expand All @@ -66,6 +73,7 @@ public WebSocketConnectOptions(WebSocketConnectOptions other) {
this.version = other.version;
this.subProtocols = other.subProtocols;
this.allowOriginHeader = other.allowOriginHeader;
this.registerWriteHandlers = other.registerWriteHandlers;
}

public WebSocketConnectOptions(JsonObject json) {
Expand Down Expand Up @@ -235,4 +243,26 @@ public JsonObject toJson() {
WebSocketConnectOptionsConverter.toJson(this, json);
return json;
}

/**
* @return {@code true} if write-handlers should be registered on the {@link io.vertx.core.eventbus.EventBus}, otherwise {@code false}
*/
public boolean isRegisterWriteHandlers() {
return registerWriteHandlers;
}

/**
* Whether write-handlers should be registered on the {@link io.vertx.core.eventbus.EventBus}.
* <p>
* Defaults to {@code false}.
*
* @param registerWriteHandlers true to register write-handlers
* @return a reference to this, so the API can be used fluently
* @see WebSocketBase#textHandlerID()
* @see WebSocketBase#binaryHandlerID()
*/
public WebSocketConnectOptions setRegisterWriteHandlers(boolean registerWriteHandlers) {
this.registerWriteHandlers = registerWriteHandlers;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.websocketx.WebSocket07FrameDecoder;
import io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder;
Expand Down Expand Up @@ -190,7 +190,7 @@ public long concurrency() {
*/
public NetSocketInternal toNetSocket() {
removeChannelHandlers();
NetSocketImpl socket = new NetSocketImpl(context, chctx, null, metrics());
NetSocketImpl socket = new NetSocketImpl(context, chctx, null, metrics(), false);
socket.metric(metric());
evictionHandler.handle(null);
chctx.pipeline().replace("handler", "handler", VertxHandler.create(ctx -> socket));
Expand Down Expand Up @@ -946,6 +946,7 @@ synchronized void toWebSocket(
WebsocketVersion vers,
List<String> subProtocols,
long handshakeTimeout,
boolean registerWriteHandlers,
int maxWebSocketFrameSize,
Promise<WebSocket> promise) {
try {
Expand Down Expand Up @@ -1008,7 +1009,8 @@ synchronized void toWebSocket(
version != WebSocketVersion.V00,
options.getWebSocketClosingTimeout(),
options.getMaxWebSocketFrameSize(),
options.getMaxWebSocketMessageSize());
options.getMaxWebSocketMessageSize(),
registerWriteHandlers);
w.subProtocol(handshaker.actualSubprotocol());
return w;
});
Expand Down
Loading

0 comments on commit 5b8b4c9

Please sign in to comment.