Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix random subnet subscription #4930

Merged
merged 7 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,48 @@ export function createLodestarMetrics(
buckets: [0.1, 1, 10, 100],
}),

attnetsService: {
committeeSubnets: register.gauge({
name: "lodestar_attnets_service_committee_subnets_total",
help: "Count of committee subnets",
}),
subscriptionsCommittee: register.gauge({
name: "lodestar_attnets_service_committee_subscriptions_total",
help: "Count of committee subscriptions",
}),
subscriptionsRandom: register.gauge({
name: "lodestar_attnets_service_random_subscriptions_total",
help: "Count of random subscriptions",
}),
subscribeSubnets: register.gauge<"subnet" | "src">({
name: "lodestar_attnets_service_subscribe_subnets_total",
help: "Count of subscribe_subnets calls",
labelNames: ["subnet", "src"],
}),
unsubscribeSubnets: register.gauge<"subnet" | "src">({
name: "lodestar_attnets_service_unsubscribe_subnets_total",
help: "Count of unsubscribe_subnets calls",
labelNames: ["subnet", "src"],
}),
},

syncnetsService: {
subscriptionsCommittee: register.gauge({
name: "lodestar_syncnets_service_committee_subscriptions_total",
help: "Count of syncnet committee subscriptions",
}),
subscribeSubnets: register.gauge<"subnet">({
name: "lodestar_syncnets_service_subscribe_subnets_total",
help: "Count of syncnet subscribe_subnets calls",
labelNames: ["subnet"],
}),
unsubscribeSubnets: register.gauge<"subnet">({
name: "lodestar_syncnets_service_unsubscribe_subnets_total",
help: "Count of syncnet unsubscribe_subnets calls",
labelNames: ["subnet"],
}),
},

