Skip to content

Commit

Permalink
Fix random subnet subscription (#4930)
Browse files Browse the repository at this point in the history
* Add randBetweenFn option to AttnetsService for unit test

* Add shuffleFn option

* Add back AttestationService unit test

* Reproduce and fix #4929

* Add metrics to attnetsService syncnetsService

* Address PR comments

* Unskip the last unit test in attestationService.test.ts
  • Loading branch information
twoeths authored Dec 22, 2022
1 parent 176b57e commit 3166e0e
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 54 deletions.
42 changes: 42 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,48 @@ export function createLodestarMetrics(
buckets: [0.1, 1, 10, 100],
}),

// Metrics reported by AttnetsService (attestation subnet subscription management).
// NOTE(review): every metric in this group is created with register.gauge although the names end in
// "_total" and the last two are described as counts of calls and are only ever incremented via inc().
// Presumably counter-like — confirm whether a counter type was intended before relying on gauge semantics.
attnetsService: {
// Set to AttnetsService.committeeSubnets.size in AttnetsService.onScrapeLodestarMetrics
committeeSubnets: register.gauge({
name: "lodestar_attnets_service_committee_subnets_total",
help: "Count of committee subnets",
}),
// Set to AttnetsService.subscriptionsCommittee.size in AttnetsService.onScrapeLodestarMetrics
subscriptionsCommittee: register.gauge({
name: "lodestar_attnets_service_committee_subscriptions_total",
help: "Count of committee subscriptions",
}),
// Set to AttnetsService.subscriptionsRandom.size in AttnetsService.onScrapeLodestarMetrics.
// AttnetsService also attaches its collect hook (addCollect) to this metric.
subscriptionsRandom: register.gauge({
name: "lodestar_attnets_service_random_subscriptions_total",
help: "Count of random subscriptions",
}),
// Incremented per subnet when AttnetsService triggers a gossip subscribeTopic for it.
// Labels: "subnet" is the subnet index, "src" is the SubnetSource value ("committee" or "random").
subscribeSubnets: register.gauge<"subnet" | "src">({
name: "lodestar_attnets_service_subscribe_subnets_total",
help: "Count of subscribe_subnets calls",
labelNames: ["subnet", "src"],
}),
// Incremented per subnet when AttnetsService triggers a gossip unsubscribeTopic for it.
// Same labels as subscribeSubnets.
unsubscribeSubnets: register.gauge<"subnet" | "src">({
name: "lodestar_attnets_service_unsubscribe_subnets_total",
help: "Count of unsubscribe_subnets calls",
labelNames: ["subnet", "src"],
}),
},

// Metrics reported by SyncnetsService (sync committee subnet subscription management).
// NOTE(review): as with the attnetsService group, these are gauges with "_total" names; the
// subscribe/unsubscribe ones are only incremented via inc(). Confirm whether a counter type was intended.
syncnetsService: {
// Set to SyncnetsService.subscriptionsCommittee.size in SyncnetsService.onScrapeLodestarMetrics.
// SyncnetsService attaches its collect hook (addCollect) to this metric.
subscriptionsCommittee: register.gauge({
name: "lodestar_syncnets_service_committee_subscriptions_total",
help: "Count of syncnet committee subscriptions",
}),
// Incremented per subnet when SyncnetsService triggers a gossip subscribeTopic; labelled by subnet index
subscribeSubnets: register.gauge<"subnet">({
name: "lodestar_syncnets_service_subscribe_subnets_total",
help: "Count of syncnet subscribe_subnets calls",
labelNames: ["subnet"],
}),
// Incremented per subnet when SyncnetsService triggers a gossip unsubscribeTopic; labelled by subnet index
unsubscribeSubnets: register.gauge<"subnet">({
name: "lodestar_syncnets_service_unsubscribe_subnets_total",
help: "Count of syncnet unsubscribe_subnets calls",
labelNames: ["subnet"],
}),
},

