Skip to content

Commit

Permalink
Refactor json-rpc WS handler [hyperledger#535]
Browse files Browse the repository at this point in the history
Signed-off-by: Diego López León <[email protected]>
Signed-off-by: Diego López León <[email protected]>
  • Loading branch information
diega committed Apr 26, 2022
1 parent 7fe18c4 commit e2645ba
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 256 deletions.
18 changes: 17 additions & 1 deletion besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.DefaultAuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.EngineAuthService;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.AuthenticatedJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.BaseJsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcProcessor;
import org.hyperledger.besu.ethereum.api.jsonrpc.health.HealthService;
import org.hyperledger.besu.ethereum.api.jsonrpc.health.LivenessCheck;
import org.hyperledger.besu.ethereum.api.jsonrpc.health.ReadinessCheck;
Expand Down Expand Up @@ -1119,10 +1123,22 @@ private WebSocketService createWebsocketService(
.values()
.forEach(websocketMethodsFactory::addMethods);

final JsonRpcProcessor jsonRpcProcessor;
if (authenticationService.isPresent()) {
jsonRpcProcessor =
new AuthenticatedJsonRpcProcessor(
new BaseJsonRpcProcessor(),
authenticationService.get(),
configuration.getRpcApisNoAuth());
} else {
jsonRpcProcessor = new BaseJsonRpcProcessor();
}
final JsonRpcExecutor jsonRpcExecutor =
new JsonRpcExecutor(jsonRpcProcessor, websocketMethodsFactory.methods());
final WebSocketRequestHandler websocketRequestHandler =
new WebSocketRequestHandler(
vertx,
websocketMethodsFactory.methods(),
jsonRpcExecutor,
besuController.getProtocolManager().ethContext().getScheduler(),
webSocketConfiguration.getTimeoutSec());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,30 @@
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket;

import static java.util.stream.Collectors.toList;
import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError.INVALID_REQUEST;

import org.hyperledger.besu.ethereum.api.handlers.IsAliveHandler;
import org.hyperledger.besu.ethereum.api.handlers.RpcMethodTimeoutException;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.exception.InvalidJsonRpcParameters;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.execution.JsonRpcExecutor;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponseType;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcUnauthorizedResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketRpcRequest;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.fasterxml.jackson.core.JsonGenerator.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.User;
Expand All @@ -68,177 +55,109 @@ public class WebSocketRequestHandler {
.with(Feature.AUTO_CLOSE_TARGET);

private final Vertx vertx;
private final Map<String, JsonRpcMethod> methods;
private final JsonRpcExecutor jsonRpcExecutor;
final EthScheduler ethScheduler;
private final long timeoutSec;

public WebSocketRequestHandler(
final Vertx vertx,
final Map<String, JsonRpcMethod> methods,
final JsonRpcExecutor jsonRpcExecutor,
final EthScheduler ethScheduler,
final long timeoutSec) {
this.vertx = vertx;
this.methods = methods;
this.jsonRpcExecutor = jsonRpcExecutor;
this.ethScheduler = ethScheduler;
this.timeoutSec = timeoutSec;
}

// Only for testing
public void handle(final ServerWebSocket websocket, final String payload) {
handle(Optional.empty(), websocket, payload, Optional.empty(), Collections.emptyList());
}

public void handle(
final Optional<AuthenticationService> authenticationService,
final ServerWebSocket websocket,
final String payload,
final Optional<User> user,
final Collection<String> noAuthApiMethods) {
vertx.executeBlocking(
executeHandler(authenticationService, websocket, payload, user, noAuthApiMethods),
false,
resultHandler(websocket));
}

private Handler<Promise<Object>> executeHandler(
final Optional<AuthenticationService> authenticationService,
final ServerWebSocket websocket,
final String payload,
final Optional<User> user,
final Collection<String> noAuthApiMethods) {
return future -> {
final String json = payload.trim();
if (!json.isEmpty() && json.charAt(0) == '{') {
final ServerWebSocket websocket, final Buffer buffer, final Optional<User> user) {
if (buffer.length() == 0) {
replyToClient(websocket, errorResponse(null, JsonRpcError.INVALID_REQUEST));
} else {
try {
final JsonObject jsonRpcRequest = buffer.toJsonObject();
vertx
.<JsonRpcResponse>executeBlocking(
promise -> {
try {
final JsonRpcResponse jsonRpcResponse =
jsonRpcExecutor.execute(
user,
null,
null,
new IsAliveHandler(ethScheduler, timeoutSec),
jsonRpcRequest,
req -> {
final WebSocketRpcRequest websocketRequest =
req.mapTo(WebSocketRpcRequest.class);
websocketRequest.setConnectionId(websocket.textHandlerID());
return websocketRequest;
});
promise.complete(jsonRpcResponse);
} catch (RuntimeException e) {
promise.fail(e);
}
})
.onSuccess(jsonRpcResponse -> replyToClient(websocket, jsonRpcResponse))
.onFailure(
throwable -> {
try {
final Integer id = jsonRpcRequest.getInteger("id", null);
replyToClient(websocket, errorResponse(id, JsonRpcError.INTERNAL_ERROR));
} catch (ClassCastException idNotIntegerException) {
replyToClient(websocket, errorResponse(null, JsonRpcError.INTERNAL_ERROR));
}
});
} catch (DecodeException jsonObjectDecodeException) {
try {
handleSingleRequest(
authenticationService,
websocket,
user,
future,
getRequest(payload),
noAuthApiMethods);
} catch (final IllegalArgumentException | DecodeException e) {
LOG.debug("Error mapping json to WebSocketRpcRequest", e);
future.complete(new JsonRpcErrorResponse(null, JsonRpcError.INVALID_REQUEST));
return;
}
} else if (json.length() == 0) {
future.complete(errorResponse(null, INVALID_REQUEST));
return;
} else {
final JsonArray jsonArray = new JsonArray(json);
if (jsonArray.size() < 1) {
future.complete(errorResponse(null, INVALID_REQUEST));
return;
final JsonArray batchJsonRpcRequest = buffer.toJsonArray();
vertx
.<List<JsonRpcResponse>>executeBlocking(
promise -> {
List<JsonRpcResponse> responses = new ArrayList<>();
for (int i = 0; i < batchJsonRpcRequest.size(); i++) {
final JsonObject jsonRequest;
try {
jsonRequest = batchJsonRpcRequest.getJsonObject(i);
} catch (ClassCastException e) {
responses.add(new JsonRpcErrorResponse(null, INVALID_REQUEST));
continue;
}
responses.add(
jsonRpcExecutor.execute(
user,
null,
null,
new IsAliveHandler(ethScheduler, timeoutSec),
jsonRequest,
req -> {
final WebSocketRpcRequest websocketRequest =
req.mapTo(WebSocketRpcRequest.class);
websocketRequest.setConnectionId(websocket.textHandlerID());
return websocketRequest;
}));
}
promise.complete(responses);
})
.onSuccess(
jsonRpcBatchResponse -> {
final JsonRpcResponse[] completed =
jsonRpcBatchResponse.stream()
.filter(
jsonRpcResponse ->
jsonRpcResponse.getType() != JsonRpcResponseType.NONE)
.toArray(JsonRpcResponse[]::new);
replyToClient(websocket, completed);
})
.onFailure(
throwable ->
replyToClient(websocket, errorResponse(null, JsonRpcError.INTERNAL_ERROR)));
} catch (RuntimeException jsonArrayDecodeException) {
replyToClient(websocket, errorResponse(null, JsonRpcError.INTERNAL_ERROR));
}
// handle batch request
LOG.debug("batch request size {}", jsonArray.size());
handleJsonBatchRequest(authenticationService, websocket, jsonArray, user, noAuthApiMethods);
}
};
}

private JsonRpcResponse process(
final Optional<AuthenticationService> authenticationService,
final ServerWebSocket websocket,
final Optional<User> user,
final WebSocketRpcRequest requestBody,
final Collection<String> noAuthApiMethods) {

if (!methods.containsKey(requestBody.getMethod())) {
LOG.debug("Can't find method {}", requestBody.getMethod());
return new JsonRpcErrorResponse(requestBody.getId(), JsonRpcError.METHOD_NOT_FOUND);
}
final JsonRpcMethod method = methods.get(requestBody.getMethod());
try {
LOG.debug("WS-RPC request -> {}", requestBody.getMethod());
requestBody.setConnectionId(websocket.textHandlerID());
if (authenticationService.isEmpty()
|| (authenticationService.isPresent()
&& authenticationService.get().isPermitted(user, method, noAuthApiMethods))) {
final JsonRpcRequestContext requestContext =
new JsonRpcRequestContext(
requestBody, user, new IsAliveHandler(ethScheduler, timeoutSec));
return method.response(requestContext);
} else {
return new JsonRpcUnauthorizedResponse(requestBody.getId(), JsonRpcError.UNAUTHORIZED);
}
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params", e);
return new JsonRpcErrorResponse(requestBody.getId(), JsonRpcError.INVALID_PARAMS);
} catch (final RpcMethodTimeoutException e) {
LOG.error(JsonRpcError.TIMEOUT_ERROR.getMessage(), e);
return new JsonRpcErrorResponse(requestBody.getId(), JsonRpcError.TIMEOUT_ERROR);
} catch (final Exception e) {
LOG.error(JsonRpcError.INTERNAL_ERROR.getMessage(), e);
return new JsonRpcErrorResponse(requestBody.getId(), JsonRpcError.INTERNAL_ERROR);
}
}

private void handleSingleRequest(
final Optional<AuthenticationService> authenticationService,
final ServerWebSocket websocket,
final Optional<User> user,
final Promise<Object> future,
final WebSocketRpcRequest requestBody,
final Collection<String> noAuthApiMethods) {
future.complete(process(authenticationService, websocket, user, requestBody, noAuthApiMethods));
}

@SuppressWarnings("rawtypes")
private void handleJsonBatchRequest(
final Optional<AuthenticationService> authenticationService,
final ServerWebSocket websocket,
final JsonArray jsonArray,
final Optional<User> user,
final Collection<String> noAuthApiMethods) {
// Interpret json as rpc request
final List<Future> responses =
jsonArray.stream()
.map(
obj -> {
if (!(obj instanceof JsonObject)) {
return Future.succeededFuture(errorResponse(null, INVALID_REQUEST));
}

final JsonObject req = (JsonObject) obj;
return vertx.<JsonRpcResponse>executeBlocking(
future ->
future.complete(
process(
authenticationService,
websocket,
user,
getRequest(req.toString()),
noAuthApiMethods)));
})
.collect(toList());

CompositeFuture.all(responses)
.onComplete(
(res) -> {
final JsonRpcResponse[] completed =
res.result().list().stream()
.map(JsonRpcResponse.class::cast)
.filter(this::isNonEmptyResponses)
.toArray(JsonRpcResponse[]::new);

replyToClient(websocket, completed);
});
}

private WebSocketRpcRequest getRequest(final String payload) {
return Json.decodeValue(payload, WebSocketRpcRequest.class);
}

private Handler<AsyncResult<Object>> resultHandler(final ServerWebSocket websocket) {
return result -> {
if (result.succeeded()) {
replyToClient(websocket, result.result());
} else {
replyToClient(websocket, new JsonRpcErrorResponse(null, JsonRpcError.INTERNAL_ERROR));
}
};
}

private void replyToClient(final ServerWebSocket websocket, final Object result) {
Expand All @@ -253,8 +172,4 @@ private void replyToClient(final ServerWebSocket websocket, final Object result)
private JsonRpcResponse errorResponse(final Object id, final JsonRpcError error) {
return new JsonRpcErrorResponse(id, error);
}

private boolean isNonEmptyResponses(final JsonRpcResponse result) {
return result.getType() != JsonRpcResponseType.NONE;
}
}
Loading

0 comments on commit e2645ba

Please sign in to comment.