Skip to content

Commit

Permalink
Add JSON RPC client metrics (#4127)
Browse files Browse the repository at this point in the history
* Add JsonRpcHttpClient metrics

* JsonRpcHttpClient metrics add routeId

* Update restApiClient metrics definition
  • Loading branch information
dapplion committed Jun 7, 2022
1 parent 56a6bb4 commit d716b07
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 66 deletions.
16 changes: 6 additions & 10 deletions packages/api/src/client/utils/httpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ export class HttpClient implements IHttpClient {
// Attach global signal to this request's controller
const onGlobalSignalAbort = controller.abort.bind(controller);
const signalGlobal = this.getAbortSignal?.();
if (signalGlobal) {
signalGlobal.addEventListener("abort", onGlobalSignalAbort);
}
signalGlobal?.addEventListener("abort", onGlobalSignalAbort);

const routeId = opts.routeId; // TODO: Should default to "unknown"?
const timer = this.metrics?.requestTime.startTimer({routeId});
Expand Down Expand Up @@ -113,6 +111,8 @@ export class HttpClient implements IHttpClient {

return await getBody(res);
} catch (e) {
this.metrics?.requestErrors.inc({routeId});

if (isAbortedError(e as Error)) {
if (signalGlobal?.aborted) {
throw new ErrorAborted("REST client");
Expand All @@ -121,18 +121,14 @@ export class HttpClient implements IHttpClient {
} else {
throw Error("Unknown aborted error");
}
} else {
throw e;
}

this.metrics?.errors.inc({routeId});

throw e;
} finally {
timer?.();

clearTimeout(timeout);
if (signalGlobal) {
signalGlobal.removeEventListener("abort", onGlobalSignalAbort);
}
signalGlobal?.removeEventListener("abort", onGlobalSignalAbort);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/client/utils/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export type Metrics = {
requestTime: IHistogram<"routeId">;
errors: IGauge<"routeId">;
requestErrors: IGauge<"routeId">;
};

type LabelValues<T extends string> = Partial<Record<T, string | number>>;
Expand Down
3 changes: 2 additions & 1 deletion packages/lodestar/src/eth1/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ export class Eth1ForBlockProduction implements IEth1ForBlockProduction {
opts: Eth1Options,
modules: Eth1DepositDataTrackerModules & Eth1MergeBlockTrackerModules & {eth1Provider?: IEth1Provider}
) {
const eth1Provider = modules.eth1Provider || new Eth1Provider(modules.config, opts, modules.signal);
const eth1Provider =
modules.eth1Provider || new Eth1Provider(modules.config, opts, modules.signal, modules.metrics?.eth1HttpClient);

this.eth1DepositDataTracker = opts.disableEth1DepositDataTracker
? null
Expand Down
4 changes: 0 additions & 4 deletions packages/lodestar/src/eth1/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,3 @@ export interface IRpcPayload<P = IJson[]> {
method: string;
params: P;
}

export type ReqOpts = {
timeout?: number;
};
42 changes: 29 additions & 13 deletions packages/lodestar/src/eth1/provider/eth1Provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {Eth1Block, IEth1Provider} from "../interface.js";
import {Eth1Options} from "../options.js";
import {isValidAddress} from "../../util/address.js";
import {EthJsonRpcBlockRaw} from "../interface.js";
import {JsonRpcHttpClient} from "./jsonRpcHttpClient.js";
import {JsonRpcHttpClient, JsonRpcHttpClientMetrics, ReqOpts} from "./jsonRpcHttpClient.js";
import {isJsonRpcTruncatedError, quantityToNum, numToQuantity, dataToBytes} from "./utils.js";

/* eslint-disable @typescript-eslint/naming-convention */
Expand All @@ -37,6 +37,13 @@ interface IEthJsonRpcReturnTypes {
}[];
}

// Define static options once to prevent extra allocations
const getBlocksByNumberOpts: ReqOpts = {routeId: "getBlockByNumber_batched"};
const getBlockByNumberOpts: ReqOpts = {routeId: "getBlockByNumber"};
const getBlockByHashOpts: ReqOpts = {routeId: "getBlockByHash"};
const getBlockNumberOpts: ReqOpts = {routeId: "getBlockNumber"};
const getLogsOpts: ReqOpts = {routeId: "getLogs"};

export class Eth1Provider implements IEth1Provider {
readonly deployBlock: number;
private readonly depositContractAddress: string;
Expand All @@ -45,7 +52,8 @@ export class Eth1Provider implements IEth1Provider {
constructor(
config: Pick<IChainConfig, "DEPOSIT_CONTRACT_ADDRESS">,
opts: Pick<Eth1Options, "depositContractDeployBlock" | "providerUrls" | "jwtSecretHex">,
signal?: AbortSignal
signal?: AbortSignal,
metrics?: JsonRpcHttpClientMetrics | null
) {
this.deployBlock = opts.depositContractDeployBlock ?? 0;
this.depositContractAddress = toHexString(config.DEPOSIT_CONTRACT_ADDRESS);
Expand All @@ -54,6 +62,7 @@ export class Eth1Provider implements IEth1Provider {
// Don't fallback with is truncated error. Throw early and let the retry on this class handle it
shouldNotFallback: isJsonRpcTruncatedError,
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
metrics: metrics,
});
}

Expand Down Expand Up @@ -113,7 +122,8 @@ export class Eth1Provider implements IEth1Provider {
return Promise.all(
blockRanges.map(([from, to]) =>
this.rpc.fetchBatch<IEthJsonRpcReturnTypes[typeof method]>(
linspace(from, to).map((blockNumber) => ({method, params: [numToQuantity(blockNumber), false]}))
linspace(from, to).map((blockNumber) => ({method, params: [numToQuantity(blockNumber), false]})),
getBlocksByNumberOpts
)
)
);
Expand All @@ -135,25 +145,28 @@ export class Eth1Provider implements IEth1Provider {
async getBlockByNumber(blockNumber: number | "latest"): Promise<EthJsonRpcBlockRaw | null> {
const method = "eth_getBlockByNumber";
const blockNumberHex = typeof blockNumber === "string" ? blockNumber : numToQuantity(blockNumber);
return await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>({
method,
return await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>(
// false = include only transaction roots, not full objects
params: [blockNumberHex, false],
});
{method, params: [blockNumberHex, false]},
getBlockByNumberOpts
);
}

async getBlockByHash(blockHashHex: string): Promise<EthJsonRpcBlockRaw | null> {
const method = "eth_getBlockByHash";
return await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>({
method,
return await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>(
// false = include only transaction roots, not full objects
params: [blockHashHex, false],
});
{method, params: [blockHashHex, false]},
getBlockByHashOpts
);
}

async getBlockNumber(): Promise<number> {
const method = "eth_blockNumber";
const blockNumberRaw = await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>({method, params: []});
const blockNumberRaw = await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>(
{method, params: []},
getBlockNumberOpts
);
return parseInt(blockNumberRaw, 16);
}

Expand All @@ -174,7 +187,10 @@ export class Eth1Provider implements IEth1Provider {
fromBlock: numToQuantity(options.fromBlock),
toBlock: numToQuantity(options.toBlock),
};
const logsRaw = await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>({method, params: [hexOptions]});
const logsRaw = await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>(
{method, params: [hexOptions]},
getLogsOpts
);
return logsRaw.map((logRaw) => ({
blockNumber: parseInt(logRaw.blockNumber, 16),
data: logRaw.data,
Expand Down
63 changes: 50 additions & 13 deletions packages/lodestar/src/eth1/provider/jsonRpcHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import fetch from "cross-fetch";

import {ErrorAborted, TimeoutError} from "@chainsafe/lodestar-utils";
import {IJson, IRpcPayload, ReqOpts} from "../interface.js";
import {IGauge, IHistogram} from "../../metrics/interface.js";
import {IJson, IRpcPayload} from "../interface.js";
import {encodeJwtToken} from "./jwt.js";
/**
* Limits the amount of response text printed with RPC or parsing errors
Expand All @@ -24,20 +25,36 @@ interface IRpcResponseError {
};
}

export type ReqOpts = {
timeout?: number;
// To label request metrics
routeId?: string;
};

export type JsonRpcHttpClientMetrics = {
requestTime: IHistogram<"routeId">;
requestErrors: IGauge<"routeId">;
requestUsedFallbackUrl: IGauge;
activeRequests: IGauge;
configUrlsCount: IGauge;
};

export interface IJsonRpcHttpClient {
fetch<R, P = IJson[]>(payload: IRpcPayload<P>, opts?: ReqOpts): Promise<R>;
fetchBatch<R>(rpcPayloadArr: IRpcPayload[], opts?: ReqOpts): Promise<R[]>;
}

export class JsonRpcHttpClient implements IJsonRpcHttpClient {
private id = 1;
private activeRequests = 0;
/**
* Optional: If provided, use this jwt secret to HS256 encode and add a jwt token in the
* request header which can be authenticated by the RPC server to provide access.
* A fresh token is generated on each requests as EL spec mandates the ELs to check
* the token freshness +-5 seconds (via `iat` property of the token claim)
*/
private jwtSecret?: Uint8Array;
private readonly jwtSecret?: Uint8Array;
private readonly metrics: JsonRpcHttpClientMetrics | null;

constructor(
private readonly urls: string[],
Expand All @@ -52,6 +69,7 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
* and it might deny responses to the RPC requests.
*/
jwtSecret?: Uint8Array;
metrics?: JsonRpcHttpClientMetrics | null;
}
) {
// Sanity check for all URLs to be properly defined. Otherwise it will error in loop on fetch
Expand All @@ -63,7 +81,17 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
throw Error(`JsonRpcHttpClient.urls[${i}] is empty or undefined: ${url}`);
}
}

this.jwtSecret = opts?.jwtSecret;
this.metrics = opts?.metrics ?? null;

// Set config metric gauges once

const metrics = this.metrics;
if (metrics) {
metrics.configUrlsCount.set(urls.length);
metrics.activeRequests.addCollect(() => metrics.activeRequests.set(this.activeRequests));
}
}

/**
Expand Down Expand Up @@ -91,9 +119,13 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
private async fetchJson<R, T = unknown>(json: T, opts?: ReqOpts): Promise<R> {
let lastError: Error | null = null;

for (const url of this.urls) {
for (let i = 0; i < this.urls.length; i++) {
if (i > 0) {
this.metrics?.requestUsedFallbackUrl.inc(1);
}

try {
return await this.fetchJsonOneUrl(url, json, opts);
return await this.fetchJsonOneUrl<R, T>(this.urls[i], json, opts);
} catch (e) {
if (this.opts?.shouldNotFallback?.(e as Error)) {
throw e;
Expand Down Expand Up @@ -124,15 +156,15 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
// - request to http://missing-url.com/ failed, reason: getaddrinfo ENOTFOUND missing-url.com

const controller = new AbortController();
const timeout = setTimeout(() => {
controller.abort();
}, opts?.timeout ?? this.opts?.timeout ?? REQUEST_TIMEOUT);
const timeout = setTimeout(() => controller.abort(), opts?.timeout ?? this.opts?.timeout ?? REQUEST_TIMEOUT);

const onParentSignalAbort = (): void => controller.abort();
this.opts?.signal?.addEventListener("abort", onParentSignalAbort, {once: true});

if (this.opts?.signal) {
this.opts.signal.addEventListener("abort", onParentSignalAbort, {once: true});
}
// Default to "unknown" to prevent mixing metrics with others.
const routeId = opts?.routeId ?? "unknown";
const timer = this.metrics?.requestTime.startTimer({routeId});
this.activeRequests++;

try {
const headers: Record<string, string> = {"Content-Type": "application/json"};
Expand All @@ -154,9 +186,6 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
body: JSON.stringify(json),
headers,
signal: controller.signal,
}).finally(() => {
clearTimeout(timeout);
this.opts?.signal?.removeEventListener("abort", onParentSignalAbort);
});

const body = await res.text();
Expand All @@ -168,6 +197,8 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {

return parseJson(body);
} catch (e) {
this.metrics?.requestErrors.inc({routeId});

if (controller.signal.aborted) {
// controller will abort on both parent signal abort + timeout of this specific request
if (this.opts?.signal?.aborted) {
Expand All @@ -178,6 +209,12 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
} else {
throw e;
}
} finally {
timer?.();
this.activeRequests--;

clearTimeout(timeout);
this.opts?.signal?.removeEventListener("abort", onParentSignalAbort);
}
}
}
Expand Down
44 changes: 23 additions & 21 deletions packages/lodestar/src/executionEngine/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import {
QUANTITY,
quantityToBigint,
} from "../eth1/provider/utils.js";
import {IJsonRpcHttpClient} from "../eth1/provider/jsonRpcHttpClient.js";
import {IJsonRpcHttpClient, ReqOpts} from "../eth1/provider/jsonRpcHttpClient.js";
import {IMetrics} from "../metrics/index.js";
import {
ExecutePayloadStatus,
ExecutePayloadResponse,
Expand Down Expand Up @@ -46,6 +47,11 @@ export const defaultExecutionEngineHttpOpts: ExecutionEngineHttpOpts = {
timeout: 12000,
};

// Define static options once to prevent extra allocations
const notifyNewPayloadOpts: ReqOpts = {routeId: "notifyNewPayload"};
const forkchoiceUpdatedV1Opts: ReqOpts = {routeId: "forkchoiceUpdated"};
const getPayloadOpts: ReqOpts = {routeId: "getPayload"};

/**
* based on Ethereum JSON-RPC API and inherits the following properties of this standard:
* - Supported communication protocols (HTTP and WebSocket)
Expand All @@ -59,14 +65,13 @@ export class ExecutionEngineHttp implements IExecutionEngine {
readonly payloadIdCache = new PayloadIdCache();
private readonly rpc: IJsonRpcHttpClient;

constructor(opts: ExecutionEngineHttpOpts, signal: AbortSignal, rpc?: IJsonRpcHttpClient) {
this.rpc =
rpc ??
new JsonRpcHttpClient(opts.urls, {
signal,
timeout: opts.timeout,
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
});
constructor(opts: ExecutionEngineHttpOpts, signal: AbortSignal, metrics?: IMetrics | null) {
this.rpc = new JsonRpcHttpClient(opts.urls, {
signal,
timeout: opts.timeout,
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
metrics: metrics?.executionEnginerHttpClient,
});
}

/**
Expand Down Expand Up @@ -98,10 +103,10 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const method = "engine_newPayloadV1";
const serializedExecutionPayload = serializeExecutionPayload(executionPayload);
const {status, latestValidHash, validationError} = await this.rpc
.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>({
method,
params: [serializedExecutionPayload],
})
.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{method, params: [serializedExecutionPayload]},
notifyNewPayloadOpts
)
// If there are errors by EL like connection refused, internal error, they need to be
// treated seperate from being INVALID. For now, just pass the error upstream.
.catch((e: Error): EngineApiRpcReturnTypes[typeof method] => {
Expand Down Expand Up @@ -210,10 +215,10 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const {
payloadStatus: {status, latestValidHash: _latestValidHash, validationError},
payloadId,
} = await this.rpc.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>({
method,
params: [{headBlockHash: headBlockHashData, safeBlockHash, finalizedBlockHash}, apiPayloadAttributes],
});
} = await this.rpc.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{method, params: [{headBlockHash: headBlockHashData, safeBlockHash, finalizedBlockHash}, apiPayloadAttributes]},
forkchoiceUpdatedV1Opts
);

switch (status) {
case ExecutePayloadStatus.VALID:
Expand Down Expand Up @@ -263,10 +268,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const executionPayloadRpc = await this.rpc.fetch<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({
method,
params: [payloadId],
});
>({method, params: [payloadId]}, getPayloadOpts);

return parseExecutionPayload(executionPayloadRpc);
}
Expand Down
Loading

0 comments on commit d716b07

Please sign in to comment.