Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JSON RPC client metrics #4127

Merged
merged 3 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions packages/api/src/client/utils/httpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ export class HttpClient implements IHttpClient {
// Attach global signal to this request's controller
const onGlobalSignalAbort = controller.abort.bind(controller);
const signalGlobal = this.getAbortSignal?.();
if (signalGlobal) {
signalGlobal.addEventListener("abort", onGlobalSignalAbort);
}
signalGlobal?.addEventListener("abort", onGlobalSignalAbort);

const routeId = opts.routeId; // TODO: Should default to "unknown"?
const timer = this.metrics?.requestTime.startTimer({routeId});
Expand Down Expand Up @@ -113,6 +111,8 @@ export class HttpClient implements IHttpClient {

return await getBody(res);
} catch (e) {
this.metrics?.requestErrors.inc({routeId});

if (isAbortedError(e as Error)) {
if (signalGlobal?.aborted) {
throw new ErrorAborted("REST client");
Expand All @@ -121,18 +121,14 @@ export class HttpClient implements IHttpClient {
} else {
throw Error("Unknown aborted error");
}
} else {
throw e;
}

this.metrics?.errors.inc({routeId});

throw e;
} finally {
timer?.();

clearTimeout(timeout);
if (signalGlobal) {
signalGlobal.removeEventListener("abort", onGlobalSignalAbort);
}
signalGlobal?.removeEventListener("abort", onGlobalSignalAbort);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/api/src/client/utils/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export type Metrics = {
requestTime: IHistogram<"routeId">;
errors: IGauge<"routeId">;
requestErrors: IGauge<"routeId">;
};

type LabelValues<T extends string> = Partial<Record<T, string | number>>;
Expand Down
3 changes: 2 additions & 1 deletion packages/lodestar/src/eth1/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ export class Eth1ForBlockProduction implements IEth1ForBlockProduction {
opts: Eth1Options,
modules: Eth1DepositDataTrackerModules & Eth1MergeBlockTrackerModules & {eth1Provider?: IEth1Provider}
) {
const eth1Provider = modules.eth1Provider || new Eth1Provider(modules.config, opts, modules.signal);
const eth1Provider =
modules.eth1Provider || new Eth1Provider(modules.config, opts, modules.signal, modules.metrics?.eth1HttpClient);

this.eth1DepositDataTracker = opts.disableEth1DepositDataTracker
? null
Expand Down
4 changes: 0 additions & 4 deletions packages/lodestar/src/eth1/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,3 @@ export interface IRpcPayload<P = IJson[]> {
method: string;
params: P;
}

export type ReqOpts = {
timeout?: number;
};
42 changes: 29 additions & 13 deletions packages/lodestar/src/eth1/provider/eth1Provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {Eth1Block, IEth1Provider} from "../interface.js";
import {Eth1Options} from "../options.js";
import {isValidAddress} from "../../util/address.js";
import {EthJsonRpcBlockRaw} from "../interface.js";
import {JsonRpcHttpClient} from "./jsonRpcHttpClient.js";
import {JsonRpcHttpClient, JsonRpcHttpClientMetrics, ReqOpts} from "./jsonRpcHttpClient.js";
import {isJsonRpcTruncatedError, quantityToNum, numToQuantity, dataToBytes} from "./utils.js";

/* eslint-disable @typescript-eslint/naming-convention */
Expand All @@ -37,6 +37,13 @@ interface IEthJsonRpcReturnTypes {
}[];
}

// Define static options once to prevent extra allocations
const getBlocksByNumberOpts: ReqOpts = {routeId: "getBlockByNumber_batched"};
const getBlockByNumberOpts: ReqOpts = {routeId: "getBlockByNumber"};
const getBlockByHashOpts: ReqOpts = {routeId: "getBlockByHash"};
const getBlockNumberOpts: ReqOpts = {routeId: "getBlockNumber"};
const getLogsOpts: ReqOpts = {routeId: "getLogs"};

export class Eth1Provider implements IEth1Provider {
readonly deployBlock: number;
private readonly depositContractAddress: string;
Expand All @@ -45,7 +52,8 @@ export class Eth1Provider implements IEth1Provider {
constructor(
config: Pick<IChainConfig, "DEPOSIT_CONTRACT_ADDRESS">,
opts: Pick<Eth1Options, "depositContractDeployBlock" | "providerUrls" | "jwtSecretHex">,
signal?: AbortSignal
signal?: AbortSignal,
metrics?: JsonRpcHttpClientMetrics | null
) {
this.deployBlock = opts.depositContractDeployBlock ?? 0;
this.depositContractAddress = toHexString(config.DEPOSIT_CONTRACT_ADDRESS);
Expand All @@ -54,6 +62,7 @@ export class Eth1Provider implements IEth1Provider {
// Don't fallback with is truncated error. Throw early and let the retry on this class handle it
shouldNotFallback: isJsonRpcTruncatedError,
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
metrics: metrics,
});
}

Expand Down Expand Up @@ -113,7 +122,8 @@ export class Eth1Provider implements IEth1Provider {
return Promise.all(
blockRanges.map(([from, to]) =>
this.rpc.fetchBatch<IEthJsonRpcReturnTypes[typeof method]>(
linspace(from, to).map((blockNumber) => ({method, params: [numToQuantity(blockNumber), false]}))
linspace(from, to).map((blockNumber) => ({method, params: [numToQuantity(blockNumber), false]})),
getBlocksByNumberOpts
)
)
);
Expand All @@ -135,25 +145,28 @@ export class Eth1Provider implements IEth1Provider {
async getBlockByNumber(blockNumber: number | "latest"): Promise<EthJsonRpcBlockRaw | null> {
const method = "eth_getBlockByNumber";
const blockNumberHex = typeof blockNumber === "string" ? blockNumber : numToQuantity(blockNumber);
return await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>({
method,
return await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>(
// false = include only transaction roots, not full objects
params: [blockNumberHex, false],
});
{method, params: [blockNumberHex, false]},
getBlockByNumberOpts
);
}

async getBlockByHash(blockHashHex: string): Promise<EthJsonRpcBlockRaw | null> {
const method = "eth_getBlockByHash";
return await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>({
method,
return await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>(
// false = include only transaction roots, not full objects
params: [blockHashHex, false],
});
{method, params: [blockHashHex, false]},
getBlockByHashOpts
);
}

async getBlockNumber(): Promise<number> {
const method = "eth_blockNumber";
const blockNumberRaw = await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>({method, params: []});
const blockNumberRaw = await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>(
{method, params: []},
getBlockNumberOpts
);
return parseInt(blockNumberRaw, 16);
}

Expand All @@ -174,7 +187,10 @@ export class Eth1Provider implements IEth1Provider {
fromBlock: numToQuantity(options.fromBlock),
toBlock: numToQuantity(options.toBlock),
};
const logsRaw = await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>({method, params: [hexOptions]});
const logsRaw = await this.rpc.fetch<IEthJsonRpcReturnTypes[typeof method]>(
{method, params: [hexOptions]},
getLogsOpts
);
return logsRaw.map((logRaw) => ({
blockNumber: parseInt(logRaw.blockNumber, 16),
data: logRaw.data,
Expand Down
63 changes: 50 additions & 13 deletions packages/lodestar/src/eth1/provider/jsonRpcHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import fetch from "cross-fetch";

import {ErrorAborted, TimeoutError} from "@chainsafe/lodestar-utils";
import {IJson, IRpcPayload, ReqOpts} from "../interface.js";
import {IGauge, IHistogram} from "../../metrics/interface.js";
import {IJson, IRpcPayload} from "../interface.js";
import {encodeJwtToken} from "./jwt.js";
/**
* Limits the amount of response text printed with RPC or parsing errors
Expand All @@ -24,20 +25,36 @@ interface IRpcResponseError {
};
}

export type ReqOpts = {
timeout?: number;
// To label request metrics
routeId?: string;
};

export type JsonRpcHttpClientMetrics = {
requestTime: IHistogram<"routeId">;
requestErrors: IGauge<"routeId">;
requestUsedFallbackUrl: IGauge;
activeRequests: IGauge;
configUrlsCount: IGauge;
};

export interface IJsonRpcHttpClient {
fetch<R, P = IJson[]>(payload: IRpcPayload<P>, opts?: ReqOpts): Promise<R>;
fetchBatch<R>(rpcPayloadArr: IRpcPayload[], opts?: ReqOpts): Promise<R[]>;
}

export class JsonRpcHttpClient implements IJsonRpcHttpClient {
private id = 1;
private activeRequests = 0;
/**
* Optional: If provided, use this jwt secret to HS256 encode and add a jwt token in the
* request header which can be authenticated by the RPC server to provide access.
* A fresh token is generated on each requests as EL spec mandates the ELs to check
* the token freshness +-5 seconds (via `iat` property of the token claim)
*/
private jwtSecret?: Uint8Array;
private readonly jwtSecret?: Uint8Array;
private readonly metrics: JsonRpcHttpClientMetrics | null;

constructor(
private readonly urls: string[],
Expand All @@ -52,6 +69,7 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
* and it might deny responses to the RPC requests.
*/
jwtSecret?: Uint8Array;
metrics?: JsonRpcHttpClientMetrics | null;
}
) {
// Sanity check for all URLs to be properly defined. Otherwise it will error in loop on fetch
Expand All @@ -63,7 +81,17 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
throw Error(`JsonRpcHttpClient.urls[${i}] is empty or undefined: ${url}`);
}
}

this.jwtSecret = opts?.jwtSecret;
this.metrics = opts?.metrics ?? null;

// Set config metric gauges once

const metrics = this.metrics;
if (metrics) {
metrics.configUrlsCount.set(urls.length);
metrics.activeRequests.addCollect(() => metrics.activeRequests.set(this.activeRequests));
}
}

/**
Expand Down Expand Up @@ -91,9 +119,13 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
private async fetchJson<R, T = unknown>(json: T, opts?: ReqOpts): Promise<R> {
let lastError: Error | null = null;

for (const url of this.urls) {
for (let i = 0; i < this.urls.length; i++) {
if (i > 0) {
this.metrics?.requestUsedFallbackUrl.inc(1);
}

try {
return await this.fetchJsonOneUrl(url, json, opts);
return await this.fetchJsonOneUrl<R, T>(this.urls[i], json, opts);
} catch (e) {
if (this.opts?.shouldNotFallback?.(e as Error)) {
throw e;
Expand Down Expand Up @@ -124,15 +156,15 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
// - request to http://missing-url.com/ failed, reason: getaddrinfo ENOTFOUND missing-url.com

const controller = new AbortController();
const timeout = setTimeout(() => {
controller.abort();
}, opts?.timeout ?? this.opts?.timeout ?? REQUEST_TIMEOUT);
const timeout = setTimeout(() => controller.abort(), opts?.timeout ?? this.opts?.timeout ?? REQUEST_TIMEOUT);

const onParentSignalAbort = (): void => controller.abort();
this.opts?.signal?.addEventListener("abort", onParentSignalAbort, {once: true});

if (this.opts?.signal) {
this.opts.signal.addEventListener("abort", onParentSignalAbort, {once: true});
}
// Default to "unknown" to prevent mixing metrics with others.
const routeId = opts?.routeId ?? "unknown";
const timer = this.metrics?.requestTime.startTimer({routeId});
this.activeRequests++;

try {
const headers: Record<string, string> = {"Content-Type": "application/json"};
Expand All @@ -154,9 +186,6 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
body: JSON.stringify(json),
headers,
signal: controller.signal,
}).finally(() => {
clearTimeout(timeout);
this.opts?.signal?.removeEventListener("abort", onParentSignalAbort);
});

const body = await res.text();
Expand All @@ -168,6 +197,8 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {

return parseJson(body);
} catch (e) {
this.metrics?.requestErrors.inc({routeId});

if (controller.signal.aborted) {
// controller will abort on both parent signal abort + timeout of this specific request
if (this.opts?.signal?.aborted) {
Expand All @@ -178,6 +209,12 @@ export class JsonRpcHttpClient implements IJsonRpcHttpClient {
} else {
throw e;
}
} finally {
timer?.();
this.activeRequests--;

clearTimeout(timeout);
this.opts?.signal?.removeEventListener("abort", onParentSignalAbort);
}
}
}
Expand Down
44 changes: 23 additions & 21 deletions packages/lodestar/src/executionEngine/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import {
QUANTITY,
quantityToBigint,
} from "../eth1/provider/utils.js";
import {IJsonRpcHttpClient} from "../eth1/provider/jsonRpcHttpClient.js";
import {IJsonRpcHttpClient, ReqOpts} from "../eth1/provider/jsonRpcHttpClient.js";
import {IMetrics} from "../metrics/index.js";
import {
ExecutePayloadStatus,
ExecutePayloadResponse,
Expand Down Expand Up @@ -46,6 +47,11 @@ export const defaultExecutionEngineHttpOpts: ExecutionEngineHttpOpts = {
timeout: 12000,
};

// Define static options once to prevent extra allocations
const notifyNewPayloadOpts: ReqOpts = {routeId: "notifyNewPayload"};
const forkchoiceUpdatedV1Opts: ReqOpts = {routeId: "forkchoiceUpdated"};
const getPayloadOpts: ReqOpts = {routeId: "getPayload"};

/**
* based on Ethereum JSON-RPC API and inherits the following properties of this standard:
* - Supported communication protocols (HTTP and WebSocket)
Expand All @@ -59,14 +65,13 @@ export class ExecutionEngineHttp implements IExecutionEngine {
readonly payloadIdCache = new PayloadIdCache();
private readonly rpc: IJsonRpcHttpClient;

constructor(opts: ExecutionEngineHttpOpts, signal: AbortSignal, rpc?: IJsonRpcHttpClient) {
this.rpc =
rpc ??
new JsonRpcHttpClient(opts.urls, {
signal,
timeout: opts.timeout,
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
});
constructor(opts: ExecutionEngineHttpOpts, signal: AbortSignal, metrics?: IMetrics | null) {
this.rpc = new JsonRpcHttpClient(opts.urls, {
signal,
timeout: opts.timeout,
jwtSecret: opts.jwtSecretHex ? fromHex(opts.jwtSecretHex) : undefined,
metrics: metrics?.executionEnginerHttpClient,
});
}

/**
Expand Down Expand Up @@ -98,10 +103,10 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const method = "engine_newPayloadV1";
const serializedExecutionPayload = serializeExecutionPayload(executionPayload);
const {status, latestValidHash, validationError} = await this.rpc
.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>({
method,
params: [serializedExecutionPayload],
})
.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{method, params: [serializedExecutionPayload]},
notifyNewPayloadOpts
)
// If there are errors by EL like connection refused, internal error, they need to be
// treated seperate from being INVALID. For now, just pass the error upstream.
.catch((e: Error): EngineApiRpcReturnTypes[typeof method] => {
Expand Down Expand Up @@ -210,10 +215,10 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const {
payloadStatus: {status, latestValidHash: _latestValidHash, validationError},
payloadId,
} = await this.rpc.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>({
method,
params: [{headBlockHash: headBlockHashData, safeBlockHash, finalizedBlockHash}, apiPayloadAttributes],
});
} = await this.rpc.fetch<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{method, params: [{headBlockHash: headBlockHashData, safeBlockHash, finalizedBlockHash}, apiPayloadAttributes]},
forkchoiceUpdatedV1Opts
);

switch (status) {
case ExecutePayloadStatus.VALID:
Expand Down Expand Up @@ -263,10 +268,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const executionPayloadRpc = await this.rpc.fetch<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({
method,
params: [payloadId],
});
>({method, params: [payloadId]}, getPayloadOpts);

return parseExecutionPayload(executionPayloadRpc);
}
Expand Down
Loading