regenQueue: {
length: register.gauge({
name: "lodestar_regen_queue_length",
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ export class Network implements INetwork {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any
void this.gossip.init((libp2p as any).components).catch((e) => this.logger.error(e));

this.attnetsService = new AttnetsService(config, chain, this.gossip, metadata, logger, opts);
this.syncnetsService = new SyncnetsService(config, chain, this.gossip, metadata, logger, opts);
this.attnetsService = new AttnetsService(config, chain, this.gossip, metadata, logger, metrics, opts);
this.syncnetsService = new SyncnetsService(config, chain, this.gossip, metadata, logger, metrics, opts);

this.peerManager = new PeerManager(
{
Expand Down
68 changes: 51 additions & 17 deletions packages/beacon-node/src/network/subnets/attnetsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import {Eth2Gossipsub, GossipType} from "../gossip/index.js";
import {MetadataController} from "../metadata.js";
import {SubnetMap, RequestedSubnet} from "../peers/utils/index.js";
import {getActiveForks} from "../forks.js";
import {IAttnetsService, CommitteeSubscription, SubnetsServiceOpts} from "./interface.js";
import {IMetrics} from "../../metrics/metrics.js";
import {IAttnetsService, CommitteeSubscription, SubnetsServiceOpts, RandBetweenFn, ShuffleFn} from "./interface.js";

/**
* The time (in slots) before a last seen validator is considered absent and we unsubscribe from the random
Expand All @@ -25,6 +26,11 @@ const LAST_SEEN_VALIDATOR_TIMEOUT = 150;

const gossipType = GossipType.beacon_attestation;

enum SubnetSource {
committee = "committee",
random = "random",
}

/**
* Manage random (long lived) subnets and committee (short lived) subnets.
*/
Expand All @@ -47,12 +53,16 @@ export class AttnetsService implements IAttnetsService {
*/
private knownValidators = new Map<number, Slot>();

private randBetweenFn: RandBetweenFn;
private shuffleFn: ShuffleFn;

constructor(
private readonly config: IChainForkConfig,
private readonly chain: IBeaconChain,
private readonly gossip: Eth2Gossipsub,
private readonly metadata: MetadataController,
private readonly logger: ILogger,
private readonly metrics: IMetrics | null,
private readonly opts?: SubnetsServiceOpts
) {
// if subscribeAllSubnets, we act like we have >= ATTESTATION_SUBNET_COUNT validators connecting to this node
Expand All @@ -62,6 +72,12 @@ export class AttnetsService implements IAttnetsService {
this.committeeSubnets.request({subnet, toSlot: Infinity});
}
}

this.randBetweenFn = this.opts?.randBetweenFn ?? randBetween;
this.shuffleFn = this.opts?.shuffleFn ?? shuffle;
if (metrics) {
metrics.attnetsService.subscriptionsRandom.addCollect(() => this.onScrapeLodestarMetrics(metrics));
}
}

start(): void {
Expand Down Expand Up @@ -105,7 +121,10 @@ export class AttnetsService implements IAttnetsService {

// Trigger gossip subscription first, in batch
if (subnetsToSubscribe.length > 0) {
this.subscribeToSubnets(subnetsToSubscribe.map((sub) => sub.subnet));
this.subscribeToSubnets(
subnetsToSubscribe.map((sub) => sub.subnet),
SubnetSource.committee
);
}
// Then, register the subscriptions
for (const subscription of subnetsToSubscribe) {
Expand Down Expand Up @@ -145,6 +164,10 @@ export class AttnetsService implements IAttnetsService {
*/
private onSlot = (slot: Slot): void => {
try {
// For node >= 64 validators, we should consistently subscribe to all subnets
// it's important to check random subnets first
// See https://github.com/ChainSafe/lodestar/issues/4929
this.unsubscribeExpiredRandomSubnets(slot);
this.unsubscribeExpiredCommitteeSubnets(slot);
} catch (e) {
this.logger.error("Error on AttnetsService.onSlot", {slot}, e as Error);
Expand All @@ -157,7 +180,6 @@ export class AttnetsService implements IAttnetsService {
private onEpoch = (epoch: Epoch): void => {
try {
const slot = computeStartSlotAtEpoch(epoch);
this.unsubscribeExpiredRandomSubnets(slot);
this.pruneExpiredKnownValidators(slot);
} catch (e) {
this.logger.error("Error on AttnetsService.onEpoch", {epoch}, e as Error);
Expand All @@ -170,7 +192,8 @@ export class AttnetsService implements IAttnetsService {
*/
private unsubscribeExpiredCommitteeSubnets(slot: Slot): void {
const expired = this.subscriptionsCommittee.getExpired(slot);
this.unsubscribeSubnets(expired, slot);
if (expired.length === 0) return;
dapplion marked this conversation as resolved.
Show resolved Hide resolved
this.unsubscribeSubnets(expired, slot, SubnetSource.committee);
}

/**
Expand All @@ -182,16 +205,18 @@ export class AttnetsService implements IAttnetsService {
const expired = this.subscriptionsRandom.getExpired(slot);
const currentSlot = this.chain.clock.currentSlot;

if (expired.length === 0) return;
dapplion marked this conversation as resolved.
Show resolved Hide resolved

if (this.knownValidators.size * RANDOM_SUBNETS_PER_VALIDATOR >= ATTESTATION_SUBNET_COUNT) {
// Optimization: If we have to be subcribed to all subnets, no need to unsubscribe. Just extend the timeout
for (const subnet of expired) {
this.subscriptionsRandom.request({subnet, toSlot: randomSubscriptionSlotLen() + currentSlot});
this.subscriptionsRandom.request({subnet, toSlot: this.randomSubscriptionSlotLen() + currentSlot});
}
return;
}

// Prune subnets and re-subcribe to new ones
this.unsubscribeSubnets(expired, slot);
this.unsubscribeSubnets(expired, slot, SubnetSource.random);
this.rebalanceRandomSubnets();
}

Expand Down Expand Up @@ -233,15 +258,15 @@ export class AttnetsService implements IAttnetsService {
const activeSubnets = new Set(this.subscriptionsRandom.getActive(slot));
const allSubnets = Array.from({length: ATTESTATION_SUBNET_COUNT}, (_, i) => i);
const availableSubnets = allSubnets.filter((subnet) => !activeSubnets.has(subnet));
const subnetsToConnect = shuffle(availableSubnets).slice(0, subnetDiff);
const subnetsToConnect = this.shuffleFn(availableSubnets).slice(0, subnetDiff);

// Tell gossip to connect to the subnets if not connected already
this.subscribeToSubnets(subnetsToConnect);
this.subscribeToSubnets(subnetsToConnect, SubnetSource.random);

// Register these new subnets until some future slot
for (const subnet of subnetsToConnect) {
// the heartbeat will help connect to respective peers
this.subscriptionsRandom.request({subnet, toSlot: randomSubscriptionSlotLen() + slot});
this.subscriptionsRandom.request({subnet, toSlot: this.randomSubscriptionSlotLen() + slot});
}
}

Expand All @@ -254,7 +279,7 @@ export class AttnetsService implements IAttnetsService {
for (const subnet of toRemoveSubnets) {
this.subscriptionsRandom.delete(subnet);
}
this.unsubscribeSubnets(toRemoveSubnets, slot);
this.unsubscribeSubnets(toRemoveSubnets, slot, SubnetSource.random);
}

// If there has been a change update the local ENR bitfield
Expand All @@ -277,19 +302,20 @@ export class AttnetsService implements IAttnetsService {
}

/** Tigger a gossip subcription only if not already subscribed */
private subscribeToSubnets(subnets: number[]): void {
private subscribeToSubnets(subnets: number[], src: SubnetSource): void {
const forks = getActiveForks(this.config, this.chain.clock.currentEpoch);
for (const subnet of subnets) {
if (!this.subscriptionsCommittee.has(subnet) && !this.subscriptionsRandom.has(subnet)) {
for (const fork of forks) {
this.gossip.subscribeTopic({type: gossipType, fork, subnet});
}
this.metrics?.attnetsService.subscribeSubnets.inc({subnet, src});
}
}
}

/** Trigger a gossip un-subscrition only if no-one is still subscribed */
private unsubscribeSubnets(subnets: number[], slot: Slot): void {
private unsubscribeSubnets(subnets: number[], slot: Slot, src: SubnetSource): void {
// No need to unsubscribeTopic(). Return early to prevent repetitive extra work
if (this.opts?.subscribeAllSubnets) return;

Expand All @@ -302,13 +328,21 @@ export class AttnetsService implements IAttnetsService {
for (const fork of forks) {
this.gossip.unsubscribeTopic({type: gossipType, fork, subnet});
}
this.metrics?.attnetsService.unsubscribeSubnets.inc({subnet, src});
}
}
}
}

function randomSubscriptionSlotLen(): Slot {
return (
randBetween(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION, 2 * EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION) * SLOTS_PER_EPOCH
);
private randomSubscriptionSlotLen(): Slot {
return (
this.randBetweenFn(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION, 2 * EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION) *
SLOTS_PER_EPOCH
);
}

private onScrapeLodestarMetrics(metrics: IMetrics): void {
metrics.attnetsService.committeeSubnets.set(this.committeeSubnets.size);
metrics.attnetsService.subscriptionsCommittee.set(this.subscriptionsCommittee.size);
metrics.attnetsService.subscriptionsRandom.set(this.subscriptionsRandom.size);
}
}
6 changes: 6 additions & 0 deletions packages/beacon-node/src/network/subnets/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ export interface IAttnetsService extends ISubnetsService {
shouldProcess(subnet: number, slot: Slot): boolean;
}

export type RandBetweenFn = (min: number, max: number) => number;
export type ShuffleFn = <T>(arr: T[]) => T[];

export type SubnetsServiceOpts = {
subscribeAllSubnets?: boolean;
// For unit test
dapplion marked this conversation as resolved.
Show resolved Hide resolved
randBetweenFn?: RandBetweenFn;
shuffleFn?: ShuffleFn;
};
18 changes: 15 additions & 3 deletions packages/beacon-node/src/network/subnets/syncnetsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {getActiveForks} from "../forks.js";
import {Eth2Gossipsub, GossipType} from "../gossip/index.js";
import {MetadataController} from "../metadata.js";
import {RequestedSubnet, SubnetMap} from "../peers/utils/index.js";
import {IMetrics} from "../../metrics/metrics.js";
import {CommitteeSubscription, ISubnetsService, SubnetsServiceOpts} from "./interface.js";

const gossipType = GossipType.sync_committee;
Expand All @@ -32,8 +33,13 @@ export class SyncnetsService implements ISubnetsService {
private readonly gossip: Eth2Gossipsub,
private readonly metadata: MetadataController,
private readonly logger: ILogger,
private readonly metrics: IMetrics | null,
private readonly opts?: SubnetsServiceOpts
) {}
) {
if (metrics) {
metrics.syncnetsService.subscriptionsCommittee.addCollect(() => this.onScrapeLodestarMetrics(metrics));
}
}

start(): void {
this.chain.emitter.on(ChainEvent.clockEpoch, this.onEpoch);
Expand Down Expand Up @@ -120,6 +126,7 @@ export class SyncnetsService implements ISubnetsService {
for (const fork of forks) {
this.gossip.subscribeTopic({type: gossipType, fork, subnet});
}
this.metrics?.syncnetsService.subscribeSubnets.inc({subnet});
}
}
}
Expand All @@ -129,11 +136,16 @@ export class SyncnetsService implements ISubnetsService {
const forks = getActiveForks(this.config, this.chain.clock.currentEpoch);
for (const subnet of subnets) {
// No need to check if active in subscriptionsCommittee since we only have a single SubnetMap
for (const fork of forks) {
if (!this.opts?.subscribeAllSubnets) {
if (!this.opts?.subscribeAllSubnets) {
for (const fork of forks) {
this.gossip.unsubscribeTopic({type: gossipType, fork, subnet});
}
this.metrics?.syncnetsService.unsubscribeSubnets.inc({subnet});
}
}
}

private onScrapeLodestarMetrics(metrics: IMetrics): void {
metrics.syncnetsService.subscriptionsCommittee.set(this.subscriptionsCommittee.size);
}
}
Loading