regenQueue: {
length: register.gauge({
name: "lodestar_regen_queue_length",
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ export class Network implements INetwork {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any
void this.gossip.init((libp2p as any).components).catch((e) => this.logger.error(e));

this.attnetsService = new AttnetsService(config, chain, this.gossip, metadata, logger, opts);
this.syncnetsService = new SyncnetsService(config, chain, this.gossip, metadata, logger, opts);
this.attnetsService = new AttnetsService(config, chain, this.gossip, metadata, logger, metrics, opts);
this.syncnetsService = new SyncnetsService(config, chain, this.gossip, metadata, logger, metrics, opts);

this.peerManager = new PeerManager(
{
Expand Down
71 changes: 54 additions & 17 deletions packages/beacon-node/src/network/subnets/attnetsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import {Eth2Gossipsub, GossipType} from "../gossip/index.js";
import {MetadataController} from "../metadata.js";
import {SubnetMap, RequestedSubnet} from "../peers/utils/index.js";
import {getActiveForks} from "../forks.js";
import {IAttnetsService, CommitteeSubscription, SubnetsServiceOpts} from "./interface.js";
import {IMetrics} from "../../metrics/metrics.js";
import {IAttnetsService, CommitteeSubscription, SubnetsServiceOpts, RandBetweenFn, ShuffleFn} from "./interface.js";

/**
* The time (in slots) before a last seen validator is considered absent and we unsubscribe from the random
Expand All @@ -25,6 +26,11 @@ const LAST_SEEN_VALIDATOR_TIMEOUT = 150;

const gossipType = GossipType.beacon_attestation;

/**
 * Which kind of subscription caused a gossip subscribe/unsubscribe.
 * The value is passed as the `src` label of the attnetsService subscribeSubnets / unsubscribeSubnets metrics.
 */
enum SubnetSource {
// Short lived committee subscriptions (subscriptionsCommittee)
committee = "committee",
// Long lived random subscriptions (subscriptionsRandom)
random = "random",
}

/**
* Manage random (long lived) subnets and committee (short lived) subnets.
*/
Expand All @@ -47,12 +53,16 @@ export class AttnetsService implements IAttnetsService {
*/
private knownValidators = new Map<number, Slot>();

private randBetweenFn: RandBetweenFn;
private shuffleFn: ShuffleFn;

constructor(
private readonly config: IChainForkConfig,
private readonly chain: IBeaconChain,
private readonly gossip: Eth2Gossipsub,
private readonly metadata: MetadataController,
private readonly logger: ILogger,
private readonly metrics: IMetrics | null,
private readonly opts?: SubnetsServiceOpts
) {
// if subscribeAllSubnets, we act like we have >= ATTESTATION_SUBNET_COUNT validators connecting to this node
Expand All @@ -62,6 +72,12 @@ export class AttnetsService implements IAttnetsService {
this.committeeSubnets.request({subnet, toSlot: Infinity});
}
}

this.randBetweenFn = this.opts?.randBetweenFn ?? randBetween;
this.shuffleFn = this.opts?.shuffleFn ?? shuffle;
if (metrics) {
metrics.attnetsService.subscriptionsRandom.addCollect(() => this.onScrapeLodestarMetrics(metrics));
}
}

start(): void {
Expand Down Expand Up @@ -105,7 +121,10 @@ export class AttnetsService implements IAttnetsService {

// Trigger gossip subscription first, in batch
if (subnetsToSubscribe.length > 0) {
this.subscribeToSubnets(subnetsToSubscribe.map((sub) => sub.subnet));
this.subscribeToSubnets(
subnetsToSubscribe.map((sub) => sub.subnet),
SubnetSource.committee
);
}
// Then, register the subscriptions
for (const subscription of subnetsToSubscribe) {
Expand Down Expand Up @@ -145,6 +164,10 @@ export class AttnetsService implements IAttnetsService {
*/
private onSlot = (slot: Slot): void => {
try {
// For a node with >= 64 validators, we should consistently subscribe to all subnets
// it's important to check random subnets first
// See https://github.com/ChainSafe/lodestar/issues/4929
this.unsubscribeExpiredRandomSubnets(slot);
this.unsubscribeExpiredCommitteeSubnets(slot);
} catch (e) {
this.logger.error("Error on AttnetsService.onSlot", {slot}, e as Error);
Expand All @@ -157,7 +180,6 @@ export class AttnetsService implements IAttnetsService {
private onEpoch = (epoch: Epoch): void => {
try {
const slot = computeStartSlotAtEpoch(epoch);
this.unsubscribeExpiredRandomSubnets(slot);
this.pruneExpiredKnownValidators(slot);
} catch (e) {
this.logger.error("Error on AttnetsService.onEpoch", {epoch}, e as Error);
Expand All @@ -170,7 +192,9 @@ export class AttnetsService implements IAttnetsService {
*/
private unsubscribeExpiredCommitteeSubnets(slot: Slot): void {
const expired = this.subscriptionsCommittee.getExpired(slot);
this.unsubscribeSubnets(expired, slot);
if (expired.length > 0) {
this.unsubscribeSubnets(expired, slot, SubnetSource.committee);
}
}

/**
Expand All @@ -182,16 +206,20 @@ export class AttnetsService implements IAttnetsService {
const expired = this.subscriptionsRandom.getExpired(slot);
const currentSlot = this.chain.clock.currentSlot;

if (expired.length === 0) {
return;
}

if (this.knownValidators.size * RANDOM_SUBNETS_PER_VALIDATOR >= ATTESTATION_SUBNET_COUNT) {
// Optimization: If we have to be subscribed to all subnets, no need to unsubscribe. Just extend the timeout
for (const subnet of expired) {
this.subscriptionsRandom.request({subnet, toSlot: randomSubscriptionSlotLen() + currentSlot});
this.subscriptionsRandom.request({subnet, toSlot: this.randomSubscriptionSlotLen() + currentSlot});
}
return;
}

// Prune subnets and re-subscribe to new ones
this.unsubscribeSubnets(expired, slot);
this.unsubscribeSubnets(expired, slot, SubnetSource.random);
this.rebalanceRandomSubnets();
}

Expand Down Expand Up @@ -233,15 +261,15 @@ export class AttnetsService implements IAttnetsService {
const activeSubnets = new Set(this.subscriptionsRandom.getActive(slot));
const allSubnets = Array.from({length: ATTESTATION_SUBNET_COUNT}, (_, i) => i);
const availableSubnets = allSubnets.filter((subnet) => !activeSubnets.has(subnet));
const subnetsToConnect = shuffle(availableSubnets).slice(0, subnetDiff);
const subnetsToConnect = this.shuffleFn(availableSubnets).slice(0, subnetDiff);

// Tell gossip to connect to the subnets if not connected already
this.subscribeToSubnets(subnetsToConnect);
this.subscribeToSubnets(subnetsToConnect, SubnetSource.random);

// Register these new subnets until some future slot
for (const subnet of subnetsToConnect) {
// the heartbeat will help connect to respective peers
this.subscriptionsRandom.request({subnet, toSlot: randomSubscriptionSlotLen() + slot});
this.subscriptionsRandom.request({subnet, toSlot: this.randomSubscriptionSlotLen() + slot});
}
}

Expand All @@ -254,7 +282,7 @@ export class AttnetsService implements IAttnetsService {
for (const subnet of toRemoveSubnets) {
this.subscriptionsRandom.delete(subnet);
}
this.unsubscribeSubnets(toRemoveSubnets, slot);
this.unsubscribeSubnets(toRemoveSubnets, slot, SubnetSource.random);
}

// If there has been a change update the local ENR bitfield
Expand All @@ -277,19 +305,20 @@ export class AttnetsService implements IAttnetsService {
}

/** Trigger a gossip subscription only if not already subscribed */
private subscribeToSubnets(subnets: number[]): void {
private subscribeToSubnets(subnets: number[], src: SubnetSource): void {
  // Topics are per fork, so resolve the active forks once for the whole batch
  const activeForks = getActiveForks(this.config, this.chain.clock.currentEpoch);

  for (const subnet of subnets) {
    // Skip subnets already held by either a committee or a random subscription
    if (this.subscriptionsCommittee.has(subnet) || this.subscriptionsRandom.has(subnet)) {
      continue;
    }

    for (const fork of activeForks) {
      this.gossip.subscribeTopic({type: gossipType, fork, subnet});
    }
    // Record one subscribe event per subnet, tagged with what requested it
    this.metrics?.attnetsService.subscribeSubnets.inc({subnet, src});
  }
}

/** Trigger a gossip un-subscription only if no one is still subscribed */
private unsubscribeSubnets(subnets: number[], slot: Slot): void {
private unsubscribeSubnets(subnets: number[], slot: Slot, src: SubnetSource): void {
// No need to unsubscribeTopic(). Return early to prevent repetitive extra work
if (this.opts?.subscribeAllSubnets) return;

Expand All @@ -302,13 +331,21 @@ export class AttnetsService implements IAttnetsService {
for (const fork of forks) {
this.gossip.unsubscribeTopic({type: gossipType, fork, subnet});
}
this.metrics?.attnetsService.unsubscribeSubnets.inc({subnet, src});
}
}
}
}

function randomSubscriptionSlotLen(): Slot {
return (
randBetween(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION, 2 * EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION) * SLOTS_PER_EPOCH
);
/**
 * Length, in slots, of a random subnet subscription.
 * Draws an epoch count via `randBetweenFn` between EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION and twice that value
 * (bound inclusivity is whatever `randBetweenFn` implements) and converts it to slots.
 */
private randomSubscriptionSlotLen(): Slot {
  const minEpochs = EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION;
  const maxEpochs = 2 * EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION;
  const epochs = this.randBetweenFn(minEpochs, maxEpochs);
  return epochs * SLOTS_PER_EPOCH;
}

/** Collect hook: publish the current sizes of the three subnet maps to their gauges */
private onScrapeLodestarMetrics(metrics: IMetrics): void {
  const {committeeSubnets, subscriptionsCommittee, subscriptionsRandom} = metrics.attnetsService;

  committeeSubnets.set(this.committeeSubnets.size);
  subscriptionsCommittee.set(this.subscriptionsCommittee.size);
  subscriptionsRandom.set(this.subscriptionsRandom.size);
}
}
6 changes: 6 additions & 0 deletions packages/beacon-node/src/network/subnets/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ export interface IAttnetsService extends ISubnetsService {
shouldProcess(subnet: number, slot: Slot): boolean;
}

/**
 * Picks a number given a `min` and `max` bound. AttnetsService falls back to `randBetween` when
 * `SubnetsServiceOpts.randBetweenFn` is not provided. Bound inclusivity is up to the implementation.
 */
export type RandBetweenFn = (min: number, max: number) => number;
/**
 * Returns an array in shuffled order. AttnetsService falls back to `shuffle` when
 * `SubnetsServiceOpts.shuffleFn` is not provided, and slices the returned array.
 */
export type ShuffleFn = <T>(arr: T[]) => T[];

/** Options accepted by AttnetsService and SyncnetsService. All fields are optional. */
export type SubnetsServiceOpts = {
// When set, the services skip gossip unsubscribeTopic calls; AttnetsService additionally behaves as if
// enough validators are connected to require every attestation subnet
subscribeAllSubnets?: boolean;
// For deterministic randomness in unit tests, since ESM prevents simple import mocking.
// When omitted, AttnetsService uses its default randBetween / shuffle implementations.
randBetweenFn?: RandBetweenFn;
shuffleFn?: ShuffleFn;
};
18 changes: 15 additions & 3 deletions packages/beacon-node/src/network/subnets/syncnetsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {getActiveForks} from "../forks.js";
import {Eth2Gossipsub, GossipType} from "../gossip/index.js";
import {MetadataController} from "../metadata.js";
import {RequestedSubnet, SubnetMap} from "../peers/utils/index.js";
import {IMetrics} from "../../metrics/metrics.js";
import {CommitteeSubscription, ISubnetsService, SubnetsServiceOpts} from "./interface.js";

const gossipType = GossipType.sync_committee;
Expand All @@ -32,8 +33,13 @@ export class SyncnetsService implements ISubnetsService {
private readonly gossip: Eth2Gossipsub,
private readonly metadata: MetadataController,
private readonly logger: ILogger,
private readonly metrics: IMetrics | null,
private readonly opts?: SubnetsServiceOpts
) {}
) {
if (metrics) {
metrics.syncnetsService.subscriptionsCommittee.addCollect(() => this.onScrapeLodestarMetrics(metrics));
}
}

start(): void {
this.chain.emitter.on(ChainEvent.clockEpoch, this.onEpoch);
Expand Down Expand Up @@ -120,6 +126,7 @@ export class SyncnetsService implements ISubnetsService {
for (const fork of forks) {
this.gossip.subscribeTopic({type: gossipType, fork, subnet});
}
this.metrics?.syncnetsService.subscribeSubnets.inc({subnet});
}
}
}
Expand All @@ -129,11 +136,16 @@ export class SyncnetsService implements ISubnetsService {
const forks = getActiveForks(this.config, this.chain.clock.currentEpoch);
for (const subnet of subnets) {
// No need to check if active in subscriptionsCommittee since we only have a single SubnetMap
for (const fork of forks) {
if (!this.opts?.subscribeAllSubnets) {
if (!this.opts?.subscribeAllSubnets) {
for (const fork of forks) {
this.gossip.unsubscribeTopic({type: gossipType, fork, subnet});
}
this.metrics?.syncnetsService.unsubscribeSubnets.inc({subnet});
}
}
}

/** Collect hook: publish the current number of sync committee subscriptions to its gauge */
private onScrapeLodestarMetrics(metrics: IMetrics): void {
  const {subscriptionsCommittee} = metrics.syncnetsService;
  subscriptionsCommittee.set(this.subscriptionsCommittee.size);
}
}
Loading

0 comments on commit 3166e0e

Please sign in to comment.