Skip to content

Commit

Permalink
feat: enable usage of a single instance of RequestLimiter for all RPC…
Browse files Browse the repository at this point in the history
… endpoints
  • Loading branch information
powerslider committed Sep 20, 2024
2 parents 978d048 + 7cc36ce commit f03a980
Show file tree
Hide file tree
Showing 15 changed files with 136 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,7 @@ protected LineaOptionsPluginConfiguration getConfigurationByKey(final String key

@Override
public synchronized void register(final BesuContext context) {
final PicoCLIOptions cmdlineOptions =
context
.getService(PicoCLIOptions.class)
.orElseThrow(
() ->
new IllegalStateException(
"Failed to obtain PicoCLI options from the BesuContext"));
final PicoCLIOptions cmdlineOptions = BesuServiceProvider.getPicoCLIOptionsService(context);

getLineaPluginConfigMap()
.forEach(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright Consensys Software Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/

package net.consensys.linea.plugins;

import org.hyperledger.besu.plugin.BesuContext;
import org.hyperledger.besu.plugin.services.BesuService;
import org.hyperledger.besu.plugin.services.PicoCLIOptions;
import org.hyperledger.besu.plugin.services.TraceService;

public class BesuServiceProvider {

/**
* Initialize a service of type {@link BesuService}.
*
* @return the initialized {@link BesuService}.
*/
public static <T extends BesuService> T getBesuService(
final BesuContext context, final Class<T> clazz) {
return context
.getService(clazz)
.orElseThrow(
() ->
new RuntimeException(
"Unable to find given Besu service. Please ensure %s is registered."
.formatted(clazz.getName())));
}

public static TraceService getTraceService(final BesuContext context) {
return getBesuService(context, TraceService.class);
}

public static PicoCLIOptions getPicoCLIOptionsService(final BesuContext context) {
return getBesuService(context, PicoCLIOptions.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
import net.consensys.linea.corset.CorsetValidator;
import net.consensys.linea.plugins.BesuServiceProvider;
import org.hyperledger.besu.plugin.BesuContext;
import org.hyperledger.besu.plugin.BesuPlugin;
import org.hyperledger.besu.plugin.services.BesuEvents;
Expand All @@ -45,13 +46,7 @@ public Optional<String> getName() {

@Override
public void register(final BesuContext context) {
final PicoCLIOptions cmdlineOptions =
context
.getService(PicoCLIOptions.class)
.orElseThrow(
() ->
new IllegalStateException(
"Expecting a PicoCLI options to register CLI options with, but none found."));
final PicoCLIOptions cmdlineOptions = BesuServiceProvider.getPicoCLIOptionsService(context);

cmdlineOptions.addPicoCLIOptions(getName().get(), options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.hyperledger.besu.plugin.services.exception.PluginRpcEndpointException;
import org.hyperledger.besu.plugin.services.rpc.PluginRpcRequest;

public class RequestLimiter<T extends PluginRpcRequest, R> {
public class RequestLimiter {

private final Semaphore semaphore;

Expand All @@ -32,7 +32,7 @@ public RequestLimiter(int concurrentRequestsCount) {
this.semaphore = new Semaphore(concurrentRequestsCount);
}

public R execute(T request, Function<T, R> processingFunc) {
public <T extends PluginRpcRequest, R> R execute(T request, Function<T, R> processingFunc) {
if (!semaphore.tryAcquire()) {
throw new PluginRpcEndpointException(
RpcErrorType.INVALID_REQUEST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,36 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
package net.consensys.linea.plugins.rpc.linecounts;
package net.consensys.linea.plugins.rpc;

import com.google.common.base.MoreObjects;
import net.consensys.linea.plugins.LineaCliOptions;
import picocli.CommandLine;

class LineCountsEndpointCliOptions implements LineaCliOptions {
public class RpcCliOptions implements LineaCliOptions {

static final String CONFIG_KEY = "line-counts-endpoint-config";
public static final String CONFIG_KEY = "line-counts-endpoint-config";

static final String LINE_COUNTS_CONCURRENT_REQUESTS_LIMIT =
"--plugin-linea-line-counts-concurrent-requests-limit";
static final String RPC_CONCURRENT_REQUESTS_LIMIT =
"--plugin-linea-rpc-concurrent-requests-limit";

@CommandLine.Option(
required = true,
names = {LINE_COUNTS_CONCURRENT_REQUESTS_LIMIT},
names = {RPC_CONCURRENT_REQUESTS_LIMIT},
hidden = true,
paramLabel = "<REQUEST_COUNT_LIMIT>",
description = "Number of allowed concurrent requests")
private int concurrentRequestsLimit = 1;

private LineCountsEndpointCliOptions() {}
private RpcCliOptions() {}

/**
* Create Linea cli options.
*
* @return the Linea cli options
*/
static LineCountsEndpointCliOptions create() {
return new LineCountsEndpointCliOptions();
static RpcCliOptions create() {
return new RpcCliOptions();
}

/**
Expand All @@ -50,8 +50,8 @@ static LineCountsEndpointCliOptions create() {
* @param config the config
* @return the Linea cli options
*/
static LineCountsEndpointCliOptions fromConfig(final LineCountsEndpointConfiguration config) {
final LineCountsEndpointCliOptions options = create();
static RpcCliOptions fromConfig(final RpcConfiguration config) {
final RpcCliOptions options = create();
options.concurrentRequestsLimit = config.concurrentRequestsLimit();
return options;
}
Expand All @@ -62,16 +62,14 @@ static LineCountsEndpointCliOptions fromConfig(final LineCountsEndpointConfigura
* @return the Linea factory configuration
*/
@Override
public LineCountsEndpointConfiguration toDomainObject() {
return LineCountsEndpointConfiguration.builder()
.concurrentRequestsLimit(concurrentRequestsLimit)
.build();
public RpcConfiguration toDomainObject() {
return RpcConfiguration.builder().concurrentRequestsLimit(concurrentRequestsLimit).build();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add(LINE_COUNTS_CONCURRENT_REQUESTS_LIMIT, concurrentRequestsLimit)
.add(RPC_CONCURRENT_REQUESTS_LIMIT, concurrentRequestsLimit)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
* SPDX-License-Identifier: Apache-2.0
*/

package net.consensys.linea.plugins.rpc.linecounts;
package net.consensys.linea.plugins.rpc;

import lombok.Builder;
import net.consensys.linea.plugins.LineaOptionsConfiguration;

/** The Linea tracer configuration private to this repo. */
@Builder(toBuilder = true)
public record LineCountsEndpointConfiguration(int concurrentRequestsLimit)
implements LineaOptionsConfiguration {}
public record RpcConfiguration(int concurrentRequestsLimit) implements LineaOptionsConfiguration {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,7 @@

import java.security.InvalidParameterException;

import org.hyperledger.besu.plugin.BesuContext;
import org.hyperledger.besu.plugin.services.TraceService;

public class Requests {

/**
* Initialize the TraceService.
*
* @return the initialized TraceService.
*/
public static TraceService getTraceService(final BesuContext besuContext) {
return besuContext
.getService(TraceService.class)
.orElseThrow(
() ->
new RuntimeException(
"Unable to find trace service. Please ensure TraceService is registered."));
}
public class Validator {

public static void validatePluginRpcRequestParams(final Object[] rawParams) {
// params size should be one, because we expect an object containing all the needed request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
import com.google.common.cache.CacheBuilder;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.consensys.linea.plugins.BesuServiceProvider;
import net.consensys.linea.plugins.rpc.RequestLimiter;
import net.consensys.linea.plugins.rpc.Requests;
import net.consensys.linea.plugins.rpc.Validator;
import net.consensys.linea.zktracer.ZkTracer;
import net.consensys.linea.zktracer.json.JsonConverter;
import org.hyperledger.besu.plugin.BesuContext;
Expand All @@ -34,22 +35,20 @@
@Slf4j
@RequiredArgsConstructor
public class GenerateLineCountsV2 {
static final String REQUEST_LIMIT_KEY = "trace-request-limit";

private static final JsonConverter CONVERTER = JsonConverter.builder().build();
private static final int CACHE_SIZE = 10_000;
private static final Cache<Long, Map<String, Integer>> CACHE =
CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();

private final RequestLimiter<PluginRpcRequest, LineCounts> requestLimiter;
private final RequestLimiter requestLimiter;

private TraceService traceService;

public GenerateLineCountsV2(
final BesuContext besuContext, final LineCountsEndpointConfiguration endpointConfiguration) {
this.traceService = Requests.getTraceService(besuContext);
this.requestLimiter =
RequestLimiter.<PluginRpcRequest, LineCounts>builder()
.concurrentRequestsCount(endpointConfiguration.concurrentRequestsLimit())
.build();
public GenerateLineCountsV2(final BesuContext besuContext, final RequestLimiter requestLimiter) {
this.traceService = BesuServiceProvider.getTraceService(besuContext);
this.requestLimiter = requestLimiter;
}

public String getNamespace() {
Expand Down Expand Up @@ -79,7 +78,7 @@ private LineCounts getLineCounts(PluginRpcRequest request) {

final Object[] rawParams = request.getParams();

Requests.validatePluginRpcRequestParams(rawParams);
Validator.validatePluginRpcRequestParams(rawParams);

final LineCountsRequestParams params =
CONVERTER.fromJson(CONVERTER.toJson(rawParams[0]), LineCountsRequestParams.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

import com.google.auto.service.AutoService;
import net.consensys.linea.plugins.AbstractLineaSharedOptionsPlugin;
import net.consensys.linea.plugins.rpc.RequestLimiter;
import net.consensys.linea.plugins.rpc.RequestLimiterDispatcher;
import net.consensys.linea.plugins.rpc.RpcCliOptions;
import net.consensys.linea.plugins.rpc.RpcConfiguration;
import org.hyperledger.besu.plugin.BesuContext;
import org.hyperledger.besu.plugin.BesuPlugin;
import org.hyperledger.besu.plugin.services.RpcEndpointService;
Expand Down Expand Up @@ -55,12 +59,15 @@ public void register(final BesuContext context) {
public void beforeExternalServices() {
super.beforeExternalServices();

final LineCountsEndpointConfiguration endpointConfiguration =
(LineCountsEndpointConfiguration)
getConfigurationByKey(LineCountsEndpointCliOptions.CONFIG_KEY).optionsConfig();
final RpcConfiguration rpcConfiguration =
(RpcConfiguration) getConfigurationByKey(RpcCliOptions.CONFIG_KEY).optionsConfig();

final GenerateLineCountsV2 method =
new GenerateLineCountsV2(besuContext, endpointConfiguration);
RequestLimiterDispatcher.setLimiter(
GenerateLineCountsV2.REQUEST_LIMIT_KEY, rpcConfiguration.concurrentRequestsLimit());
final RequestLimiter reqLimiter =
RequestLimiterDispatcher.getLimiter(GenerateLineCountsV2.REQUEST_LIMIT_KEY);

final GenerateLineCountsV2 method = new GenerateLineCountsV2(besuContext, reqLimiter);
createAndRegister(method, rpcEndpointService);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
import com.google.common.base.Stopwatch;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import net.consensys.linea.plugins.BesuServiceProvider;
import net.consensys.linea.plugins.rpc.RequestLimiter;
import net.consensys.linea.plugins.rpc.Requests;
import net.consensys.linea.plugins.rpc.Validator;
import net.consensys.linea.zktracer.ZkTracer;
import net.consensys.linea.zktracer.json.JsonConverter;
import org.hyperledger.besu.plugin.BesuContext;
Expand All @@ -40,22 +41,23 @@
*/
@Slf4j
public class GenerateConflatedTracesV2 {
static final String REQUEST_LIMIT_KEY = "trace-request-limit";

private static final JsonConverter CONVERTER = JsonConverter.builder().build();
private static final String TRACE_FILE_EXTENSION = ".lt";
private static final String TRACE_TEMP_FILE_EXTENSION = ".lt.tmp";

private final RequestLimiter<PluginRpcRequest, TraceFile> requestLimiter;
private final RequestLimiter requestLimiter;

private final Path tracesOutputPath;
private TraceService traceService;
private final TraceService traceService;

public GenerateConflatedTracesV2(
final BesuContext besuContext, final TracesEndpointConfiguration endpointConfiguration) {
this.traceService = Requests.getTraceService(besuContext);
this.requestLimiter =
RequestLimiter.<PluginRpcRequest, TraceFile>builder()
.concurrentRequestsCount(endpointConfiguration.concurrentRequestsLimit())
.build();
final BesuContext besuContext,
final RequestLimiter requestLimiter,
final TracesEndpointConfiguration endpointConfiguration) {
this.traceService = BesuServiceProvider.getTraceService(besuContext);
this.requestLimiter = requestLimiter;

this.tracesOutputPath = Paths.get(endpointConfiguration.tracesOutputPath());
}
Expand Down Expand Up @@ -83,7 +85,7 @@ private TraceFile generateTraceFile(PluginRpcRequest request) {

final Object[] rawParams = request.getParams();

Requests.validatePluginRpcRequestParams(rawParams);
Validator.validatePluginRpcRequestParams(rawParams);

TraceRequestParams params =
CONVERTER.fromJson(CONVERTER.toJson(rawParams[0]), TraceRequestParams.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,6 @@ class TracesEndpointCliOptions implements LineaCliOptions {
description = "Path to where traces will be written")
private String tracesOutputPath = null;

@CommandLine.Option(
required = true,
names = {CONFLATED_TRACE_GENERATION_CONCURRENT_REQUESTS_LIMIT},
hidden = true,
paramLabel = "<REQUEST_COUNT_LIMIT>",
description = "Number of allowed concurrent requests")
private int concurrentRequestsLimit = 1;

private TracesEndpointCliOptions() {}

/**
Expand All @@ -64,7 +56,6 @@ static TracesEndpointCliOptions create() {
static TracesEndpointCliOptions fromConfig(final TracesEndpointConfiguration config) {
final TracesEndpointCliOptions options = create();
options.tracesOutputPath = config.tracesOutputPath();
options.concurrentRequestsLimit = config.concurrentRequestsLimit();
return options;
}

Expand All @@ -75,17 +66,13 @@ static TracesEndpointCliOptions fromConfig(final TracesEndpointConfiguration con
*/
@Override
public TracesEndpointConfiguration toDomainObject() {
return TracesEndpointConfiguration.builder()
.tracesOutputPath(tracesOutputPath)
.concurrentRequestsLimit(concurrentRequestsLimit)
.build();
return TracesEndpointConfiguration.builder().tracesOutputPath(tracesOutputPath).build();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add(CONFLATED_TRACE_GENERATION_TRACES_OUTPUT_PATH, tracesOutputPath)
.add(CONFLATED_TRACE_GENERATION_CONCURRENT_REQUESTS_LIMIT, concurrentRequestsLimit)
.toString();
}
}
Loading

0 comments on commit f03a980

Please sign in to comment.