Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: remove chain mocks #5582

Merged
merged 5 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions packages/beacon-node/src/network/core/networkCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,15 +255,18 @@ export class NetworkCore implements INetworkCore {

// Must goodbye and disconnect before stopping libp2p
await this.peerManager.goodbyeAndDisconnectAllPeers();
this.logger.debug("network sent goodbye to all peers");
await this.peerManager.stop();
this.logger.debug("network peerManager closed");
await this.gossip.stop();

this.logger.debug("network gossip closed");
await this.reqResp.stop();
await this.reqResp.unregisterAllProtocols();

this.logger.debug("network reqResp closed");
this.attnetsService.stop();
this.syncnetsService.stop();
await this.libp2p.stop();
this.logger.debug("network lib2p closed");

this.closed = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ export class WorkerNetworkCore implements INetworkCore {

async close(): Promise<void> {
await this.getApi().close();
this.modules.logger.debug("terminating network worker");
await Thread.terminate(this.modules.workerApi as unknown as Thread);
this.modules.logger.debug("terminated network worker");
}

async test(): Promise<void> {
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {PeerAction} from "./peers/index.js";
*/

export interface INetwork extends INetworkCorePublic {
readonly closed: boolean;
events: INetworkEventBus;

getConnectedPeers(): PeerIdStr[];
Expand Down
22 changes: 11 additions & 11 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type NetworkModules = {
config: BeaconConfig;
logger: LoggerNode;
chain: IBeaconChain;
signal: AbortSignal;
networkEventBus: NetworkEventBus;
aggregatorTracker: AggregatorTracker;
networkProcessor: NetworkProcessor;
Expand All @@ -54,7 +53,6 @@ export type NetworkInitModules = {
chain: IBeaconChain;
db: IBeaconDb;
getReqRespHandler: GetReqRespHandlerFn;
signal: AbortSignal;
// Optionally pass custom GossipHandlers, for testing
gossipHandlers?: GossipHandlers;
};
Expand All @@ -76,7 +74,8 @@ export class Network implements INetwork {
private readonly config: BeaconConfig;
private readonly clock: IClock;
private readonly chain: IBeaconChain;
private readonly signal: AbortSignal;
// Used only for sleep() statements
private readonly controller: AbortController;

// TODO: Review
private readonly networkProcessor: NetworkProcessor;
Expand All @@ -86,15 +85,14 @@ export class Network implements INetwork {
private subscribedToCoreTopics = false;
private connectedPeers = new Set<PeerIdStr>();
private regossipBlsChangesPromise: Promise<void> | null = null;
private closed = false;

constructor(modules: NetworkModules) {
this.peerId = modules.peerId;
this.config = modules.config;
this.logger = modules.logger;
this.chain = modules.chain;
this.clock = modules.chain.clock;
this.signal = modules.signal;
this.controller = new AbortController();
this.events = modules.networkEventBus;
this.networkProcessor = modules.networkProcessor;
this.core = modules.core;
Expand All @@ -105,7 +103,6 @@ export class Network implements INetwork {
this.chain.emitter.on(routes.events.EventType.head, this.onHead);
this.chain.emitter.on(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdate);
this.chain.emitter.on(routes.events.EventType.lightClientOptimisticUpdate, this.onLightClientOptimisticUpdate);
modules.signal.addEventListener("abort", this.close.bind(this), {once: true});
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a terrible idea, debugging tests I discovered that close() is being called twice:

  • first time through AbortController, not awaited, uncontrolled
  • second time via close() but since it's second time it has no effect

This broke the expected close flow from the caller, causing issues in subsequent tests

}

static async init({
Expand All @@ -115,7 +112,6 @@ export class Network implements INetwork {
metrics,
chain,
db,
signal,
gossipHandlers,
peerId,
peerStoreDir,
Expand Down Expand Up @@ -176,26 +172,30 @@ export class Network implements INetwork {
config,
logger,
chain,
signal,
networkEventBus: events,
aggregatorTracker,
networkProcessor,
core,
});
}

get closed(): boolean {
return this.controller.signal.aborted;
}

/** Destroy this instance. Can only be called once. */
async close(): Promise<void> {
if (this.closed) return;
// Used only for sleep() statements
this.controller.abort();

this.events.off(NetworkEvent.peerConnected, this.onPeerConnected);
this.events.off(NetworkEvent.peerDisconnected, this.onPeerDisconnected);
this.chain.emitter.off(routes.events.EventType.head, this.onHead);
this.chain.emitter.off(routes.events.EventType.lightClientFinalityUpdate, this.onLightClientFinalityUpdate);
this.chain.emitter.off(routes.events.EventType.lightClientOptimisticUpdate, this.onLightClientOptimisticUpdate);
await this.core.close();

this.closed = true;
this.logger.debug("network core closed");
}

async scrapeMetrics(): Promise<string> {
Expand Down Expand Up @@ -583,7 +583,7 @@ export class Network implements INetwork {
private waitOneThirdOfSlot = async (slot: number): Promise<void> => {
const secAtSlot = computeTimeAtSlot(this.config, slot + 1 / 3, this.chain.genesisTime);
const msToSlot = secAtSlot * 1000 - Date.now();
await sleep(msToSlot, this.signal);
await sleep(msToSlot, this.controller.signal);
};

private onHead = async (): Promise<void> => {
Expand Down
4 changes: 3 additions & 1 deletion packages/beacon-node/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {BitArray} from "@chainsafe/ssz";
import {SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {BeaconConfig} from "@lodestar/config";
import {allForks, altair, phase0} from "@lodestar/types";
import {withTimeout} from "@lodestar/utils";
import {LoggerNode} from "@lodestar/logger/node";
import {GoodByeReasonCode, GOODBYE_KNOWN_CODES, Libp2pEvent} from "../../constants/index.js";
import {IClock} from "../../util/clock.js";
Expand Down Expand Up @@ -642,7 +643,8 @@ export class PeerManager {
this.metrics?.peerLongConnectionDisconnect.inc({reason});
}

await this.reqResp.sendGoodbye(peer, BigInt(goodbye));
// Wrap with shorter timeout than regular ReqResp requests to speed up shutdown
await withTimeout(() => this.reqResp.sendGoodbye(peer, BigInt(goodbye)), 1_000);
} catch (e) {
this.logger.verbose("Failed to send goodbye", {peer: prettyPrintPeerId(peer)}, e as Error);
} finally {
Expand Down
1 change: 0 additions & 1 deletion packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ export class BeaconNode {
peerId,
peerStoreDir,
getReqRespHandler: getReqRespHandlers({db, chain}),
signal,
});

const sync = new BeaconSync(opts.sync, {
Expand Down
138 changes: 38 additions & 100 deletions packages/beacon-node/test/e2e/network/gossipsub.test.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,25 @@
import sinon from "sinon";
import {expect} from "chai";
import {createBeaconConfig, createChainForkConfig, defaultChainConfig} from "@lodestar/config";
import {createChainForkConfig, defaultChainConfig} from "@lodestar/config";
import {sleep} from "@lodestar/utils";

import {computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {ssz} from "@lodestar/types";
import {getReqRespHandlers, Network, NetworkInitModules} from "../../../src/network/index.js";
import {defaultNetworkOptions, NetworkOptions} from "../../../src/network/options.js";
import {Network} from "../../../src/network/index.js";
import {GossipType, GossipHandlers} from "../../../src/network/gossip/index.js";
import {connect, onPeerConnect, getNetworkForTest} from "../../utils/network.js";

describe("gossipsub / main thread", function () {
runTests.bind(this)({useWorker: false});
});

import {MockBeaconChain, zeroProtoBlock} from "../../utils/mocks/chain/chain.js";
import {createNetworkModules, connect, onPeerConnect} from "../../utils/network.js";
import {generateState} from "../../utils/state.js";
import {StubbedBeaconDb} from "../../utils/stub/index.js";
import {testLogger} from "../../utils/logger.js";

const multiaddr = "/ip4/127.0.0.1/tcp/0";

const opts: NetworkOptions = {
...defaultNetworkOptions,
maxPeers: 1,
targetPeers: 1,
bootMultiaddrs: [],
localMultiaddrs: [],
discv5FirstQueryDelayMs: 0,
discv5: null,
skipParamsLog: true,
};

// Schedule all forks at ALTAIR_FORK_EPOCH to avoid generating the pubkeys cache
/* eslint-disable @typescript-eslint/naming-convention */
const config = createChainForkConfig({
...defaultChainConfig,
ALTAIR_FORK_EPOCH: 1,
BELLATRIX_FORK_EPOCH: 1,
CAPELLA_FORK_EPOCH: 1,
describe("gossipsub / worker", function () {
runTests.bind(this)({useWorker: true});
});
const START_SLOT = computeStartSlotAtEpoch(config.ALTAIR_FORK_EPOCH);

describe("gossipsub", function () {
if (this.timeout() < 15 * 1000) this.timeout(15 * 1000);
this.retries(2); // This test fail sometimes, with a 5% rate.
/* eslint-disable mocha/no-top-level-hooks */

function runTests(this: Mocha.Suite, {useWorker}: {useWorker: boolean}): void {
if (this.timeout() < 15 * 1000) this.timeout(150 * 1000);
this.retries(0); // This test fail sometimes, with a 5% rate.

const afterEachCallbacks: (() => Promise<void> | void)[] = [];
afterEach(async () => {
Expand All @@ -50,75 +29,34 @@ describe("gossipsub", function () {
}
});

// Schedule all forks at ALTAIR_FORK_EPOCH to avoid generating the pubkeys cache
/* eslint-disable @typescript-eslint/naming-convention */
const config = createChainForkConfig({
...defaultChainConfig,
ALTAIR_FORK_EPOCH: 1,
BELLATRIX_FORK_EPOCH: 1,
CAPELLA_FORK_EPOCH: 1,
});
const START_SLOT = computeStartSlotAtEpoch(config.ALTAIR_FORK_EPOCH);

// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
async function mockModules(gossipHandlersPartial?: Partial<GossipHandlers>) {
const controller = new AbortController();

const block = ssz.phase0.SignedBeaconBlock.defaultValue();
const state = generateState({
finalizedCheckpoint: {
epoch: 0,
root: ssz.phase0.BeaconBlock.hashTreeRoot(block.message),
},
});

const beaconConfig = createBeaconConfig(config, state.genesisValidatorsRoot);
const chain = new MockBeaconChain({
genesisTime: 0,
chainId: 0,
networkId: BigInt(0),
state,
config: beaconConfig,
});

chain.forkChoice.getHead = () => {
return {
...zeroProtoBlock,
slot: START_SLOT,
};
};

const db = new StubbedBeaconDb(config);
const gossipHandlers = gossipHandlersPartial as GossipHandlers;

const loggerA = testLogger("A");
const loggerB = testLogger("B");

const modules: Omit<NetworkInitModules, "opts" | "peerId" | "logger"> = {
config: beaconConfig,
chain,
db,
getReqRespHandler: getReqRespHandlers({db, chain}),
gossipHandlers,
signal: controller.signal,
metrics: null,
};
const netA = await Network.init({
...modules,
...(await createNetworkModules(multiaddr, undefined, opts)),
logger: loggerA,
});
const netB = await Network.init({
...modules,
...(await createNetworkModules(multiaddr, undefined, opts)),
logger: loggerB,
});
const [netA, closeA] = await getNetworkForTest("A", config, {opts: {useWorker}, gossipHandlersPartial});
const [netB, closeB] = await getNetworkForTest("B", config, {opts: {useWorker}, gossipHandlersPartial});

afterEachCallbacks.push(async () => {
await chain.close();
controller.abort();
await Promise.all([netA.close(), netB.close()]);
sinon.restore();
await closeA();
await closeB();
});

return {netA, netB, chain, controller};
return {netA, netB};
}

it("Publish and receive a voluntaryExit", async function () {
let onVoluntaryExit: (ve: Uint8Array) => void;
const onVoluntaryExitPromise = new Promise<Uint8Array>((resolve) => (onVoluntaryExit = resolve));

const {netA, netB, controller} = await mockModules({
const {netA, netB} = await mockModules({
[GossipType.voluntary_exit]: async ({serializedData}) => {
onVoluntaryExit(serializedData);
},
Expand All @@ -132,7 +70,7 @@ describe("gossipsub", function () {
await netB.subscribeGossipCoreTopics();

// Wait to have a peer connected to a topic
while (!controller.signal.aborted) {
while (!netA.closed) {
await sleep(500);
if (await hasSomeMeshPeer(netA)) {
break;
Expand All @@ -151,7 +89,7 @@ describe("gossipsub", function () {
let onBlsToExecutionChange: (blsToExec: Uint8Array) => void;
const onBlsToExecutionChangePromise = new Promise<Uint8Array>((resolve) => (onBlsToExecutionChange = resolve));

const {netA, netB, controller} = await mockModules({
const {netA, netB} = await mockModules({
[GossipType.bls_to_execution_change]: async ({serializedData}) => {
onBlsToExecutionChange(serializedData);
},
Expand All @@ -165,7 +103,7 @@ describe("gossipsub", function () {
await netB.subscribeGossipCoreTopics();

// Wait to have a peer connected to a topic
while (!controller.signal.aborted) {
while (!netA.closed) {
await sleep(500);
if (await hasSomeMeshPeer(netA)) {
break;
Expand All @@ -185,7 +123,7 @@ describe("gossipsub", function () {
(resolve) => (onLightClientOptimisticUpdate = resolve)
);

const {netA, netB, controller} = await mockModules({
const {netA, netB} = await mockModules({
[GossipType.light_client_optimistic_update]: async ({serializedData}) => {
onLightClientOptimisticUpdate(serializedData);
},
Expand All @@ -199,7 +137,7 @@ describe("gossipsub", function () {
await netB.subscribeGossipCoreTopics();

// Wait to have a peer connected to a topic
while (!controller.signal.aborted) {
while (!netA.closed) {
await sleep(500);
if (await hasSomeMeshPeer(netA)) {
break;
Expand All @@ -222,7 +160,7 @@ describe("gossipsub", function () {
(resolve) => (onLightClientFinalityUpdate = resolve)
);

const {netA, netB, controller} = await mockModules({
const {netA, netB} = await mockModules({
[GossipType.light_client_finality_update]: async ({serializedData}) => {
onLightClientFinalityUpdate(serializedData);
},
Expand All @@ -236,7 +174,7 @@ describe("gossipsub", function () {
await netB.subscribeGossipCoreTopics();

// Wait to have a peer connected to a topic
while (!controller.signal.aborted) {
while (!netA.closed) {
await sleep(500);
if (await hasSomeMeshPeer(netA)) {
break;
Expand All @@ -250,7 +188,7 @@ describe("gossipsub", function () {
const optimisticUpdate = await onLightClientFinalityUpdatePromise;
expect(optimisticUpdate).to.deep.equal(ssz.capella.LightClientFinalityUpdate.serialize(lightClientFinalityUpdate));
});
});
}

async function hasSomeMeshPeer(net: Network): Promise<boolean> {
return Object.values(await net.dumpMeshPeers()).some((peers) => peers.length > 0);
Expand Down
Loading