Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify INetwork Interface #5187

Merged
merged 1 commit into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 11 additions & 36 deletions packages/beacon-node/src/api/impl/lodestar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {QueuedStateRegenerator, RegenRequest} from "../../../chain/regen/index.j
import {GossipType} from "../../../network/index.js";
import {IBeaconDb} from "../../../db/interface.js";
import {ApiModules} from "../types.js";
import {formatNodePeer} from "../node/utils.js";

export function getLodestarApi({
chain,
Expand Down Expand Up @@ -60,22 +59,8 @@ export function getLodestarApi({
},

async getGossipQueueItems(gossipType: GossipType | string) {
const jobQueue = network.gossip.jobQueues[gossipType as GossipType];
if (jobQueue === undefined) {
throw Error(`Unknown gossipType ${gossipType}, known values: ${Object.keys(jobQueue).join(", ")}`);
}

return {
data: jobQueue.getItems().map((item) => {
const [topic, message, propagationSource, seenTimestampSec] = item.args;
return {
topic: topic,
propagationSource,
data: message.data,
addedTimeMs: item.addedTimeMs,
seenTimestampSec,
};
}),
data: await network.dumpGossipQueueItems(gossipType),
};
},

Expand Down Expand Up @@ -111,11 +96,13 @@ export function getLodestarApi({
},

async getGossipPeerScoreStats() {
return {data: Object.entries(network.gossip.dumpPeerScoreStats()).map(([peerId, stats]) => ({peerId, ...stats}))};
return {
data: Object.entries(await network.dumpGossipPeerScoreStats()).map(([peerId, stats]) => ({peerId, ...stats})),
};
},

async getLodestarPeerScoreStats() {
return {data: network.peerRpcScores.dumpPeerScoreStats()};
return {data: await network.dumpPeerScoreStats()};
},

async runGC() {
Expand All @@ -141,16 +128,11 @@ export function getLodestarApi({

async getPeers(filters) {
const {state, direction} = filters || {};
const peers = Array.from(network.getConnectionsByPeer().entries())
.map(([peerIdStr, connections]) => ({
...formatNodePeer(peerIdStr, connections),
agentVersion: network.getAgentVersion(peerIdStr),
}))
.filter(
(nodePeer) =>
(!state || state.length === 0 || state.includes(nodePeer.state)) &&
(!direction || direction.length === 0 || (nodePeer.direction && direction.includes(nodePeer.direction)))
);
const peers = (await network.dumpPeers()).filter(
(nodePeer) =>
(!state || state.length === 0 || state.includes(nodePeer.state)) &&
(!direction || direction.length === 0 || (nodePeer.direction && direction.includes(nodePeer.direction)))
);

return {
data: peers,
Expand All @@ -159,15 +141,8 @@ export function getLodestarApi({
},

async discv5GetKadValues() {
const discv5 = network.discv5();
if (!discv5) {
return {
data: [],
};
}

return {
data: (await discv5.kadValues()).map((enr) => enr.encodeTxt()) ?? [],
data: await network.dumpDiscv5KadValues(),
};
},

Expand Down
55 changes: 18 additions & 37 deletions packages/beacon-node/src/api/impl/node/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import {routes, ServerApi} from "@lodestar/api";
import {ApiError} from "../errors.js";
import {ApiModules} from "../types.js";
import {ApiOptions} from "../../options.js";
import {formatNodePeer, getRevelantConnection} from "./utils.js";

export function getNodeApi(
opts: ApiOptions,
Expand All @@ -22,28 +21,26 @@ export function getNodeApi(
enr: enr?.encodeTxt() || "",
discoveryAddresses,
p2pAddresses: network.localMultiaddrs.map((m) => m.toString()),
metadata: network.metadata,
metadata: await network.getMetadata(),
},
};
},

async getPeer(peerIdStr) {
const connections = network.getConnectionsByPeer().get(peerIdStr);
if (!connections) {
const peer = await network.dumpPeer(peerIdStr);
if (!peer) {
throw new ApiError(404, "Node has not seen this peer");
}
return {data: formatNodePeer(peerIdStr, connections)};
return {data: peer};
},

async getPeers(filters) {
const {state, direction} = filters || {};
const peers = Array.from(network.getConnectionsByPeer().entries())
.map(([peerIdStr, connections]) => formatNodePeer(peerIdStr, connections))
.filter(
(nodePeer) =>
(!state || state.length === 0 || state.includes(nodePeer.state)) &&
(!direction || direction.length === 0 || (nodePeer.direction && direction.includes(nodePeer.direction)))
);
const peers = (await network.dumpPeers()).filter(
(nodePeer) =>
(!state || state.length === 0 || state.includes(nodePeer.state)) &&
(!direction || direction.length === 0 || (nodePeer.direction && direction.includes(nodePeer.direction)))
);

return {
data: peers,
Expand All @@ -53,35 +50,19 @@ export function getNodeApi(

async getPeerCount() {
// TODO: Implement disconnect count with on-disk persistence
let disconnected = 0;
let connecting = 0;
let connected = 0;
let disconnecting = 0;
const data = {
disconnected: 0,
connecting: 0,
connected: 0,
disconnecting: 0,
};

for (const connections of network.getConnectionsByPeer().values()) {
const relevantConnection = getRevelantConnection(connections);
switch (relevantConnection?.stat.status) {
case "OPEN":
connected++;
break;
case "CLOSING":
disconnecting++;
break;
case "CLOSED":
disconnected++;
break;
default:
connecting++;
}
for (const peer of await network.dumpPeers()) {
data[peer.state]++;
}

return {
data: {
disconnected,
connecting,
connected,
disconnecting,
},
data,
};
},

Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/api/impl/node/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {PeerStatus} from "../../../network/index.js";
* Format a list of connections from libp2p connections manager into the API's format NodePeer
*/
export function formatNodePeer(peerIdStr: string, connections: Connection[]): routes.node.NodePeer {
const conn = getRevelantConnection(connections);
const conn = getRelevantConnection(connections);

return {
peerId: conn ? conn.remotePeer.toString() : peerIdStr,
Expand All @@ -24,7 +24,7 @@ export function formatNodePeer(peerIdStr: string, connections: Connection[]): ro
* - Otherwise, the first closing connection
* - Otherwise, the first closed connection
*/
export function getRevelantConnection(connections: Connection[]): Connection | null {
export function getRelevantConnection(connections: Connection[]): Connection | null {
const byStatus = new Map<PeerStatus, Connection>();
for (const conn of connections) {
if (conn.stat.status === "OPEN") return conn;
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ export function getValidatorApi({
async prepareBeaconCommitteeSubnet(subscriptions) {
notWhileSyncing();

network.prepareBeaconCommitteeSubnet(
await network.prepareBeaconCommitteeSubnet(
subscriptions.map(({validatorIndex, slot, isAggregator, committeesAtSlot, committeeIndex}) => ({
validatorIndex: validatorIndex,
subnet: computeSubnetForCommitteesAtSlot(slot, committeesAtSlot, committeeIndex),
Expand Down Expand Up @@ -670,7 +670,7 @@ export function getValidatorApi({
}
}

network.prepareSyncCommitteeSubnets(subs);
await network.prepareSyncCommitteeSubnets(subs);

if (metrics) {
for (const subscription of subscriptions) {
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/network/gossip/gossipsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
GossipTypeMap,
ValidatorFnsByType,
GossipHandlers,
GossipBeaconNode,
} from "./interface.js";
import {getGossipSSZType, GossipTopicCache, stringifyGossipTopic, getCoreTopicsAtFork} from "./topic.js";
import {DataTransformSnappy, fastMsgIdFn, msgIdFn, msgIdToStrFn} from "./encoding.js";
Expand Down Expand Up @@ -73,7 +74,7 @@ export type Eth2GossipsubOpts = {
*
* See https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub
*/
export class Eth2Gossipsub extends GossipSub {
export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode {
readonly jobQueues: GossipJobQueues;
readonly scoreParams: Partial<PeerScoreParams>;
private readonly config: BeaconConfig;
Expand Down
15 changes: 15 additions & 0 deletions packages/beacon-node/src/network/gossip/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,21 @@ export type GossipModules = {
chain: IBeaconChain;
};

export type GossipBeaconNode = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this rather be an interface?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely could be an interface. I don't know that we have clear project-wide opinions on the use of one vs the either.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO if it is implemented by a class within the project directly or even outside in an external library I would use interface otherwise should just use type

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, type here is a bit sloppy. Can clean it up in a future PR.
I'm continuing to do some work to clean this up, aiming to move the network to a worker, so will continue touching these pieces of code.

publishBeaconBlock(signedBlock: allForks.SignedBeaconBlock): Promise<void>;
publishSignedBeaconBlockAndBlobsSidecar(item: deneb.SignedBeaconBlockAndBlobsSidecar): Promise<void>;
publishBeaconAggregateAndProof(aggregateAndProof: phase0.SignedAggregateAndProof): Promise<number>;
publishBeaconAttestation(attestation: phase0.Attestation, subnet: number): Promise<number>;
publishVoluntaryExit(voluntaryExit: phase0.SignedVoluntaryExit): Promise<void>;
publishBlsToExecutionChange(blsToExecutionChange: capella.SignedBLSToExecutionChange): Promise<void>;
publishProposerSlashing(proposerSlashing: phase0.ProposerSlashing): Promise<void>;
publishAttesterSlashing(attesterSlashing: phase0.AttesterSlashing): Promise<void>;
publishSyncCommitteeSignature(signature: altair.SyncCommitteeMessage, subnet: number): Promise<void>;
publishContributionAndProof(contributionAndProof: altair.SignedContributionAndProof): Promise<void>;
publishLightClientFinalityUpdate(lightClientFinalityUpdate: allForks.LightClientFinalityUpdate): Promise<void>;
publishLightClientOptimisticUpdate(lightClientOptimisitcUpdate: allForks.LightClientOptimisticUpdate): Promise<void>;
};

/**
* Contains various methods for validation of incoming gossip topic data.
* The conditions for valid gossip topics and how they are handled are specified here:
Expand Down
49 changes: 26 additions & 23 deletions packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,64 @@ import {Multiaddr} from "@multiformats/multiaddr";
import {PeerId} from "@libp2p/interface-peer-id";
import {ConnectionManager} from "@libp2p/interface-connection-manager";
import {SignableENR} from "@chainsafe/discv5";
import {phase0} from "@lodestar/types";
import {altair, phase0} from "@lodestar/types";
import {PeerScoreStatsDump} from "@chainsafe/libp2p-gossipsub/score";
import {routes} from "@lodestar/api";
import {BlockInput} from "../chain/blocks/types.js";
import {INetworkEventBus} from "./events.js";
import {Eth2Gossipsub} from "./gossip/index.js";
import {MetadataController} from "./metadata.js";
import {IPeerRpcScoreStore, PeerAction} from "./peers/index.js";
import {GossipBeaconNode} from "./gossip/index.js";
import {PeerAction, PeerScoreStats} from "./peers/index.js";
import {IReqRespBeaconNode} from "./reqresp/ReqRespBeaconNode.js";
import {IAttnetsService, SubnetsService, CommitteeSubscription} from "./subnets/index.js";
import {Discv5Worker} from "./discv5/index.js";
import {CommitteeSubscription} from "./subnets/index.js";

export type PeerSearchOptions = {
supportsProtocols?: string[];
count?: number;
};

export interface INetwork {
events: INetworkEventBus;
reqResp: IReqRespBeaconNode;
attnetsService: IAttnetsService;
syncnetsService: SubnetsService;
gossip: Eth2Gossipsub;
discv5(): Discv5Worker | undefined;
metadata: MetadataController;
peerRpcScores: IPeerRpcScoreStore;
/** Our network identity */
peerId: PeerId;
localMultiaddrs: Multiaddr[];

events: INetworkEventBus;
reqResp: IReqRespBeaconNode;
gossip: GossipBeaconNode;

getEnr(): Promise<SignableENR | undefined>;
getConnectionsByPeer(): Map<string, Connection[]>;
getMetadata(): Promise<altair.Metadata>;
getConnectedPeers(): PeerId[];
hasSomeConnectedPeer(): boolean;
getConnectedPeerCount(): number;

publishBeaconBlockMaybeBlobs(signedBlock: BlockInput): Promise<void>;
beaconBlocksMaybeBlobsByRange(peerId: PeerId, request: phase0.BeaconBlocksByRangeRequest): Promise<BlockInput[]>;
beaconBlocksMaybeBlobsByRoot(peerId: PeerId, request: phase0.BeaconBlocksByRootRequest): Promise<BlockInput[]>;

/** Subscribe, search peers, join long-lived attnets */
prepareBeaconCommitteeSubnet(subscriptions: CommitteeSubscription[]): void;
prepareBeaconCommitteeSubnet(subscriptions: CommitteeSubscription[]): Promise<void>;
/** Subscribe, search peers, join long-lived syncnets */
prepareSyncCommitteeSubnets(subscriptions: CommitteeSubscription[]): void;
reStatusPeers(peers: PeerId[]): void;
reportPeer(peer: PeerId, action: PeerAction, actionName: string): void;
prepareSyncCommitteeSubnets(subscriptions: CommitteeSubscription[]): Promise<void>;
reStatusPeers(peers: PeerId[]): Promise<void>;
reportPeer(peer: PeerId, action: PeerAction, actionName: string): Promise<void>;

// Gossip handler
subscribeGossipCoreTopics(): void;
unsubscribeGossipCoreTopics(): void;
subscribeGossipCoreTopics(): Promise<void>;
unsubscribeGossipCoreTopics(): Promise<void>;
isSubscribedToGossipCoreTopics(): boolean;

// Service
metrics(): Promise<string>;
close(): Promise<void>;

// Debug
connectToPeer(peer: PeerId, multiaddr: Multiaddr[]): Promise<void>;
disconnectPeer(peer: PeerId): Promise<void>;
getAgentVersion(peerIdStr: string): string;
dumpPeers(): Promise<routes.lodestar.LodestarNodePeer[]>;
dumpPeer(peerIdStr: string): Promise<routes.lodestar.LodestarNodePeer | undefined>;
dumpPeerScoreStats(): Promise<PeerScoreStats>;
dumpGossipPeerScoreStats(): Promise<PeerScoreStatsDump>;
dumpGossipQueueItems(gossipType: string): Promise<routes.lodestar.GossipQueueItem[]>;
dumpDiscv5KadValues(): Promise<string[]>;
}

export type PeerDirection = Connection["stat"]["direction"];
Expand Down
Loading