From 3774e6a5c2987af592577b97449f994974350597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Wed, 5 Feb 2020 12:33:37 +0100 Subject: [PATCH 01/24] fix bug where justified blocks are archived --- .../src/db/api/beacon/repositories/blockArchive.ts | 8 +++++++- packages/lodestar/src/tasks/tasks/archiveBlocks.ts | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/lodestar/src/db/api/beacon/repositories/blockArchive.ts b/packages/lodestar/src/db/api/beacon/repositories/blockArchive.ts index e59cfdfd4d3..38cb0dc8f68 100644 --- a/packages/lodestar/src/db/api/beacon/repositories/blockArchive.ts +++ b/packages/lodestar/src/db/api/beacon/repositories/blockArchive.ts @@ -1,4 +1,4 @@ -import {BeaconBlock} from "@chainsafe/eth2.0-types"; +import {BeaconBlock, Root} from "@chainsafe/eth2.0-types"; import {IBeaconConfig} from "@chainsafe/eth2.0-config"; import {BulkRepository} from "../repository"; import {IDatabaseController} from "../../../controller"; @@ -16,6 +16,12 @@ export class BlockArchiveRepository extends BulkRepository { ) { super(config, db, Bucket.blockArchive, config.types.BeaconBlock); } + + public async getByRoot(root: Root): Promise { + return this.get( + await this.db.get(encodeKey(Bucket.blockRootRefs, root)) + ); + } public async addMany(blocks: BeaconBlock[]): Promise { await this.db.batchPut( diff --git a/packages/lodestar/src/tasks/tasks/archiveBlocks.ts b/packages/lodestar/src/tasks/tasks/archiveBlocks.ts index 99c6dfaf2e6..17099b2008b 100644 --- a/packages/lodestar/src/tasks/tasks/archiveBlocks.ts +++ b/packages/lodestar/src/tasks/tasks/archiveBlocks.ts @@ -35,7 +35,7 @@ export class ArchiveBlocksTask implements ITask { public async run(): Promise { const blocks = (await this.db.block.getAll()).filter( (block) => - computeEpochAtSlot(this.config, block.slot) <= this.finalizedCheckpoint.epoch + computeEpochAtSlot(this.config, block.slot) < this.finalizedCheckpoint.epoch ); this.logger.info(`Started archiving ${blocks.length} block ` +`(finalized epoch #${this.finalizedCheckpoint.epoch})...` From b650ae45e40cd6f0d4e6ac33931ae71d87374b09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Wed, 5 Feb 2020 19:57:18 +0100 Subject: [PATCH 02/24] ensure better gossip typesafety --- .../lodestar/src/network/gossip/gossip.ts | 106 +++++++++++------- .../lodestar/src/network/gossip/interface.ts | 13 ++- packages/lodestar/src/sync/regular.ts | 36 +++--- 3 files changed, 89 insertions(+), 66 deletions(-) diff --git a/packages/lodestar/src/network/gossip/gossip.ts b/packages/lodestar/src/network/gossip/gossip.ts index 7dbbf643f71..230ab87a660 100644 --- a/packages/lodestar/src/network/gossip/gossip.ts +++ b/packages/lodestar/src/network/gossip/gossip.ts @@ -4,26 +4,43 @@ */ import {EventEmitter} from "events"; -//@ts-ignore import {promisify} from "es6-promisify"; import LibP2p from "libp2p"; //@ts-ignore import Gossipsub from "libp2p-gossipsub"; import {IBeaconConfig} from "@chainsafe/eth2.0-config"; import {ATTESTATION_SUBNET_COUNT} from "../../constants"; -import {ILogger, LogLevel} from "@chainsafe/eth2.0-utils/lib/logger"; -import {getGossipTopic,} from "./utils"; +import {ILogger, LogLevel} from "@chainsafe/eth2.0-utils/lib/logger"; +import {getAttestationSubnet, getGossipTopic,} from "./utils"; import {INetworkOptions} from "../options"; -import {GossipEventEmitter, IGossip, IGossipEvents, IGossipSub, IGossipModules, IGossipMessageValidator, IGossipMessage} - from "./interface"; +import { + GossipEventEmitter, + IGossip, + IGossipEvents, + IGossipMessage, + IGossipMessageValidator, + IGossipModules, + IGossipSub +} from "./interface"; import {GossipEvent} from "./constants"; -import {publishBlock, getIncomingBlockHandler} from "./handlers/block"; -import {publishCommiteeAttestation, getCommitteeAttestationHandler, getIncomingAttestationHandler} - from "./handlers/attestation"; -import {publishAttesterSlashing, getIncomingAttesterSlashingHandler} from "./handlers/attesterSlashing"; -import {publishProposerSlashing, getIncomingProposerSlashingHandler} from "./handlers/proposerSlashing"; -import {publishVoluntaryExit, getIncomingVoluntaryExitHandler} from "./handlers/voluntaryExit"; -import {publishAggregatedAttestation, getIncomingAggregateAndProofHandler} from "./handlers/aggregateAndProof"; +import {getIncomingBlockHandler, publishBlock} from "./handlers/block"; +import { + getCommitteeAttestationHandler, + getIncomingAttestationHandler, + publishCommiteeAttestation +} from "./handlers/attestation"; +import {getIncomingAttesterSlashingHandler, publishAttesterSlashing} from "./handlers/attesterSlashing"; +import {getIncomingProposerSlashingHandler, publishProposerSlashing} from "./handlers/proposerSlashing"; +import {getIncomingVoluntaryExitHandler, publishVoluntaryExit} from "./handlers/voluntaryExit"; +import {getIncomingAggregateAndProofHandler, publishAggregatedAttestation} from "./handlers/aggregateAndProof"; +import { + AggregateAndProof, + Attestation, + AttesterSlashing, + BeaconBlock, + ProposerSlashing, + VoluntaryExit +} from "@chainsafe/eth2.0-types"; export type GossipHandlerFn = (this: Gossip, msg: IGossipMessage) => void; @@ -76,43 +93,50 @@ export class Gossip extends (EventEmitter as { new(): GossipEventEmitter }) impl public publishAttesterSlashing = publishAttesterSlashing; - // @ts-ignore - public on(event: keyof IGossipEvents, listener: Function): void { - if(this.listenerCount(event) === 0 && !event.startsWith("gossipsub")) { - this.pubsub.subscribe(getGossipTopic(event as GossipEvent, "ssz")); - } - // @ts-ignore - super.on(event, listener); + public subscribeToBlock(callback: (block: BeaconBlock) => void): void { + this.subscribe(GossipEvent.BLOCK, callback); } - // @ts-ignore - public once(event: keyof IGossipEvents, listener: Function): void { - if(this.listenerCount(event) === 0 && !event.startsWith("gossipsub")) { - this.pubsub.subscribe(getGossipTopic(event as GossipEvent, "ssz")); - } - // @ts-ignore - super.once(event, (args: unknown[]) => { - this.pubsub.unsubscribe(getGossipTopic(event as GossipEvent, "ssz")); - listener(args); - }); + public subscribeToAggregateAndProof(callback: (aggregate: AggregateAndProof) => void): void { + this.subscribe(GossipEvent.AGGREGATE_AND_PROOF, callback); } - // @ts-ignore - public removeListener(event: keyof IGossipEvents, listener: Function): void { - // @ts-ignore - super.on(event, listener); - if(this.listenerCount(event) === 0 && !event.startsWith("gossipsub")) { - this.pubsub.unsubscribe(getGossipTopic(event as GossipEvent, "ssz")); + public subscribeToAttestation(callback: (attestation: Attestation) => void): void { + this.subscribe(GossipEvent.ATTESTATION, callback); + } + + public subscribeToVoluntaryExit(callback: (voluntaryExit: VoluntaryExit) => void): void { + this.subscribe(GossipEvent.VOLUNTARY_EXIT, callback); + } + + public subscribeToProposerSlashing(callback: (slashing: ProposerSlashing) => void): void { + this.subscribe(GossipEvent.PROPOSER_SLASHING, callback); + } + + public subscribeToAttesterSlashing(callback: (slashing: AttesterSlashing) => void): void { + this.subscribe(GossipEvent.ATTESTER_SLASHING, callback); + } + + public subscribeToAttestationSubnet(subnet: string, callback: (block: BeaconBlock) => void): void { + this.subscribe(GossipEvent.ATTESTATION_SUBNET, callback, new Map([["subnet", subnet]])); + } + + public unsubscribeFromAttestationSubnet(subnet: string, callback: (block: BeaconBlock) => void): void { + this.unsubscribe(GossipEvent.ATTESTATION_SUBNET, callback, new Map([["subnet", subnet]])); + } + + public unsubscribe(event: keyof IGossipEvents, listener: unknown, params: Map = new Map()): void { + if(this.listenerCount(event) === 1 && !event.startsWith("gossipsub")) { + this.pubsub.unsubscribe(getGossipTopic(event as GossipEvent, "ssz", params)); } + this.removeListener(event, listener as (...args: unknown[]) => void); } - // @ts-ignore - public removeAllListeners(event: keyof IGossipEvents): void { - // @ts-ignore - super.removeAllListeners(event); - if(!event.startsWith("gossipsub")) { - this.pubsub.unsubscribe(getGossipTopic(event as GossipEvent, "ssz")); + private subscribe(event: keyof IGossipEvents, listener: unknown, params: Map = new Map()): void { + if(this.listenerCount(event) === 0 && !event.startsWith("gossipsub")) { + this.pubsub.subscribe(getGossipTopic(event as GossipEvent, "ssz", params)); } + this.on(event, listener as (...args: unknown[]) => void); } private registerHandlers(): Map { diff --git a/packages/lodestar/src/network/gossip/interface.ts b/packages/lodestar/src/network/gossip/interface.ts index 0ea57fb077f..a80d5da0393 100644 --- a/packages/lodestar/src/network/gossip/interface.ts +++ b/packages/lodestar/src/network/gossip/interface.ts @@ -26,7 +26,7 @@ export interface IGossipEvents { [GossipEvent.VOLUNTARY_EXIT]: (voluntaryExit: VoluntaryExit) => void; [GossipEvent.PROPOSER_SLASHING]: (proposerSlashing: ProposerSlashing) => void; [GossipEvent.ATTESTER_SLASHING]: (attesterSlashing: AttesterSlashing) => void; - ["gossipsub:heartbeat"]: void; + ["gossipsub:heartbeat"]: () => void; } export type GossipEventEmitter = StrictEventEmitter; @@ -45,13 +45,22 @@ export interface IGossipSub extends EventEmitter { unsubscribe(topic: string): void; } -export interface IGossip extends GossipEventEmitter, IService { +export interface IGossip extends IService { publishBlock(block: BeaconBlock): Promise; publishCommiteeAttestation(attestation: Attestation): Promise; publishAggregatedAttestation(aggregateAndProof: AggregateAndProof): Promise; publishVoluntaryExit(voluntaryExit: VoluntaryExit): Promise; publishAttesterSlashing(attesterSlashing: AttesterSlashing): Promise; publishProposerSlashing(proposerSlashing: ProposerSlashing): Promise; + subscribeToBlock(callback: (block: BeaconBlock) => void): void; + subscribeToAggregateAndProof(callback: (aggregate: AggregateAndProof) => void): void; + subscribeToAttestation(callback: (attestation: Attestation) => void): void; + subscribeToVoluntaryExit(callback: (voluntaryExit: VoluntaryExit) => void): void; + subscribeToProposerSlashing(callback: (slashing: ProposerSlashing) => void): void; + subscribeToAttesterSlashing(callback: (slashing: AttesterSlashing) => void): void; + subscribeToAttestationSubnet(subnet: string, callback: (block: BeaconBlock) => void): void; + unsubscribeFromAttestationSubnet(subnet: string, callback: (block: BeaconBlock) => void): void; + unsubscribe(event: keyof IGossipEvents, listener: unknown, params?: Map): void; } export interface IGossipMessageValidator { diff --git a/packages/lodestar/src/sync/regular.ts b/packages/lodestar/src/sync/regular.ts index 2588524c0b1..ed30a432b5b 100644 --- a/packages/lodestar/src/sync/regular.ts +++ b/packages/lodestar/src/sync/regular.ts @@ -36,14 +36,13 @@ export class RegularSync { public async start(): Promise { this.logger.verbose("regular sync start"); - this.network.gossip.on(GossipEvent.BLOCK, this.receiveBlock); - this.network.gossip.on(GossipEvent.ATTESTATION_SUBNET, this.receiveCommitteeAttestation); - this.network.gossip.on(GossipEvent.AGGREGATE_AND_PROOF, this.receiveAggregateAndProof); + this.network.gossip.subscribeToBlock(this.receiveBlock); + this.network.gossip.subscribeToAggregateAndProof(this.receiveAggregateAndProof); // For interop only, will be removed prior to mainnet - this.network.gossip.on(GossipEvent.ATTESTATION, this.receiveAttestation); - this.network.gossip.on(GossipEvent.VOLUNTARY_EXIT, this.receiveVoluntaryExit); - this.network.gossip.on(GossipEvent.PROPOSER_SLASHING, this.receiveProposerSlashing); - this.network.gossip.on(GossipEvent.ATTESTER_SLASHING, this.receiveAttesterSlashing); + this.network.gossip.subscribeToAttestation(this.receiveAttestation); + this.network.gossip.subscribeToVoluntaryExit(this.receiveVoluntaryExit); + this.network.gossip.subscribeToProposerSlashing(this.receiveProposerSlashing); + this.network.gossip.subscribeToAttesterSlashing(this.receiveAttesterSlashing); this.chain.on("processedBlock", this.onProcessedBlock); this.chain.on("processedAttestation", this.onProcessedAttestation); this.chain.on("unknownBlockRoot", this.onUnknownBlockRoot); @@ -52,13 +51,12 @@ export class RegularSync { public async stop(): Promise { this.logger.verbose("regular sync stop"); - this.network.gossip.removeListener(GossipEvent.BLOCK, this.receiveBlock); - this.network.gossip.removeListener(GossipEvent.ATTESTATION_SUBNET, this.receiveCommitteeAttestation); - this.network.gossip.removeListener(GossipEvent.AGGREGATE_AND_PROOF, this.receiveAggregateAndProof); - this.network.gossip.removeListener(GossipEvent.ATTESTATION, this.receiveAttestation); - this.network.gossip.removeListener(GossipEvent.VOLUNTARY_EXIT, this.receiveVoluntaryExit); - this.network.gossip.removeListener(GossipEvent.PROPOSER_SLASHING, this.receiveProposerSlashing); - this.network.gossip.removeListener(GossipEvent.ATTESTER_SLASHING, this.receiveAttesterSlashing); + this.network.gossip.unsubscribe(GossipEvent.BLOCK, this.receiveBlock); + this.network.gossip.unsubscribe(GossipEvent.AGGREGATE_AND_PROOF, this.receiveAggregateAndProof); + this.network.gossip.unsubscribe(GossipEvent.ATTESTATION, this.receiveAttestation); + this.network.gossip.unsubscribe(GossipEvent.VOLUNTARY_EXIT, this.receiveVoluntaryExit); + this.network.gossip.unsubscribe(GossipEvent.PROPOSER_SLASHING, this.receiveProposerSlashing); + this.network.gossip.unsubscribe(GossipEvent.ATTESTER_SLASHING, this.receiveAttesterSlashing); this.chain.removeListener("processedBlock", this.onProcessedBlock); this.chain.removeListener("processedAttestation", this.onProcessedAttestation); this.chain.removeListener("unknownBlockRoot", this.onUnknownBlockRoot); @@ -71,15 +69,7 @@ export class RegularSync { public receiveCommitteeAttestation = async (attestationSubnet: {attestation: Attestation; subnet: number}): Promise => { - const attestation = attestationSubnet.attestation; - - // to see if we need special process for these unaggregated attestations - // not in the spec atm - // send attestation on to other modules - await Promise.all([ - this.opPool.attestations.receive(attestation), - this.chain.receiveAttestation(attestation), - ]); + await this.opPool.attestations.receive(attestationSubnet.attestation); }; public receiveAggregateAndProof = async (aggregate: AggregateAndProof): Promise => { From 4d4d3ff9fe6341bab7ba23d607687edb5d40adc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Wed, 5 Feb 2020 19:57:32 +0100 Subject: [PATCH 03/24] add hex utils --- packages/lodestar/src/util/hex.ts | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 packages/lodestar/src/util/hex.ts diff --git a/packages/lodestar/src/util/hex.ts b/packages/lodestar/src/util/hex.ts new file mode 100644 index 00000000000..273eb64b37b --- /dev/null +++ b/packages/lodestar/src/util/hex.ts @@ -0,0 +1,7 @@ +export function bufferToHex(buffer: Buffer): string { + return "0x" + buffer.toString("hex") +} + +export function hexToBuffer(v: string): Buffer { + return Buffer.from(v.replace("0x", "")) +} \ No newline at end of file From 5737d3cc75843fe137fc86ac4ae53b2ba7b8d177 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Wed, 5 Feb 2020 20:10:22 +0100 Subject: [PATCH 04/24] add additional validator params --- packages/eth2.0-params/src/interface.ts | 2 ++ packages/eth2.0-params/src/presets/mainnet.ts | 3 +++ packages/eth2.0-params/src/presets/minimal.ts | 2 ++ 3 files changed, 7 insertions(+) diff --git a/packages/eth2.0-params/src/interface.ts b/packages/eth2.0-params/src/interface.ts index 63aa654530a..70fc0f63d17 100644 --- a/packages/eth2.0-params/src/interface.ts +++ b/packages/eth2.0-params/src/interface.ts @@ -13,6 +13,8 @@ export interface IBeaconParams { MIN_GENESIS_TIME: number; MIN_GENESIS_ACTIVE_VALIDATOR_COUNT: number; TARGET_AGGREGATORS_PER_COMMITTEE: number; + RANDOM_SUBNETS_PER_VALIDATOR: number; + EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION: number; // Deposit contract DEPOSIT_CONTRACT_ADDRESS: number; diff --git a/packages/eth2.0-params/src/presets/mainnet.ts b/packages/eth2.0-params/src/presets/mainnet.ts index ca87c8c1804..821d15ffb1f 100644 --- a/packages/eth2.0-params/src/presets/mainnet.ts +++ b/packages/eth2.0-params/src/presets/mainnet.ts @@ -13,6 +13,9 @@ export const SHUFFLE_ROUND_COUNT = 90; export const MIN_GENESIS_ACTIVE_VALIDATOR_COUNT = 2 ** 14; export const MIN_GENESIS_TIME = 1578009600; export const TARGET_AGGREGATORS_PER_COMMITTEE = 16; +export const RANDOM_SUBNETS_PER_VALIDATOR = 1; +export const EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION = 256; + // Deposit contract export const DEPOSIT_CONTRACT_ADDRESS = 0; diff --git a/packages/eth2.0-params/src/presets/minimal.ts b/packages/eth2.0-params/src/presets/minimal.ts index 22eb4862a46..a0789a4986b 100644 --- a/packages/eth2.0-params/src/presets/minimal.ts +++ b/packages/eth2.0-params/src/presets/minimal.ts @@ -13,6 +13,8 @@ export const SHUFFLE_ROUND_COUNT = 10; // CUSTOMIZED export const MIN_GENESIS_TIME = 1578009600; export const MIN_GENESIS_ACTIVE_VALIDATOR_COUNT = 64; export const TARGET_AGGREGATORS_PER_COMMITTEE = 16; +export const RANDOM_SUBNETS_PER_VALIDATOR = 1; +export const EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION = 256; // Deposit contract export const DEPOSIT_CONTRACT_ADDRESS = 0; From 8653461b359e4fb0f76cebd0d25224340c00b2f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Wed, 5 Feb 2020 20:14:17 +0100 Subject: [PATCH 05/24] add eth2.0-params CHANGELOG.md --- packages/eth2.0-params/CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 packages/eth2.0-params/CHANGELOG.md diff --git a/packages/eth2.0-params/CHANGELOG.md b/packages/eth2.0-params/CHANGELOG.md new file mode 100644 index 00000000000..47a827ea39f --- /dev/null +++ b/packages/eth2.0-params/CHANGELOG.md @@ -0,0 +1,11 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.2.0] - unreleased +### Added +- `RANDOM_SUBNETS_PER_VALIDATOR` and `EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION` constant params +- TODO: check what else is added \ No newline at end of file From a9705bac68d7cf0d36aecd425942825b6ecaf230 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Wed, 5 Feb 2020 20:35:31 +0100 Subject: [PATCH 06/24] add task for joining random interop subnets --- packages/lodestar/src/tasks/index.ts | 20 +++++-- .../tasks/tasks/interopSubnetsJoiningTask.ts | 55 +++++++++++++++++++ 2 files changed, 71 insertions(+), 4 deletions(-) create mode 100644 packages/lodestar/src/tasks/tasks/interopSubnetsJoiningTask.ts diff --git a/packages/lodestar/src/tasks/index.ts b/packages/lodestar/src/tasks/index.ts index 8f005b737c7..f69cf20364e 100644 --- a/packages/lodestar/src/tasks/index.ts +++ b/packages/lodestar/src/tasks/index.ts @@ -8,12 +8,17 @@ import {IBeaconDb} from "../db/api"; import {IBeaconChain} from "../chain"; import {Checkpoint} from "@chainsafe/eth2.0-types"; import {ArchiveBlocksTask} from "./tasks/archiveBlocks"; -import {ILogger} from "@chainsafe/eth2.0-utils/lib/logger"; +import {ILogger} from "@chainsafe/eth2.0-utils/lib/logger"; +import {Sync} from "../sync"; +import {InteropSubnetsJoiningTask} from "./tasks/interopSubnetsJoiningTask"; +import {INetwork} from "../network"; export interface ITasksModules { db: IBeaconDb; logger: ILogger; chain: IBeaconChain; + sync: Sync; + network: INetwork; } /** @@ -23,11 +28,10 @@ export interface ITasksModules { export class TasksService implements IService { private readonly config: IBeaconConfig; - private readonly db: IBeaconDb; - private readonly chain: IBeaconChain; - + private readonly sync: Sync; + private readonly network: INetwork; private readonly logger: ILogger; public constructor(config: IBeaconConfig, modules: ITasksModules) { @@ -35,18 +39,26 @@ export class TasksService implements IService { this.db = modules.db; this.chain = modules.chain; this.logger = modules.logger; + this.sync = modules.sync; + this.network = modules.network; } public async start(): Promise { this.chain.on("finalizedCheckpoint", this.handleFinalizedCheckpointChores); + this.sync.on("regularSyncStarted", this.handleRegularSyncStartedTasks); } public async stop(): Promise { this.chain.removeListener("finalizedCheckpoint", this.handleFinalizedCheckpointChores); + this.sync.removeListener("regularSyncStarted", this.handleRegularSyncStartedTasks); } private handleFinalizedCheckpointChores = async (finalizedCheckpoint: Checkpoint): Promise => { new ArchiveBlocksTask(this.config, {db: this.db, logger: this.logger}, finalizedCheckpoint).run(); }; + + private handleRegularSyncStartedTasks = async (): Promise => { + new InteropSubnetsJoiningTask(this.config, {network: this.network}); + }; } \ No newline at end of file diff --git a/packages/lodestar/src/tasks/tasks/interopSubnetsJoiningTask.ts b/packages/lodestar/src/tasks/tasks/interopSubnetsJoiningTask.ts new file mode 100644 index 00000000000..e00e57f58dd --- /dev/null +++ b/packages/lodestar/src/tasks/tasks/interopSubnetsJoiningTask.ts @@ -0,0 +1,55 @@ +import {ITask} from "../interface"; +import {INetwork} from "../../network"; +import {IBeaconConfig} from "@chainsafe/eth2.0-config"; +import {ATTESTATION_SUBNET_COUNT} from "../../constants"; +import {randBetween} from "@chainsafe/eth2.0-utils"; + +export interface IInteropSubnetsJoiningModules { + network: INetwork; +} + +export class InteropSubnetsJoiningTask implements ITask { + + private readonly config: IBeaconConfig; + private readonly network: INetwork; + + public constructor(config: IBeaconConfig, modules: IInteropSubnetsJoiningModules) { + this.config = config; + this.network = modules.network; + } + + public async run(): Promise { + for (let i = 0; i < this.config.params.RANDOM_SUBNETS_PER_VALIDATOR; i++) { + this.subscribeToRandomSubnet(); + } + } + + //TODO: handle cleanup and unsubscribing + + /** + * @return choosen subnet + */ + private subscribeToRandomSubnet(): number { + const subnet = randBetween(0, ATTESTATION_SUBNET_COUNT); + this.network.gossip.subscribeToAttestationSubnet( + subnet, + this.handleWireAttestation + ); + setTimeout( + this.handleChangeSubnets, + this.config.params.EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION * this.config.params.SLOTS_PER_EPOCH * this.config.params.SECONDS_PER_SLOT * 1000, + subnet + ); + return subnet; + } + + private handleChangeSubnets = (subnet: number) => { + this.network.gossip.unsubscribeFromAttestationSubnet(subnet, this.handleWireAttestation); + this.subscribeToRandomSubnet(); + }; + + private handleWireAttestation = () => { + //ignore random committee attestations + }; + +} \ No newline at end of file From e0133c5ea314fee2d56b8ff69fe693ba3c9e578c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Wed, 5 Feb 2020 20:36:05 +0100 Subject: [PATCH 07/24] add random number function to eth2 utils --- packages/eth2.0-utils/src/math.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/packages/eth2.0-utils/src/math.ts b/packages/eth2.0-utils/src/math.ts index 32c81341943..80d069fdfbf 100644 --- a/packages/eth2.0-utils/src/math.ts +++ b/packages/eth2.0-utils/src/math.ts @@ -43,3 +43,18 @@ export function bigIntSqrt(n: bigint): bigint { } return x; } + +/** + * Renerates a random integer between min (included) and max (excluded). + */ +export function randBetween(min: number, max: number): number { + return Math.floor(Math.random() * (max - min)) + min; +} + +/** + * Wraps randBetween and returns a bigNumber. + * @returns {bigint} + */ +export function randBetweenBigInt(min: number, max: number): bigint { + return BigInt(randBetween(min, max)); +} \ No newline at end of file From f81eb2c9ccdf618c359ddf51bc9a93f3a7e9c76d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Wed, 5 Feb 2020 20:36:30 +0100 Subject: [PATCH 08/24] improve gossip typesafety --- packages/lodestar/src/network/gossip/gossip.ts | 8 ++++---- packages/lodestar/src/network/gossip/interface.ts | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/lodestar/src/network/gossip/gossip.ts b/packages/lodestar/src/network/gossip/gossip.ts index 230ab87a660..5aae6cdc803 100644 --- a/packages/lodestar/src/network/gossip/gossip.ts +++ b/packages/lodestar/src/network/gossip/gossip.ts @@ -117,12 +117,12 @@ export class Gossip extends (EventEmitter as { new(): GossipEventEmitter }) impl this.subscribe(GossipEvent.ATTESTER_SLASHING, callback); } - public subscribeToAttestationSubnet(subnet: string, callback: (block: BeaconBlock) => void): void { - this.subscribe(GossipEvent.ATTESTATION_SUBNET, callback, new Map([["subnet", subnet]])); + public subscribeToAttestationSubnet(subnet: number|string, callback: (block: BeaconBlock) => void): void { + this.subscribe(GossipEvent.ATTESTATION_SUBNET, callback, new Map([["subnet", subnet.toString()]])); } - public unsubscribeFromAttestationSubnet(subnet: string, callback: (block: BeaconBlock) => void): void { - this.unsubscribe(GossipEvent.ATTESTATION_SUBNET, callback, new Map([["subnet", subnet]])); + public unsubscribeFromAttestationSubnet(subnet: number|string, callback: (block: BeaconBlock) => void): void { + this.unsubscribe(GossipEvent.ATTESTATION_SUBNET, callback, new Map([["subnet", subnet.toString()]])); } public unsubscribe(event: keyof IGossipEvents, listener: unknown, params: Map = new Map()): void { diff --git a/packages/lodestar/src/network/gossip/interface.ts b/packages/lodestar/src/network/gossip/interface.ts index a80d5da0393..ff3063c371e 100644 --- a/packages/lodestar/src/network/gossip/interface.ts +++ b/packages/lodestar/src/network/gossip/interface.ts @@ -58,8 +58,8 @@ export interface IGossip extends IService { subscribeToVoluntaryExit(callback: (voluntaryExit: VoluntaryExit) => void): void; subscribeToProposerSlashing(callback: (slashing: ProposerSlashing) => void): void; subscribeToAttesterSlashing(callback: (slashing: AttesterSlashing) => void): void; - subscribeToAttestationSubnet(subnet: string, callback: (block: BeaconBlock) => void): void; - unsubscribeFromAttestationSubnet(subnet: string, callback: (block: BeaconBlock) => void): void; + subscribeToAttestationSubnet(subnet: number|string, callback: (block: BeaconBlock) => void): void; + unsubscribeFromAttestationSubnet(subnet: number|string, callback: (block: BeaconBlock) => void): void; unsubscribe(event: keyof IGossipEvents, listener: unknown, params?: Map): void; } From f852d362657e2d05ab882a5c4d5ff4b0f6042a6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Mon, 10 Feb 2020 10:55:54 +0100 Subject: [PATCH 09/24] small fixes --- .../src/db/api/beacon/repositories/blockArchive.ts | 2 +- packages/lodestar/src/network/gossip/gossip.ts | 14 +++++++------- packages/lodestar/src/network/gossip/interface.ts | 4 ++-- packages/lodestar/src/node/nodejs.ts | 2 ++ .../src/tasks/tasks/interopSubnetsJoiningTask.ts | 9 ++++++--- packages/lodestar/src/util/hex.ts | 4 ++-- 6 files changed, 20 insertions(+), 15 deletions(-) diff --git a/packages/lodestar/src/db/api/beacon/repositories/blockArchive.ts b/packages/lodestar/src/db/api/beacon/repositories/blockArchive.ts index 8236bad55b1..5c4e7f936f4 100644 --- a/packages/lodestar/src/db/api/beacon/repositories/blockArchive.ts +++ b/packages/lodestar/src/db/api/beacon/repositories/blockArchive.ts @@ -1,4 +1,4 @@ -import {SignedBeaconBlock, Slot} from "@chainsafe/eth2.0-types"; +import {Root, SignedBeaconBlock, Slot} from "@chainsafe/eth2.0-types"; import {IBeaconConfig} from "@chainsafe/eth2.0-config"; import {BulkRepository} from "../repository"; import {IDatabaseController} from "../../../controller"; diff --git a/packages/lodestar/src/network/gossip/gossip.ts b/packages/lodestar/src/network/gossip/gossip.ts index 5aae6cdc803..a2b942a7e54 100644 --- a/packages/lodestar/src/network/gossip/gossip.ts +++ b/packages/lodestar/src/network/gossip/gossip.ts @@ -11,7 +11,7 @@ import Gossipsub from "libp2p-gossipsub"; import {IBeaconConfig} from "@chainsafe/eth2.0-config"; import {ATTESTATION_SUBNET_COUNT} from "../../constants"; import {ILogger, LogLevel} from "@chainsafe/eth2.0-utils/lib/logger"; -import {getAttestationSubnet, getGossipTopic,} from "./utils"; +import {getGossipTopic,} from "./utils"; import {INetworkOptions} from "../options"; import { GossipEventEmitter, @@ -37,9 +37,9 @@ import { AggregateAndProof, Attestation, AttesterSlashing, - BeaconBlock, ProposerSlashing, - VoluntaryExit + SignedBeaconBlock, + SignedVoluntaryExit } from "@chainsafe/eth2.0-types"; export type GossipHandlerFn = (this: Gossip, msg: IGossipMessage) => void; @@ -93,7 +93,7 @@ export class Gossip extends (EventEmitter as { new(): GossipEventEmitter }) impl public publishAttesterSlashing = publishAttesterSlashing; - public subscribeToBlock(callback: (block: BeaconBlock) => void): void { + public subscribeToBlock(callback: (block: SignedBeaconBlock) => void): void { this.subscribe(GossipEvent.BLOCK, callback); } @@ -105,7 +105,7 @@ export class Gossip extends (EventEmitter as { new(): GossipEventEmitter }) impl this.subscribe(GossipEvent.ATTESTATION, callback); } - public subscribeToVoluntaryExit(callback: (voluntaryExit: VoluntaryExit) => void): void { + public subscribeToVoluntaryExit(callback: (signed: SignedVoluntaryExit) => void): void { this.subscribe(GossipEvent.VOLUNTARY_EXIT, callback); } @@ -117,11 +117,11 @@ export class Gossip extends (EventEmitter as { new(): GossipEventEmitter }) impl this.subscribe(GossipEvent.ATTESTER_SLASHING, callback); } - public subscribeToAttestationSubnet(subnet: number|string, callback: (block: BeaconBlock) => void): void { + public subscribeToAttestationSubnet(subnet: number|string, callback: (attestation: Attestation) => void): void { this.subscribe(GossipEvent.ATTESTATION_SUBNET, callback, new Map([["subnet", subnet.toString()]])); } - public unsubscribeFromAttestationSubnet(subnet: number|string, callback: (block: BeaconBlock) => void): void { + public unsubscribeFromAttestationSubnet(subnet: number|string, callback: (attestation: Attestation) => void): void { this.unsubscribe(GossipEvent.ATTESTATION_SUBNET, callback, new Map([["subnet", subnet.toString()]])); } diff --git a/packages/lodestar/src/network/gossip/interface.ts b/packages/lodestar/src/network/gossip/interface.ts index 05acad8f104..0a8a8bf0939 100644 --- a/packages/lodestar/src/network/gossip/interface.ts +++ b/packages/lodestar/src/network/gossip/interface.ts @@ -58,8 +58,8 @@ export interface IGossip extends IService { subscribeToVoluntaryExit(callback: (voluntaryExit: SignedVoluntaryExit) => void): void; subscribeToProposerSlashing(callback: (slashing: ProposerSlashing) => void): void; subscribeToAttesterSlashing(callback: (slashing: AttesterSlashing) => void): void; - subscribeToAttestationSubnet(subnet: number|string, callback: (block: BeaconBlock) => void): void; - unsubscribeFromAttestationSubnet(subnet: number|string, callback: (block: BeaconBlock) => void): void; + subscribeToAttestationSubnet(subnet: number|string, callback: (attestation: Attestation) => void): void; + unsubscribeFromAttestationSubnet(subnet: number|string, callback: (attestation: Attestation) => void): void; unsubscribe(event: keyof IGossipEvents, listener: unknown, params?: Map): void; } diff --git a/packages/lodestar/src/node/nodejs.ts b/packages/lodestar/src/node/nodejs.ts index e8941a7bd07..04adc9530b7 100644 --- a/packages/lodestar/src/node/nodejs.ts +++ b/packages/lodestar/src/node/nodejs.ts @@ -138,6 +138,8 @@ export class BeaconNode { { db: this.db, chain: this.chain, + sync: this.sync, + network: this.network, logger: this.logger.child(this.conf.logger.chores) } ); diff --git a/packages/lodestar/src/tasks/tasks/interopSubnetsJoiningTask.ts b/packages/lodestar/src/tasks/tasks/interopSubnetsJoiningTask.ts index e00e57f58dd..cbda2b6f47a 100644 --- a/packages/lodestar/src/tasks/tasks/interopSubnetsJoiningTask.ts +++ b/packages/lodestar/src/tasks/tasks/interopSubnetsJoiningTask.ts @@ -37,18 +37,21 @@ export class InteropSubnetsJoiningTask implements ITask { ); setTimeout( this.handleChangeSubnets, - this.config.params.EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION * this.config.params.SLOTS_PER_EPOCH * this.config.params.SECONDS_PER_SLOT * 1000, + this.config.params.EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION + * this.config.params.SLOTS_PER_EPOCH + * this.config.params.SECONDS_PER_SLOT + * 1000, subnet ); return subnet; } - private handleChangeSubnets = (subnet: number) => { + private handleChangeSubnets = (subnet: number): void => { this.network.gossip.unsubscribeFromAttestationSubnet(subnet, this.handleWireAttestation); this.subscribeToRandomSubnet(); }; - private handleWireAttestation = () => { + private handleWireAttestation = (): void => { //ignore random committee attestations }; diff --git a/packages/lodestar/src/util/hex.ts b/packages/lodestar/src/util/hex.ts index 273eb64b37b..a6bf8c2ea66 100644 --- a/packages/lodestar/src/util/hex.ts +++ b/packages/lodestar/src/util/hex.ts @@ -1,7 +1,7 @@ export function bufferToHex(buffer: Buffer): string { - return "0x" + buffer.toString("hex") + return "0x" + buffer.toString("hex"); } export function hexToBuffer(v: string): Buffer { - return Buffer.from(v.replace("0x", "")) + return Buffer.from(v.replace("0x", "")); } \ No newline at end of file From a869a865fbddc4469536f21bda180fb633477d19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Mon, 10 Feb 2020 12:57:17 +0100 Subject: [PATCH 10/24] beacon clock --- packages/lodestar/src/chain/chain.ts | 6 ++ .../lodestar/src/chain/clock/interface.ts | 8 ++ .../src/chain/clock/local/LocalClock.ts | 80 +++++++++++++++++++ .../test/unit/chain/clock/local.test.ts | 33 ++++++++ 4 files changed, 127 insertions(+) create mode 100644 packages/lodestar/src/chain/clock/interface.ts create mode 100644 packages/lodestar/src/chain/clock/local/LocalClock.ts create mode 100644 packages/lodestar/test/unit/chain/clock/local.test.ts diff --git a/packages/lodestar/src/chain/chain.ts b/packages/lodestar/src/chain/chain.ts index bf3415ecc8b..076ad3eaa5e 100644 --- a/packages/lodestar/src/chain/chain.ts +++ b/packages/lodestar/src/chain/chain.ts @@ -42,6 +42,8 @@ import {ProgressiveMerkleTree, toHex} from "@chainsafe/eth2.0-utils"; import {MerkleTreeSerialization} from "../util/serialization"; import {AttestationProcessor} from "./attestation"; import {sleep} from "../util/sleep"; +import {IBeaconClock} from "./clock/interface"; +import {LocalClock} from "./clock/local/LocalClock"; export interface IBeaconChainModules { config: IBeaconConfig; @@ -62,6 +64,7 @@ export class BeaconChain extends (EventEmitter as { new(): ChainEventEmitter }) public chain: string; public _latestState: BeaconState = null; public forkChoice: ILMDGHOST; + public clock: IBeaconClock; public chainId: uint16; public networkId: uint64; @@ -106,12 +109,15 @@ export class BeaconChain extends (EventEmitter as { new(): ChainEventEmitter }) } this.latestState = state; this.logger.info("Chain started, waiting blocks and attestations"); + this.clock = new LocalClock(this.config, this.latestState.genesisTime); + await this.clock.start(); this.isPollingBlocks = true; this.pollBlock(); } public async stop(): Promise { await this.forkChoice.stop(); + await this.clock.stop(); this.eth1.removeListener("block", this.checkGenesis); this.isPollingBlocks = false; this.logger.warn(`Discarding ${this.blockProcessingQueue.size} blocks from queue...`); diff --git a/packages/lodestar/src/chain/clock/interface.ts b/packages/lodestar/src/chain/clock/interface.ts new file mode 100644 index 00000000000..6ef6f26225a --- /dev/null +++ b/packages/lodestar/src/chain/clock/interface.ts @@ -0,0 +1,8 @@ +import {Slot} from "@chainsafe/eth2.0-types"; +import {IService} from "../../node"; + +export interface IBeaconClock extends IService { + getCurrentSlot(): Slot; + onNewSlot(cb: (slot: Slot) => void): void; + onNewEpoch(cb: (slot: Slot) => void): void; +} \ No newline at end of file diff --git a/packages/lodestar/src/chain/clock/local/LocalClock.ts b/packages/lodestar/src/chain/clock/local/LocalClock.ts new file mode 100644 index 00000000000..c823fe788ab --- /dev/null +++ b/packages/lodestar/src/chain/clock/local/LocalClock.ts @@ -0,0 +1,80 @@ +import {IBeaconClock} from "../interface"; +import {IBeaconConfig} from "@chainsafe/eth2.0-config"; +import {computeEpochAtSlot, getCurrentSlot} from "@chainsafe/eth2.0-state-transition"; +import {Epoch, Slot} from "@chainsafe/eth2.0-types"; + +type NewSlotCallback = (slot: Slot) => void; +type NewEpochCallback = (epoch: Epoch) => void; + +export class LocalClock implements IBeaconClock { + + private readonly config: IBeaconConfig; + private readonly genesisTime: number; + private currentSlot: number; + private isRunning: boolean; + private newSlotCallbacks: NewSlotCallback[] = []; + private newEpochCallbacks: NewEpochCallback[] = []; + + + public constructor(config: IBeaconConfig, genesisTime: number) { + this.config = config; + this.genesisTime = genesisTime; + //this assumes clock time is trusted + this.currentSlot = getCurrentSlot(this.config, this.genesisTime); + } + + public async start(): Promise { + this.isRunning = true; + const diffTillNextSlot = this.getDiffTillNextSlot(); + setTimeout( + this.updateSlot, + diffTillNextSlot + ); + } + public async stop(): Promise { + this.isRunning = false; + } + + public getCurrentSlot(): number { + return this.currentSlot; + } + + public onNewEpoch(cb: NewEpochCallback): void { + if(cb) { + this.newEpochCallbacks.push(cb); + } + } + + public onNewSlot(cb: NewSlotCallback): void { + if(cb) { + this.newSlotCallbacks.push(cb); + } + } + + private updateSlot = () => { + if(!this.isRunning) { + return; + } + const previousSlot = this.currentSlot; + this.currentSlot++; + this.newSlotCallbacks.forEach((cb) => { + cb(this.currentSlot); + }); + const currentEpoch = computeEpochAtSlot(this.config, this.currentSlot); + if(computeEpochAtSlot(this.config, previousSlot) < currentEpoch) { + this.newEpochCallbacks.forEach((cb) => { + cb(currentEpoch); + }) + } + //recursively invoke update slot + setTimeout( + this.updateSlot, + this.getDiffTillNextSlot() + ); + }; + + private getDiffTillNextSlot(): number { + const diffInSeconds = (Date.now() / 1000) - this.genesisTime; + return (this.config.params.SECONDS_PER_SLOT - diffInSeconds % this.config.params.SECONDS_PER_SLOT) * 1000; + } +} \ No newline at end of file diff --git a/packages/lodestar/test/unit/chain/clock/local.test.ts b/packages/lodestar/test/unit/chain/clock/local.test.ts new file mode 100644 index 00000000000..1e6b66964df --- /dev/null +++ b/packages/lodestar/test/unit/chain/clock/local.test.ts @@ -0,0 +1,33 @@ +import {describe, it} from "mocha"; +import {LocalClock} from "../../../../src/chain/clock/local/LocalClock"; +import {config} from "@chainsafe/eth2.0-config/lib/presets/minimal"; +import sinon from "sinon"; +import {expect} from "chai"; + +describe("LocalClock", function() { + + const sandbox = sinon.createSandbox(); + + it("Should notify on new slot", async function () { + const realClock = sandbox.useFakeTimers(); + const clock = new LocalClock(config, Math.round(new Date().getTime() / 1000)); + const spy = sinon.spy(); + clock.onNewSlot(spy); + await clock.start(); + realClock.tick(config.params.SECONDS_PER_SLOT * 1000); + expect(spy.calledOnce).to.be.true; + await clock.stop(); + }); + + it("Should notify on new epoch", async function () { + const realClock = sandbox.useFakeTimers(); + const clock = new LocalClock(config, Math.round(new Date().getTime() / 1000)); + const spy = sinon.spy(); + clock.onNewEpoch(spy); + await clock.start(); + realClock.tick(config.params.SLOTS_PER_EPOCH * config.params.SECONDS_PER_SLOT * 1000); + expect(spy.calledOnce).to.be.true; + await clock.stop(); + }) + +}); \ No newline at end of file From 59a5c8b5d0418299eea58392c01e37b77e6f61ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Mon, 10 Feb 2020 13:39:43 +0100 Subject: [PATCH 11/24] committee attestation collector --- .../src/chain/clock/local/LocalClock.ts | 118 +++++++++--------- packages/lodestar/src/chain/interface.ts | 2 + .../src/sync/subnet/attestation-collector.ts | 67 ++++++++++ .../sync/subnet/attestation-collector.test.ts | 74 +++++++++++ 4 files changed, 202 insertions(+), 59 deletions(-) create mode 100644 packages/lodestar/src/sync/subnet/attestation-collector.ts create mode 100644 packages/lodestar/test/unit/sync/subnet/attestation-collector.test.ts diff --git a/packages/lodestar/src/chain/clock/local/LocalClock.ts b/packages/lodestar/src/chain/clock/local/LocalClock.ts index c823fe788ab..bd2d243177d 100644 --- a/packages/lodestar/src/chain/clock/local/LocalClock.ts +++ b/packages/lodestar/src/chain/clock/local/LocalClock.ts @@ -8,73 +8,73 @@ type NewEpochCallback = (epoch: Epoch) => void; export class LocalClock implements IBeaconClock { - private readonly config: IBeaconConfig; - private readonly genesisTime: number; - private currentSlot: number; - private isRunning: boolean; - private newSlotCallbacks: NewSlotCallback[] = []; - private newEpochCallbacks: NewEpochCallback[] = []; + private readonly config: IBeaconConfig; + private readonly genesisTime: number; + private currentSlot: number; + private isRunning: boolean; + private newSlotCallbacks: NewSlotCallback[] = []; + private newEpochCallbacks: NewEpochCallback[] = []; - public constructor(config: IBeaconConfig, genesisTime: number) { - this.config = config; - this.genesisTime = genesisTime; - //this assumes clock time is trusted - this.currentSlot = getCurrentSlot(this.config, this.genesisTime); - } + public constructor(config: IBeaconConfig, genesisTime: number) { + this.config = config; + this.genesisTime = genesisTime; + //this assumes clock time is trusted + this.currentSlot = getCurrentSlot(this.config, this.genesisTime); + } - public async start(): Promise { - this.isRunning = true; - const diffTillNextSlot = this.getDiffTillNextSlot(); - setTimeout( - this.updateSlot, - diffTillNextSlot - ); - } - public async stop(): Promise { - this.isRunning = false; - } + public async start(): Promise { + this.isRunning = true; + const diffTillNextSlot = this.getDiffTillNextSlot(); + setTimeout( + this.updateSlot, + diffTillNextSlot + ); + } + public async stop(): Promise { + this.isRunning = false; + } - public getCurrentSlot(): number { - return this.currentSlot; - } + public getCurrentSlot(): number { + return this.currentSlot; + } - public onNewEpoch(cb: NewEpochCallback): void { - if(cb) { - this.newEpochCallbacks.push(cb); - } + public onNewEpoch(cb: NewEpochCallback): void { + if(cb) { + this.newEpochCallbacks.push(cb); } + } - public onNewSlot(cb: NewSlotCallback): void { - if(cb) { - this.newSlotCallbacks.push(cb); - } + public onNewSlot(cb: NewSlotCallback): void { + if(cb) { + this.newSlotCallbacks.push(cb); } + } - private updateSlot = () => { - if(!this.isRunning) { - return; - } - const previousSlot = this.currentSlot; - this.currentSlot++; - this.newSlotCallbacks.forEach((cb) => { - cb(this.currentSlot); - }); - const currentEpoch = computeEpochAtSlot(this.config, this.currentSlot); - if(computeEpochAtSlot(this.config, previousSlot) < currentEpoch) { - this.newEpochCallbacks.forEach((cb) => { - cb(currentEpoch); - }) - } - //recursively invoke update slot - setTimeout( - this.updateSlot, - this.getDiffTillNextSlot() - ); - }; - - private getDiffTillNextSlot(): number { - const diffInSeconds = (Date.now() / 1000) - this.genesisTime; - return (this.config.params.SECONDS_PER_SLOT - diffInSeconds % this.config.params.SECONDS_PER_SLOT) * 1000; + private updateSlot = () => { + if(!this.isRunning) { + return; + } + const previousSlot = this.currentSlot; + this.currentSlot++; + this.newSlotCallbacks.forEach((cb) => { + cb(this.currentSlot); + }); + const currentEpoch = computeEpochAtSlot(this.config, this.currentSlot); + if(computeEpochAtSlot(this.config, previousSlot) < currentEpoch) { + this.newEpochCallbacks.forEach((cb) => { + cb(currentEpoch); + }); } + //recursively invoke update slot + setTimeout( + this.updateSlot, + this.getDiffTillNextSlot() + ); + }; + + private getDiffTillNextSlot(): number { + const diffInSeconds = (Date.now() / 1000) - this.genesisTime; + return (this.config.params.SECONDS_PER_SLOT - diffInSeconds % this.config.params.SECONDS_PER_SLOT) * 1000; + } } \ No newline at end of file diff --git a/packages/lodestar/src/chain/interface.ts b/packages/lodestar/src/chain/interface.ts index 1ab23886226..a216acf70c7 100644 --- a/packages/lodestar/src/chain/interface.ts +++ b/packages/lodestar/src/chain/interface.ts @@ -7,6 +7,7 @@ import { import {ILMDGHOST} from "./forkChoice"; import StrictEventEmitter from "strict-event-emitter-types"; import {ProgressiveMerkleTree} from "@chainsafe/eth2.0-utils"; +import {IBeaconClock} from "./clock/interface"; export interface IChainEvents { unknownBlockRoot: (root: Root) => void; @@ -26,6 +27,7 @@ export type ChainEventEmitter = StrictEventEmitter; export interface IBeaconChain extends ChainEventEmitter { latestState: BeaconState|null; forkChoice: ILMDGHOST; + clock: IBeaconClock; chainId: uint16; networkId: uint64; /** diff --git a/packages/lodestar/src/sync/subnet/attestation-collector.ts b/packages/lodestar/src/sync/subnet/attestation-collector.ts new file mode 100644 index 00000000000..9268fbff4da --- /dev/null +++ b/packages/lodestar/src/sync/subnet/attestation-collector.ts @@ -0,0 +1,67 @@ +import {IBeaconChain} from "../../chain"; +import {OpPool} from "../../opPool"; +import {IBeaconConfig} from "@chainsafe/eth2.0-config"; +import {Attestation, CommitteeIndex, Slot} from "@chainsafe/eth2.0-types"; +import {IService} from "../../node"; +import {INetwork} from "../../network"; +import {getCommitteeIndexSubnet} from "../../network/gossip/utils"; + +export interface IAttestationCollectorModules { + chain: IBeaconChain; + network: INetwork; + opPool: OpPool; +} + +export class AttestationCollector implements IService { + + private readonly config: IBeaconConfig; + private readonly chain: IBeaconChain; + private readonly network: INetwork; + private readonly opPool: OpPool; + + private aggregationDuties: Map> = new Map(); + + public constructor(config: IBeaconConfig, modules: IAttestationCollectorModules) { + this.config = config; + this.chain = modules.chain; + this.network = modules.network; + this.opPool = modules.opPool; + } + + public async start(): Promise { + this.chain.clock.onNewSlot(this.checkDuties); + } + + public async stop(): Promise { + } + + public subscribeToCommitteeAttestations(slot: Slot, committeeIndex: CommitteeIndex): void { + this.network.gossip.subscribeToAttestationSubnet(getCommitteeIndexSubnet(committeeIndex)); + if(this.aggregationDuties.has(slot)) { + this.aggregationDuties.get(slot).add(committeeIndex); + } else { + this.aggregationDuties.set(slot, new Set([committeeIndex])); + } + } + + private checkDuties = (slot: Slot) => { + const committees = this.aggregationDuties.get(slot) || new Set(); + committees.forEach((committeeIndex) => { + this.network.gossip.subscribeToAttestationSubnet(getCommitteeIndexSubnet(committeeIndex), this.handleCommitteeAttestation); + setTimeout( + this.unsubscribeSubnet, + this.config.params.SECONDS_PER_SLOT * 1000, + getCommitteeIndexSubnet(committeeIndex) + ) + }); + this.aggregationDuties.delete(slot); + }; + + private unsubscribeSubnet = (subnet: number) => { + this.network.gossip.unsubscribeFromAttestationSubnet(subnet, this.handleCommitteeAttestation); + }; + + private handleCommitteeAttestation = async ({attestation}: {attestation: Attestation}) => { + await this.opPool.attestations.receive(attestation); + } +} \ No newline at end of file diff --git a/packages/lodestar/test/unit/sync/subnet/attestation-collector.test.ts b/packages/lodestar/test/unit/sync/subnet/attestation-collector.test.ts new file mode 100644 index 00000000000..d801e136fed --- /dev/null +++ b/packages/lodestar/test/unit/sync/subnet/attestation-collector.test.ts @@ -0,0 +1,74 @@ +import {describe, it} from "mocha"; +import {config} from "@chainsafe/eth2.0-config/lib/presets/minimal"; +import {AttestationCollector} from "../../../../src/sync/subnet/attestation-collector"; +import {LocalClock} from "../../../../src/chain/clock/local/LocalClock"; +import sinon from "sinon"; +import {Gossip} from "../../../../src/network/gossip/gossip"; +import {OpPool} from "../../../../src/opPool"; +import {getCommitteeIndexSubnet} from "../../../../src/network/gossip/utils"; +import { expect } from "chai"; + +describe("Attestation collector",function() { + + const sandbox = sinon.createSandbox(); + + it("should subscribe and collect attestations", async function () { + const clock = sandbox.useFakeTimers(); + const fakeGossip = sandbox.createStubInstance(Gossip); + const fakeOpPool = sandbox.createStubInstance(OpPool); + const realClock = new LocalClock(config, Math.round(new Date().getTime() /1000)); + const collector = new AttestationCollector( + config, + { + // @ts-ignore + chain: { + clock: realClock + }, + // @ts-ignore + network: { + gossip: fakeGossip + }, + // @ts-ignore + opPool: fakeOpPool + } + ); + await realClock.start(); + await collector.start(); + collector.subscribeToCommitteeAttestations(1, 1); + expect(fakeGossip.subscribeToAttestationSubnet.withArgs(getCommitteeIndexSubnet(1)).calledOnce).to.be.true; + clock.tick(config.params.SECONDS_PER_SLOT * 1000); + expect(fakeGossip.subscribeToAttestationSubnet.withArgs(getCommitteeIndexSubnet(1), sinon.match.any).calledOnce).to.be.true; + clock.tick(config.params.SECONDS_PER_SLOT * 1000); + expect(fakeGossip.unsubscribeFromAttestationSubnet.withArgs(getCommitteeIndexSubnet(1), sinon.match.func).calledOnce).to.be.true; + await collector.stop(); + await realClock.stop(); + }); + + it("should skip if there is no duties", async function () { + const clock = sandbox.useFakeTimers(); + const realClock = new LocalClock(config, Math.round(new Date().getTime() /1000)); + const fakeGossip = sandbox.createStubInstance(Gossip); + const collector = new AttestationCollector( + config, + { + // @ts-ignore + chain: { + clock: realClock + }, + // @ts-ignore + network: { + gossip: fakeGossip + }, + // @ts-ignore + opPool: null + } + ); + await realClock.start(); + await collector.start(); + clock.tick(config.params.SECONDS_PER_SLOT * 1000); + expect(fakeGossip.subscribeToAttestationSubnet.called).to.be.false; + await collector.stop(); + await realClock.stop(); + }) + +}); \ No newline at end of file From 8f26aa4f33684fb0be699c78e81acd463d087d8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Tue, 11 Feb 2020 11:03:47 +0100 Subject: [PATCH 12/24] add missing subscription api --- .../src/api/impl/rest/validator/validator.ts | 17 ++++ .../src/api/interface/validators.ts | 4 + .../src/services/attestation.ts | 10 +- .../test/utils/mocks/validator.ts | 6 ++ .../validator/subscribeToCommitteeSubnet.ts | 71 ++++++++++++++ .../src/api/rpc/api/validator/interface.ts | 5 + .../src/api/rpc/api/validator/validator.ts | 38 +++++++- .../lodestar/src/chain/clock/interface.ts | 6 +- .../src/chain/clock/local/LocalClock.ts | 2 +- .../lodestar/src/network/gossip/gossip.ts | 20 ++-- .../lodestar/src/network/gossip/interface.ts | 10 +- packages/lodestar/src/network/gossip/utils.ts | 8 +- packages/lodestar/src/sync/index.ts | 4 +- packages/lodestar/src/sync/regular.ts | 19 +++- .../src/sync/subnet/attestation-collector.ts | 92 ++++++++++--------- 15 files changed, 241 insertions(+), 71 deletions(-) create mode 100644 packages/lodestar/src/api/rest/routes/validator/subscribeToCommitteeSubnet.ts diff --git a/packages/lodestar-validator/src/api/impl/rest/validator/validator.ts b/packages/lodestar-validator/src/api/impl/rest/validator/validator.ts index 9ee2d78e9ad..a530758ce70 100644 --- a/packages/lodestar-validator/src/api/impl/rest/validator/validator.ts +++ b/packages/lodestar-validator/src/api/impl/rest/validator/validator.ts @@ -92,4 +92,21 @@ export class RestValidatorApi implements IValidatorApi { `/duties/${slot}/aggregator?committee_index=${committeeIndex}&slot_signature=${toHex(slotSignature)}` ); } + + public async subscribeCommitteeSubnet( + slot: Slot, + slotSignature: BLSSignature, + committeeIndex: CommitteeIndex, + aggregatorPubkey: BLSPubkey + ): Promise { + return this.client.post( + "/beacon_committee_subscription", + toJson({ + slot, + slotSignature, + committeeIndex, + aggregatorPubkey + }) + ); + } } diff --git a/packages/lodestar-validator/src/api/interface/validators.ts b/packages/lodestar-validator/src/api/interface/validators.ts index 0f118e8441b..0fd90d63cc6 100644 --- a/packages/lodestar-validator/src/api/interface/validators.ts +++ b/packages/lodestar-validator/src/api/interface/validators.ts @@ -50,4 +50,8 @@ export interface IValidatorApi { ): Promise; getWireAttestations(epoch: Epoch, committeeIndex: CommitteeIndex): Promise; + + subscribeCommitteeSubnet( + slot: Slot, slotSignature: BLSSignature, committeeIndex: CommitteeIndex, aggregatorPubkey: BLSPubkey + ): Promise; } diff --git a/packages/lodestar-validator/src/services/attestation.ts b/packages/lodestar-validator/src/services/attestation.ts index 319d35425e9..912f22b6dde 100644 --- a/packages/lodestar-validator/src/services/attestation.ts +++ b/packages/lodestar-validator/src/services/attestation.ts @@ -60,10 +60,11 @@ export class AttestationService { if(attesterDuties && attesterDuties.length === 1 && attesterDuties[0].validatorPubkey.equals(this.publicKey)) { const duty = attesterDuties[0]; const fork = (await this.provider.beacon.getFork()).fork; + const slotSignature = this.getSlotSignature(duty.attestationSlot, fork); const isAggregator = await this.provider.validator.isAggregator( duty.attestationSlot, duty.committeeIndex, - this.getSlotSignature(duty.attestationSlot, fork) + slotSignature ); this.nextAttesterDuties.set( duty.attestationSlot, @@ -72,7 +73,12 @@ export class AttestationService { isAggregator }); if(isAggregator) { - //TODO: subscribe to committee subnet + await this.provider.validator.subscribeCommitteeSubnet( + duty.attestationSlot, + slotSignature, + duty.committeeIndex, + this.publicKey + ); } } }; diff --git a/packages/lodestar-validator/test/utils/mocks/validator.ts b/packages/lodestar-validator/test/utils/mocks/validator.ts index 580d1e555d3..c5bd9a31e17 100644 --- a/packages/lodestar-validator/test/utils/mocks/validator.ts +++ b/packages/lodestar-validator/test/utils/mocks/validator.ts @@ -74,4 +74,10 @@ export class MockValidatorApi implements IValidatorApi { publishBlock(beaconBlock: SignedBeaconBlock): Promise { return undefined; } + + subscribeCommitteeSubnet( + slot: number, slotSignature: Buffer, committeeIndex: number, aggregatorPubkey: Buffer + ): Promise { + return undefined; + } } diff --git a/packages/lodestar/src/api/rest/routes/validator/subscribeToCommitteeSubnet.ts b/packages/lodestar/src/api/rest/routes/validator/subscribeToCommitteeSubnet.ts new file mode 100644 index 00000000000..213744c819c --- /dev/null +++ b/packages/lodestar/src/api/rest/routes/validator/subscribeToCommitteeSubnet.ts @@ -0,0 +1,71 @@ +import {IFastifyServer} from "../../index"; +import fastify, {DefaultBody, DefaultHeaders, DefaultParams, DefaultQuery} from "fastify"; +import {IApiModules} from "../../../interface"; +import {verify} from "@chainsafe/bls"; +import {hexToBuffer} from "../../../../util/hex"; +import {hashTreeRoot} from "@chainsafe/ssz"; +import {getDomain} from "@chainsafe/eth2.0-state-transition"; +import {computeEpochAtSlot, DomainType} from "@chainsafe/lodestar-validator/lib/util"; + + +const opts: fastify.RouteShorthandOptions = { + schema: { + body: { + type: "object", + requiredKeys: ["committee_index", "slot", "slot_signature", "aggregator_pubkey"], + "committee_index": { + type: "string" + }, + "slot": { + type: "string" + }, + "slot_signature": { + type: "string" + }, + "aggregator_pubkey": { + type: "string" + }, + }, + } +}; + +interface IBody extends DefaultBody { + committee_index: string; + slot: string; + slot_signature: string; + aggregator_pubkey: string; +} + +export const registerSubscribeToCommitteeSubnet = (fastify: IFastifyServer, modules: IApiModules): void => { + fastify.post( + "/beacon_committee_subscription", + opts, + async (request, reply) => { + const slot = Number(request.body.slot); + const valid = verify( + hexToBuffer(request.body.aggregator_pubkey), + hashTreeRoot(modules.config.types.Slot, slot), + hexToBuffer(request.body.slot_signature), + getDomain( + modules.config, + modules.chain.latestState, + DomainType.BEACON_ATTESTER, + computeEpochAtSlot(modules.config, slot)) + ); + if(!valid) { + reply + .code(403) + .send(); + return; + } + modules.sync.regularSync.collectAttestations( + slot, + Number(request.body.committee_index) + ); + reply + .code(200) + .type("application/json") + .send(); + } + ); +}; diff --git a/packages/lodestar/src/api/rpc/api/validator/interface.ts b/packages/lodestar/src/api/rpc/api/validator/interface.ts index 62c757f1c12..759be021c5e 100644 --- a/packages/lodestar/src/api/rpc/api/validator/interface.ts +++ b/packages/lodestar/src/api/rpc/api/validator/interface.ts @@ -57,4 +57,9 @@ export interface IValidatorApi extends IApi { ): Promise; getWireAttestations(epoch: Epoch, committeeIndex: CommitteeIndex): Promise; + + subscribeCommitteeSubnet( + slot: Slot, slotSignature: BLSSignature, committeeIndex: CommitteeIndex, aggregatorPubkey: BLSPubkey + ): Promise; + } diff --git a/packages/lodestar/src/api/rpc/api/validator/validator.ts b/packages/lodestar/src/api/rpc/api/validator/validator.ts index a1925ee7050..e8394d4d0db 100644 --- a/packages/lodestar/src/api/rpc/api/validator/validator.ts +++ b/packages/lodestar/src/api/rpc/api/validator/validator.ts @@ -11,9 +11,9 @@ import { bytes96, CommitteeIndex, Epoch, + SignedBeaconBlock, Slot, - ValidatorDuty, - SignedBeaconBlock + ValidatorDuty } from "@chainsafe/eth2.0-types"; import {IBeaconConfig} from "@chainsafe/eth2.0-config"; @@ -26,9 +26,13 @@ import {IEth1Notifier} from "../../../../eth1"; import {getAttesterDuties, getEpochProposers, produceAttestation, publishAttestation} from "../../../impl/validator"; import {ApiNamespace, IApiModules} from "../../../index"; import {IApiOptions} from "../../../options"; -import {ILogger} from "@chainsafe/eth2.0-utils/lib/logger"; +import {ILogger} from "@chainsafe/eth2.0-utils/lib/logger"; import {INetwork} from "../../../../network"; -import {isAggregator} from "@chainsafe/eth2.0-state-transition"; +import {getDomain, isAggregator} from "@chainsafe/eth2.0-state-transition"; +import {verify} from "@chainsafe/bls"; +import {hashTreeRoot} from "@chainsafe/ssz"; +import {computeEpochAtSlot, DomainType} from "@chainsafe/lodestar-validator/lib/util"; +import {Sync} from "../../../../sync"; export class ValidatorApi implements IValidatorApi { @@ -38,19 +42,21 @@ export class ValidatorApi implements IValidatorApi { private chain: IBeaconChain; private db: IBeaconDb; private network: INetwork; + private sync: Sync; private opPool: OpPool; private eth1: IEth1Notifier; private logger: ILogger; public constructor( opts: Partial, - modules: Pick + modules: Pick ) { this.namespace = ApiNamespace.VALIDATOR; this.config = modules.config; this.chain = modules.chain; this.db = modules.db; this.network = modules.network; + this.sync = modules.sync; this.logger = modules.logger; this.opPool = modules.opPool; this.eth1 = modules.eth1; @@ -118,4 +124,26 @@ export class ValidatorApi implements IValidatorApi { return isAggregator(this.config, await this.db.state.getLatest(), slot, committeeIndex, slotSignature); } + public async subscribeCommitteeSubnet( + slot: Slot, slotSignature: BLSSignature, committeeIndex: CommitteeIndex, aggregatorPubkey: BLSPubkey + ): Promise { + const valid = verify( + aggregatorPubkey, + hashTreeRoot(this.config.types.Slot, slot), + slotSignature, + getDomain( + this.config, + this.chain.latestState, + DomainType.BEACON_ATTESTER, + computeEpochAtSlot(this.config, slot)) + ); + if(!valid) { + throw new Error("Ivalid slot signature"); + } + this.sync.regularSync.collectAttestations( + slot, + committeeIndex + ); + } + } diff --git a/packages/lodestar/src/chain/clock/interface.ts b/packages/lodestar/src/chain/clock/interface.ts index 6ef6f26225a..3452d616e5f 100644 --- a/packages/lodestar/src/chain/clock/interface.ts +++ b/packages/lodestar/src/chain/clock/interface.ts @@ -2,7 +2,7 @@ import {Slot} from "@chainsafe/eth2.0-types"; import {IService} from "../../node"; export interface IBeaconClock extends IService { - getCurrentSlot(): Slot; - onNewSlot(cb: (slot: Slot) => void): void; - onNewEpoch(cb: (slot: Slot) => void): void; + getCurrentSlot(): Slot; + onNewSlot(cb: (slot: Slot) => void): void; + onNewEpoch(cb: (slot: Slot) => void): void; } \ No newline at end of file diff --git a/packages/lodestar/src/chain/clock/local/LocalClock.ts b/packages/lodestar/src/chain/clock/local/LocalClock.ts index bd2d243177d..22f1d5c2b69 100644 --- a/packages/lodestar/src/chain/clock/local/LocalClock.ts +++ b/packages/lodestar/src/chain/clock/local/LocalClock.ts @@ -51,7 +51,7 @@ export class LocalClock implements IBeaconClock { } } - private updateSlot = () => { + private updateSlot = (): void => { if(!this.isRunning) { return; } diff --git a/packages/lodestar/src/network/gossip/gossip.ts b/packages/lodestar/src/network/gossip/gossip.ts index a2b942a7e54..f718a96393d 100644 --- a/packages/lodestar/src/network/gossip/gossip.ts +++ b/packages/lodestar/src/network/gossip/gossip.ts @@ -117,26 +117,34 @@ export class Gossip extends (EventEmitter as { new(): GossipEventEmitter }) impl this.subscribe(GossipEvent.ATTESTER_SLASHING, callback); } - public subscribeToAttestationSubnet(subnet: number|string, callback: (attestation: Attestation) => void): void { + public subscribeToAttestationSubnet( + subnet: number|string, callback?: (attestation: {attestation: Attestation; subnet: number}) => void + ): void { this.subscribe(GossipEvent.ATTESTATION_SUBNET, callback, new Map([["subnet", subnet.toString()]])); } - public unsubscribeFromAttestationSubnet(subnet: number|string, callback: (attestation: Attestation) => void): void { + public unsubscribeFromAttestationSubnet( + subnet: number|string, callback?: (attestation: {attestation: Attestation; subnet: number}) => void + ): void { this.unsubscribe(GossipEvent.ATTESTATION_SUBNET, callback, new Map([["subnet", subnet.toString()]])); } - public unsubscribe(event: keyof IGossipEvents, listener: unknown, params: Map = new Map()): void { + public unsubscribe(event: keyof IGossipEvents, listener?: unknown, params: Map = new Map()): void { if(this.listenerCount(event) === 1 && !event.startsWith("gossipsub")) { this.pubsub.unsubscribe(getGossipTopic(event as GossipEvent, "ssz", params)); } - this.removeListener(event, listener as (...args: unknown[]) => void); + if(listener) { + this.removeListener(event, listener as (...args: unknown[]) => void); + } } - private subscribe(event: keyof IGossipEvents, listener: unknown, params: Map = new Map()): void { + private subscribe(event: keyof IGossipEvents, listener?: unknown, params: Map = new Map()): void { if(this.listenerCount(event) === 0 && !event.startsWith("gossipsub")) { this.pubsub.subscribe(getGossipTopic(event as GossipEvent, "ssz", params)); } - this.on(event, listener as (...args: unknown[]) => void); + if(listener) { + this.on(event, listener as (...args: unknown[]) => void); + } } private registerHandlers(): Map { diff --git a/packages/lodestar/src/network/gossip/interface.ts b/packages/lodestar/src/network/gossip/interface.ts index 0a8a8bf0939..33e9c9c5106 100644 --- a/packages/lodestar/src/network/gossip/interface.ts +++ b/packages/lodestar/src/network/gossip/interface.ts @@ -58,8 +58,14 @@ export interface IGossip extends IService { subscribeToVoluntaryExit(callback: (voluntaryExit: SignedVoluntaryExit) => void): void; subscribeToProposerSlashing(callback: (slashing: ProposerSlashing) => void): void; subscribeToAttesterSlashing(callback: (slashing: AttesterSlashing) => void): void; - subscribeToAttestationSubnet(subnet: number|string, callback: (attestation: Attestation) => void): void; - unsubscribeFromAttestationSubnet(subnet: number|string, callback: (attestation: Attestation) => void): void; + subscribeToAttestationSubnet( + subnet: number|string, + callback?: (attestation: {attestation: Attestation; subnet: number}) => void + ): void; + unsubscribeFromAttestationSubnet( + subnet: number|string, + callback?: (attestation: {attestation: Attestation; subnet: number}) => void + ): void; unsubscribe(event: keyof IGossipEvents, listener: unknown, params?: Map): void; } diff --git a/packages/lodestar/src/network/gossip/utils.ts b/packages/lodestar/src/network/gossip/utils.ts index be0cb121c11..affc2b3189a 100644 --- a/packages/lodestar/src/network/gossip/utils.ts +++ b/packages/lodestar/src/network/gossip/utils.ts @@ -3,7 +3,7 @@ */ import {ATTESTATION_SUBNET_COUNT, GOSSIP_MAX_SIZE} from "../../constants"; -import {Attestation} from "@chainsafe/eth2.0-types"; +import {Attestation, CommitteeIndex} from "@chainsafe/eth2.0-types"; import {IGossipMessage} from "./interface"; import assert from "assert"; import {AnySSZType, deserialize} from "@chainsafe/ssz"; @@ -26,7 +26,11 @@ export function getAttestationSubnetTopic(attestation: Attestation, encoding = " } export function getAttestationSubnet(attestation: Attestation): string { - return String(attestation.data.index % ATTESTATION_SUBNET_COUNT); + return getCommitteeIndexSubnet(attestation.data.index); +} + +export function getCommitteeIndexSubnet(committeeIndex: CommitteeIndex): string { + return String(committeeIndex % ATTESTATION_SUBNET_COUNT); } export function deserializeGossipMessage(msg: IGossipMessage, type: AnySSZType): T { diff --git a/packages/lodestar/src/sync/index.ts b/packages/lodestar/src/sync/index.ts index a331ad637a3..522ba4f974b 100644 --- a/packages/lodestar/src/sync/index.ts +++ b/packages/lodestar/src/sync/index.ts @@ -32,6 +32,9 @@ export interface ISyncModules { * The strategy may differ depending on whether the chain is synced or not */ export class Sync extends EventEmitter { + + public regularSync: RegularSync; + private opts: ISyncOptions; private config: IBeaconConfig; private chain: IBeaconChain; @@ -42,7 +45,6 @@ export class Sync extends EventEmitter { private logger: ILogger; //array of valid peers (peer on same fork) private peers: PeerInfo[] = []; - private regularSync: RegularSync; private initialSync: InitialSync; private waitingForPeer = true; diff --git a/packages/lodestar/src/sync/regular.ts b/packages/lodestar/src/sync/regular.ts index ad7762e1682..6efc7006185 100644 --- a/packages/lodestar/src/sync/regular.ts +++ b/packages/lodestar/src/sync/regular.ts @@ -10,7 +10,7 @@ import { AggregateAndProof, Root, SignedBeaconBlock, - SignedVoluntaryExit, + SignedVoluntaryExit, CommitteeIndex, Slot, } from "@chainsafe/eth2.0-types"; import {IBeaconConfig} from "@chainsafe/eth2.0-config"; import {IBeaconDb} from "../db"; @@ -21,6 +21,7 @@ import {ILogger} from "@chainsafe/eth2.0-utils/lib/logger"; import {ISyncModules} from "./index"; import {ISyncOptions} from "./options"; import {GossipEvent} from "../network/gossip/constants"; +import {AttestationCollector} from "./subnet/attestation-collector"; export type IRegularSyncModules = Pick; @@ -29,6 +30,7 @@ export class RegularSync { private db: IBeaconDb; private chain: IBeaconChain; private network: INetwork; + private attestationCollector: AttestationCollector; private opPool: OpPool; private logger: ILogger; @@ -39,10 +41,15 @@ export class RegularSync { this.network = modules.network; this.opPool = modules.opPool; this.logger = modules.logger; + this.attestationCollector = new AttestationCollector( + this.config, + {chain: this.chain, network: this.network, opPool: this.opPool} + ); } public async start(): Promise { this.logger.verbose("regular sync start"); + await this.attestationCollector.start(); this.network.gossip.subscribeToBlock(this.receiveBlock); this.network.gossip.subscribeToAggregateAndProof(this.receiveAggregateAndProof); // For interop only, will be removed prior to mainnet @@ -58,6 +65,7 @@ export class RegularSync { public async stop(): Promise { this.logger.verbose("regular sync stop"); + await this.attestationCollector.stop(); this.network.gossip.unsubscribe(GossipEvent.BLOCK, this.receiveBlock); this.network.gossip.unsubscribe(GossipEvent.AGGREGATE_AND_PROOF, this.receiveAggregateAndProof); this.network.gossip.unsubscribe(GossipEvent.ATTESTATION, this.receiveAttestation); @@ -85,10 +93,7 @@ export class RegularSync { public receiveAttestation = async (attestation: Attestation): Promise => { // send attestation on to other modules - await Promise.all([ - this.opPool.attestations.receive(attestation), - this.chain.receiveAttestation(attestation), - ]); + await this.opPool.attestations.receive(attestation); }; public receiveVoluntaryExit = async (voluntaryExit: SignedVoluntaryExit): Promise => { @@ -103,6 +108,10 @@ export class RegularSync { await this.opPool.attesterSlashings.receive(attesterSlashing); }; + public collectAttestations(slot: Slot, committeeIndex: CommitteeIndex): void { + this.attestationCollector.subscribeToCommitteeAttestations(slot, committeeIndex); + } + private onProcessedBlock = (signedBlock: SignedBeaconBlock): void => { this.network.gossip.publishBlock(signedBlock); }; diff --git a/packages/lodestar/src/sync/subnet/attestation-collector.ts b/packages/lodestar/src/sync/subnet/attestation-collector.ts index 9268fbff4da..bdc5f7ad988 100644 --- a/packages/lodestar/src/sync/subnet/attestation-collector.ts +++ b/packages/lodestar/src/sync/subnet/attestation-collector.ts @@ -7,61 +7,65 @@ import {INetwork} from "../../network"; import {getCommitteeIndexSubnet} from "../../network/gossip/utils"; export interface IAttestationCollectorModules { - chain: IBeaconChain; - network: INetwork; - opPool: OpPool; + chain: IBeaconChain; + network: INetwork; + opPool: OpPool; } export class AttestationCollector implements IService { - private readonly config: IBeaconConfig; - private readonly chain: IBeaconChain; - private readonly network: INetwork; - private readonly opPool: OpPool; + private readonly config: IBeaconConfig; + private readonly chain: IBeaconChain; + private readonly network: INetwork; + private readonly opPool: OpPool; - private aggregationDuties: Map> = new Map(); + private aggregationDuties: Map> = new Map(); - public constructor(config: IBeaconConfig, modules: IAttestationCollectorModules) { - this.config = config; - this.chain = modules.chain; - this.network = modules.network; - this.opPool = modules.opPool; - } + public constructor(config: IBeaconConfig, modules: IAttestationCollectorModules) { + this.config = config; + this.chain = modules.chain; + this.network = modules.network; + this.opPool = modules.opPool; + } - public async start(): Promise { - this.chain.clock.onNewSlot(this.checkDuties); - } + public async start(): Promise { + this.chain.clock.onNewSlot(this.checkDuties); + } - public async stop(): Promise { - } + // eslint-disable-next-line @typescript-eslint/no-empty-function + public async stop(): Promise { + } - public subscribeToCommitteeAttestations(slot: Slot, committeeIndex: CommitteeIndex): void { - this.network.gossip.subscribeToAttestationSubnet(getCommitteeIndexSubnet(committeeIndex)); - if(this.aggregationDuties.has(slot)) { - this.aggregationDuties.get(slot).add(committeeIndex); - } else { - this.aggregationDuties.set(slot, new Set([committeeIndex])); - } + public subscribeToCommitteeAttestations(slot: Slot, committeeIndex: CommitteeIndex): void { + this.network.gossip.subscribeToAttestationSubnet(getCommitteeIndexSubnet(committeeIndex)); + if(this.aggregationDuties.has(slot)) { + this.aggregationDuties.get(slot).add(committeeIndex); + } else { + this.aggregationDuties.set(slot, new Set([committeeIndex])); } + } - private checkDuties = (slot: Slot) => { - const committees = this.aggregationDuties.get(slot) || new Set(); - committees.forEach((committeeIndex) => { - this.network.gossip.subscribeToAttestationSubnet(getCommitteeIndexSubnet(committeeIndex), this.handleCommitteeAttestation); - setTimeout( - this.unsubscribeSubnet, - this.config.params.SECONDS_PER_SLOT * 1000, - getCommitteeIndexSubnet(committeeIndex) - ) - }); - this.aggregationDuties.delete(slot); - }; + private checkDuties = (slot: Slot): void => { + const committees = this.aggregationDuties.get(slot) || new Set(); + committees.forEach((committeeIndex) => { + this.network.gossip.subscribeToAttestationSubnet( + getCommitteeIndexSubnet(committeeIndex), + this.handleCommitteeAttestation + ); + setTimeout( + this.unsubscribeSubnet, + this.config.params.SECONDS_PER_SLOT * 1000, + getCommitteeIndexSubnet(committeeIndex) + ); + }); + this.aggregationDuties.delete(slot); + }; - private unsubscribeSubnet = (subnet: number) => { - this.network.gossip.unsubscribeFromAttestationSubnet(subnet, this.handleCommitteeAttestation); - }; + private unsubscribeSubnet = (subnet: number): void => { + this.network.gossip.unsubscribeFromAttestationSubnet(subnet, this.handleCommitteeAttestation); + }; - private handleCommitteeAttestation = async ({attestation}: {attestation: Attestation}) => { - await this.opPool.attestations.receive(attestation); - } + private handleCommitteeAttestation = async ({attestation}: {attestation: Attestation}): Promise => { + await this.opPool.attestations.receive(attestation); + }; } \ No newline at end of file From 0719a378ce1257affd62862fb69c4d8d129a74b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Tue, 11 Feb 2020 12:14:18 +0100 Subject: [PATCH 13/24] fix test --- .../api/rpc/api/validator/validator.test.ts | 6 ++++- .../test/unit/eth1/dev/network.test.ts | 22 +++++++++---------- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/packages/lodestar/test/unit/api/rpc/api/validator/validator.test.ts b/packages/lodestar/test/unit/api/rpc/api/validator/validator.test.ts index b4d601d2266..d43e2e12011 100644 --- a/packages/lodestar/test/unit/api/rpc/api/validator/validator.test.ts +++ b/packages/lodestar/test/unit/api/rpc/api/validator/validator.test.ts @@ -44,7 +44,11 @@ describe("validator rpc api", function () { opStub.attestations = sandbox.createStubInstance(AttestationOperations); validatorApi = new ValidatorApi( {}, - {config, chain: chainStub, db: dbStub, opPool: opStub, network: networkStub, eth1: eth1Stub, logger: logger} + { + config, chain: chainStub, db: dbStub, + opPool: opStub, sync: null, network: networkStub, + eth1: eth1Stub, logger: logger + } ); }); diff --git a/packages/lodestar/test/unit/eth1/dev/network.test.ts b/packages/lodestar/test/unit/eth1/dev/network.test.ts index e2660d69973..a15f81c48e1 100644 --- a/packages/lodestar/test/unit/eth1/dev/network.test.ts +++ b/packages/lodestar/test/unit/eth1/dev/network.test.ts @@ -5,9 +5,9 @@ import {expect} from "chai"; import {WinstonLogger} from "@chainsafe/eth2.0-utils/lib/logger"; -describe('Eth1 dev network', () => { +describe("Eth1 dev network", () => { - let logger: WinstonLogger = new WinstonLogger(); + const logger: WinstonLogger = new WinstonLogger(); before(() => { logger.silent = true; @@ -17,11 +17,11 @@ describe('Eth1 dev network', () => { logger.silent = false; }); - it('should start as configured', async () => { + it("should start as configured", async () => { const network = new PrivateEth1Network({ - host: '127.0.0.1', + host: "127.0.0.1", port: 34568, - mnemonic: 'test', + mnemonic: "test", defaultBalance: 1400 } , @@ -32,18 +32,18 @@ describe('Eth1 dev network', () => { const accountBalance = await new Wallet( network.accounts()[9], new ethers.providers.JsonRpcProvider(network.rpcUrl())).getBalance(); - expect(accountBalance.gt(ethers.utils.parseEther('1300'))).to.be.true; - expect(network.rpcUrl()).to.be.equal('http://127.0.0.1:34568'); - expect(network.mnemonic()).to.be.equal('test'); + expect(accountBalance.gt(ethers.utils.parseEther("1300"))).to.be.true; + expect(network.rpcUrl()).to.be.equal("http://127.0.0.1:34568"); + expect(network.mnemonic()).to.be.equal("test"); expect(network.accounts().length).to.be.equal(10); await network.stop(); }); - it('should deploy deposit contract', async () => { + it("should deploy deposit contract", async () => { const network = new PrivateEth1Network({ - host: '127.0.0.1', + host: "127.0.0.1", port: 34567, - mnemonic: 'test', + mnemonic: "test", defaultBalance: 1400 }, { From 938cbcb804db4e59913fb70df8806f74bdec080c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Tue, 11 Feb 2020 14:02:05 +0100 Subject: [PATCH 14/24] fix unit tests --- packages/lodestar/test/unit/eth1/dev/network.test.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/lodestar/test/unit/eth1/dev/network.test.ts b/packages/lodestar/test/unit/eth1/dev/network.test.ts index a15f81c48e1..5df2e618082 100644 --- a/packages/lodestar/test/unit/eth1/dev/network.test.ts +++ b/packages/lodestar/test/unit/eth1/dev/network.test.ts @@ -29,9 +29,11 @@ describe("Eth1 dev network", () => { logger, }); await network.start(); - const accountBalance = await new Wallet( - network.accounts()[9], - new ethers.providers.JsonRpcProvider(network.rpcUrl())).getBalance(); + const accountBalance = await ( + new Wallet( + network.accounts()[9], + new ethers.providers.JsonRpcProvider(network.rpcUrl())) + ).getBalance(); expect(accountBalance.gt(ethers.utils.parseEther("1300"))).to.be.true; expect(network.rpcUrl()).to.be.equal("http://127.0.0.1:34568"); expect(network.mnemonic()).to.be.equal("test"); @@ -42,7 +44,7 @@ describe("Eth1 dev network", () => { it("should deploy deposit contract", async () => { const network = new PrivateEth1Network({ host: "127.0.0.1", - port: 34567, + port: 0, mnemonic: "test", defaultBalance: 1400 }, From a39bc866893d313c7da12241b0fdf98fccfdef40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Wed, 12 Feb 2020 09:34:04 +0100 Subject: [PATCH 15/24] disable private eth1 tests as they have some interference --- packages/lodestar/test/unit/eth1/dev/network.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/lodestar/test/unit/eth1/dev/network.test.ts b/packages/lodestar/test/unit/eth1/dev/network.test.ts index 5df2e618082..63e03aebc8e 100644 --- a/packages/lodestar/test/unit/eth1/dev/network.test.ts +++ b/packages/lodestar/test/unit/eth1/dev/network.test.ts @@ -4,8 +4,8 @@ import * as ethers from "ethers/ethers"; import {expect} from "chai"; import {WinstonLogger} from "@chainsafe/eth2.0-utils/lib/logger"; - -describe("Eth1 dev network", () => { +//Tests failing when run in group, works when run individually +describe.skip("Eth1 dev network", () => { const logger: WinstonLogger = new WinstonLogger(); From 61607982d57aa7f54a5b8b285073b4060c14afaf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Mon, 17 Feb 2020 14:45:37 +0100 Subject: [PATCH 16/24] address PR comments --- packages/eth2.0-utils/src/math.ts | 2 +- .../lodestar/src/chain/clock/interface.ts | 11 ++- .../src/chain/clock/local/LocalClock.ts | 39 +++++----- packages/lodestar/src/sync/regular.ts | 15 ++-- .../lodestar/test/unit/sync/regular.test.ts | 76 ++++++++++--------- 5 files changed, 75 insertions(+), 68 deletions(-) diff --git a/packages/eth2.0-utils/src/math.ts b/packages/eth2.0-utils/src/math.ts index 80d069fdfbf..e4f0bec5b23 100644 --- a/packages/eth2.0-utils/src/math.ts +++ b/packages/eth2.0-utils/src/math.ts @@ -45,7 +45,7 @@ export function bigIntSqrt(n: bigint): bigint { } /** - * Renerates a random integer between min (included) and max (excluded). + * Regenerates a random integer between min (included) and max (excluded). */ export function randBetween(min: number, max: number): number { return Math.floor(Math.random() * (max - min)) + min; diff --git a/packages/lodestar/src/chain/clock/interface.ts b/packages/lodestar/src/chain/clock/interface.ts index 3452d616e5f..e3cc0192c6f 100644 --- a/packages/lodestar/src/chain/clock/interface.ts +++ b/packages/lodestar/src/chain/clock/interface.ts @@ -1,8 +1,13 @@ -import {Slot} from "@chainsafe/eth2.0-types"; +import {Epoch, Slot} from "@chainsafe/eth2.0-types"; import {IService} from "../../node"; +export type NewSlotCallback = (slot: Slot) => void; +export type NewEpochCallback = (epoch: Epoch) => void; + export interface IBeaconClock extends IService { getCurrentSlot(): Slot; - onNewSlot(cb: (slot: Slot) => void): void; - onNewEpoch(cb: (slot: Slot) => void): void; + onNewSlot(cb: NewSlotCallback): void; + onNewEpoch(cb: NewEpochCallback): void; + unsubscribeFromNewEpoch(cb: NewEpochCallback): void; + unsubscribeFromNewSlot(cb: NewSlotCallback): void; } \ No newline at end of file diff --git a/packages/lodestar/src/chain/clock/local/LocalClock.ts b/packages/lodestar/src/chain/clock/local/LocalClock.ts index 22f1d5c2b69..96541728ecb 100644 --- a/packages/lodestar/src/chain/clock/local/LocalClock.ts +++ b/packages/lodestar/src/chain/clock/local/LocalClock.ts @@ -1,22 +1,18 @@ -import {IBeaconClock} from "../interface"; +import {IBeaconClock, NewEpochCallback, NewSlotCallback} from "../interface"; import {IBeaconConfig} from "@chainsafe/eth2.0-config"; import {computeEpochAtSlot, getCurrentSlot} from "@chainsafe/eth2.0-state-transition"; -import {Epoch, Slot} from "@chainsafe/eth2.0-types"; +import {EventEmitter} from "events"; -type NewSlotCallback = (slot: Slot) => void; -type NewEpochCallback = (epoch: Epoch) => void; - -export class LocalClock implements IBeaconClock { +export class LocalClock extends EventEmitter implements IBeaconClock { private readonly config: IBeaconConfig; private readonly genesisTime: number; private currentSlot: number; private isRunning: boolean; - private newSlotCallbacks: NewSlotCallback[] = []; - private newEpochCallbacks: NewEpochCallback[] = []; - + private timeoutId: NodeJS.Timeout; public constructor(config: IBeaconConfig, genesisTime: number) { + super(); this.config = config; this.genesisTime = genesisTime; //this assumes clock time is trusted @@ -33,6 +29,7 @@ export class LocalClock implements IBeaconClock { } public async stop(): Promise { this.isRunning = false; + clearTimeout(this.timeoutId); } public getCurrentSlot(): number { @@ -40,15 +37,19 @@ export class LocalClock implements IBeaconClock { } public onNewEpoch(cb: NewEpochCallback): void { - if(cb) { - this.newEpochCallbacks.push(cb); - } + this.on("epoch", cb); } public onNewSlot(cb: NewSlotCallback): void { - if(cb) { - this.newSlotCallbacks.push(cb); - } + this.on("slot", cb); + } + + public unsubscribeFromNewEpoch(cb: NewEpochCallback): void { + this.removeListener("epoch", cb); + } + + public unsubscribeFromNewSlot(cb: NewSlotCallback): void { + this.removeListener("slot", cb); } private updateSlot = (): void => { @@ -57,14 +58,10 @@ export class LocalClock implements IBeaconClock { } const previousSlot = this.currentSlot; this.currentSlot++; - this.newSlotCallbacks.forEach((cb) => { - cb(this.currentSlot); - }); + this.emit("slot", this.currentSlot); const currentEpoch = computeEpochAtSlot(this.config, this.currentSlot); if(computeEpochAtSlot(this.config, previousSlot) < currentEpoch) { - this.newEpochCallbacks.forEach((cb) => { - cb(currentEpoch); - }); + this.emit("epoch", currentEpoch); } //recursively invoke update slot setTimeout( diff --git a/packages/lodestar/src/sync/regular.ts b/packages/lodestar/src/sync/regular.ts index 6efc7006185..e0ef3f98eb0 100644 --- a/packages/lodestar/src/sync/regular.ts +++ b/packages/lodestar/src/sync/regular.ts @@ -3,21 +3,23 @@ */ import { + AggregateAndProof, Attestation, + AttesterSlashing, Checkpoint, + CommitteeIndex, ProposerSlashing, - AttesterSlashing, - AggregateAndProof, Root, SignedBeaconBlock, - SignedVoluntaryExit, CommitteeIndex, Slot, + SignedVoluntaryExit, + Slot, } from "@chainsafe/eth2.0-types"; import {IBeaconConfig} from "@chainsafe/eth2.0-config"; import {IBeaconDb} from "../db"; import {IBeaconChain} from "../chain"; import {INetwork} from "../network"; import {OpPool} from "../opPool"; -import {ILogger} from "@chainsafe/eth2.0-utils/lib/logger"; +import {ILogger} from "@chainsafe/eth2.0-utils/lib/logger"; import {ISyncModules} from "./index"; import {ISyncOptions} from "./options"; import {GossipEvent} from "../network/gossip/constants"; @@ -82,11 +84,6 @@ export class RegularSync { await this.chain.receiveBlock(signedBlock); }; - public receiveCommitteeAttestation = async (attestationSubnet: {attestation: Attestation; subnet: number}): - Promise => { - await this.opPool.attestations.receive(attestationSubnet.attestation); - }; - public receiveAggregateAndProof = async (aggregate: AggregateAndProof): Promise => { await this.opPool.aggregateAndProofs.receive(aggregate); }; diff --git a/packages/lodestar/test/unit/sync/regular.test.ts b/packages/lodestar/test/unit/sync/regular.test.ts index 1860366233b..40cfaa7db31 100644 --- a/packages/lodestar/test/unit/sync/regular.test.ts +++ b/packages/lodestar/test/unit/sync/regular.test.ts @@ -1,27 +1,47 @@ +/* eslint-disable @typescript-eslint/no-unused-vars */ import sinon from "sinon"; import {expect} from "chai"; import {config} from "@chainsafe/eth2.0-config/lib/presets/mainnet"; - import * as attestationUtils from "@chainsafe/eth2.0-state-transition/lib/util/attestation"; import * as validatorStatusUtils from "@chainsafe/eth2.0-state-transition/lib/util/validatorStatus"; - import {BeaconChain} from "../../../src/chain"; import {Libp2pNetwork} from "../../../src/network"; -import {AttestationOperations, OpPool, VoluntaryExitOperations, AggregateAndProofOperations} from "../../../src/opPool"; +import { + AggregateAndProofOperations, + AttestationOperations, + AttesterSlashingOperations, + OpPool, + ProposerSlashingOperations, + VoluntaryExitOperations +} from "../../../src/opPool"; import {WinstonLogger} from "@chainsafe/eth2.0-utils/lib/logger"; import {RegularSync} from "../../../src/sync/regular"; -import {generateState} from "../../utils/state"; import {generateEmptySignedBlock} from "../../utils/block"; -import {generateEmptyAttestation, generateEmptyAggregateAndProof, generateEmptySignedVoluntaryExit} from "../../utils/attestation"; -import {AttestationRepository, BlockRepository, StateRepository, VoluntaryExitRepository, ProposerSlashingRepository, AttesterSlashingRepository} from "../../../src/db/api/beacon/repositories"; -import {generateEmptyProposerSlashing, generateEmptyAttesterSlashing} from "@chainsafe/eth2.0-state-transition/test/utils/slashings"; -import {ProposerSlashingOperations, AttesterSlashingOperations} from "../../../src/opPool"; +import { + generateEmptyAggregateAndProof, + generateEmptyAttestation, + generateEmptySignedVoluntaryExit +} from "../../utils/attestation"; +import { + AttestationRepository, + AttesterSlashingRepository, + BlockRepository, + ProposerSlashingRepository, + StateRepository, + VoluntaryExitRepository +} from "../../../src/db/api/beacon/repositories"; +import { + generateEmptyAttesterSlashing, + generateEmptyProposerSlashing +} from "@chainsafe/eth2.0-state-transition/test/utils/slashings"; +import {describe, it, beforeEach, afterEach} from "mocha"; + describe("syncing", function () { - let sandbox = sinon.createSandbox(); + const sandbox = sinon.createSandbox(); let regularSync: RegularSync; let chainStub: any, networkStub: any, dbStub: any, opPoolStub: any, logger: any, isValidIndexedAttestationStub: any, - isValidVoluntaryExitStub: any, isValidProposerSlashingStub: any, isValidAttesterSlashingStub: any; + isValidVoluntaryExitStub: any, isValidProposerSlashingStub: any, isValidAttesterSlashingStub: any; beforeEach(() => { isValidIndexedAttestationStub = sandbox.stub(attestationUtils, "isValidIndexedAttestation"); @@ -61,8 +81,8 @@ describe("syncing", function () { }); - it('should able to receive block', async function () { - let block = generateEmptySignedBlock(); + it("should able to receive block", async function () { + const block = generateEmptySignedBlock(); dbStub.block.has.resolves(false); chainStub.receiveBlock.resolves(0); try { @@ -74,8 +94,8 @@ describe("syncing", function () { }); - it('should receive attestation', async function () { - let attestation = generateEmptyAttestation(); + it("should receive attestation", async function () { + const attestation = generateEmptyAttestation(); opPoolStub.attestations = new AttestationOperations(dbStub.attestation, {config}); try { await regularSync.receiveAttestation(attestation); @@ -85,38 +105,26 @@ describe("syncing", function () { } }); - - it('should receive committee attestation', async function () { - let attestation = generateEmptyAttestation(); - attestation.aggregationBits.setBit(0, true); - let state = generateState(); - dbStub.state.getLatest.resolves(state); - isValidIndexedAttestationStub.returns(true); - opPoolStub.attestations = new AttestationOperations(dbStub.attestation, {config}); - await regularSync.receiveCommitteeAttestation({attestation, subnet: 0}); - expect(chainStub.receiveAttestation.calledOnceWith(attestation)).to.be.true; - }); - - it('should receive aggregate and proof', async function() { - let aggregateProof = generateEmptyAggregateAndProof(); + it("should receive aggregate and proof", async function() { + const aggregateProof = generateEmptyAggregateAndProof(); await regularSync.receiveAggregateAndProof(aggregateProof); expect(opPoolStub.aggregateAndProofs.receive.calledOnceWith(aggregateProof)).to.be.equal(true); }); - it('should receive Voluntary Exit', async function() { - let voluntaryExit = generateEmptySignedVoluntaryExit(); + it("should receive Voluntary Exit", async function() { + const voluntaryExit = generateEmptySignedVoluntaryExit(); await regularSync.receiveVoluntaryExit(voluntaryExit); expect(opPoolStub.voluntaryExits.receive.calledOnceWith(voluntaryExit)).to.be.equal(true); }); - it('should receive Proposer Slashing', async function() { - let slashing = generateEmptyProposerSlashing(); + it("should receive Proposer Slashing", async function() { + const slashing = generateEmptyProposerSlashing(); await regularSync.receiveProposerSlashing(slashing); expect(opPoolStub.proposerSlashings.receive.calledOnceWith(slashing)).to.be.equal(true); }); - it('should receive Attester Slashing', async function() { - let slashing = generateEmptyAttesterSlashing(); + it("should receive Attester Slashing", async function() { + const slashing = generateEmptyAttesterSlashing(); await regularSync.receiveAttesterSlashing(slashing); expect(opPoolStub.attesterSlashings.receive.calledOnceWith(slashing)).to.be.equal(true); }); From 2683d40ce9b3e8dd63ddf9f68ea7ccd6504aff3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Mon, 17 Feb 2020 14:58:23 +0100 Subject: [PATCH 17/24] fix tests --- .../lodestar/src/network/gossip/interface.ts | 2 +- .../lodestar/test/e2e/network/network.test.ts | 22 +++++++++---------- .../lodestar/test/utils/mocks/chain/chain.ts | 2 ++ 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/packages/lodestar/src/network/gossip/interface.ts b/packages/lodestar/src/network/gossip/interface.ts index 8c09c21755c..de2c54cc8c7 100644 --- a/packages/lodestar/src/network/gossip/interface.ts +++ b/packages/lodestar/src/network/gossip/interface.ts @@ -45,7 +45,7 @@ export interface IGossipSub extends EventEmitter { unsubscribe(topic: string): void; } -export interface IGossip extends IService { +export interface IGossip extends IService, GossipEventEmitter { publishBlock(signedBlock: SignedBeaconBlock): Promise; publishCommiteeAttestation(attestation: Attestation): Promise; publishAggregatedAttestation(aggregateAndProof: AggregateAndProof): Promise; diff --git a/packages/lodestar/test/e2e/network/network.test.ts b/packages/lodestar/test/e2e/network/network.test.ts index 291cf5ea356..bed3ffa9abe 100644 --- a/packages/lodestar/test/e2e/network/network.test.ts +++ b/packages/lodestar/test/e2e/network/network.test.ts @@ -3,7 +3,7 @@ import {afterEach, beforeEach, describe, it} from "mocha"; import {config} from "@chainsafe/eth2.0-config/lib/presets/mainnet"; import {Libp2pNetwork} from "../../../src/network"; import {createNode} from "../../unit/network/util"; -import {generateEmptyAttestation} from "../../utils/attestation"; +import {generateEmptyAggregateAndProof, generateEmptyAttestation} from "../../utils/attestation"; import {generateEmptySignedBlock} from "../../utils/block"; import {ILogger, WinstonLogger} from "@chainsafe/eth2.0-utils/lib/logger"; import {INetworkOptions} from "../../../src/network/options"; @@ -86,14 +86,14 @@ describe("[network] network", function () { await connected; const received = new Promise((resolve, reject) => { setTimeout(reject, 4000); - netA.gossip.on(GossipEvent.BLOCK, resolve); + netA.gossip.subscribeToBlock(resolve); }); await new Promise((resolve) => netB.gossip.once("gossipsub:heartbeat", resolve)); validator.isValidIncomingBlock.resolves(true); netB.gossip.publishBlock(generateEmptySignedBlock()); await received; }); - it("should receive attestations on subscription", async function () { + it("should receive aggregate on subscription", async function () { const connected = Promise.all([ new Promise((resolve) => netA.on("peer:connect", resolve)), new Promise((resolve) => netB.on("peer:connect", resolve)), @@ -102,15 +102,14 @@ describe("[network] network", function () { await connected; const received = new Promise((resolve, reject) => { setTimeout(reject, 4000); - netA.gossip.on(GossipEvent.ATTESTATION, resolve); + netA.gossip.subscribeToAttestation(resolve); }); await new Promise((resolve) => netB.gossip.once("gossipsub:heartbeat", resolve)); validator.isValidIncomingUnaggregatedAttestation.resolves(true); - netB.gossip.publishCommiteeAttestation(generateEmptyAttestation()); + await netB.gossip.publishAggregatedAttestation(generateEmptyAggregateAndProof()); await received; }); it("should receive shard attestations on subscription", async function () { - const committeeIndex = 10; const connected = Promise.all([ new Promise((resolve) => netA.on("peer:connect", resolve)), new Promise((resolve) => netB.on("peer:connect", resolve)), @@ -119,14 +118,13 @@ describe("[network] network", function () { await connected; const received = new Promise((resolve, reject) => { setTimeout(reject, 4000); - // @ts-ignore - netA.gossip.on(GossipEvent.ATTESTATION, resolve); + netA.gossip.subscribeToAttestationSubnet(0, resolve); }); await new Promise((resolve) => netB.gossip.once("gossipsub:heartbeat", resolve)); const attestation = generateEmptyAttestation(); - attestation.data.index = committeeIndex; - validator.isValidIncomingUnaggregatedAttestation.resolves(true); - netB.gossip.publishCommiteeAttestation(attestation); + attestation.data.index = 0; + validator.isValidIncomingCommitteeAttestation.resolves(true); + await netB.gossip.publishCommiteeAttestation(attestation); await received; }); -}); +}); \ No newline at end of file diff --git a/packages/lodestar/test/utils/mocks/chain/chain.ts b/packages/lodestar/test/utils/mocks/chain/chain.ts index cc494cb98bc..732468b605d 100644 --- a/packages/lodestar/test/utils/mocks/chain/chain.ts +++ b/packages/lodestar/test/utils/mocks/chain/chain.ts @@ -14,11 +14,13 @@ import { import {IBeaconChain, ILMDGHOST} from "../../../../src/chain"; import {generateState} from "../../state"; import {ProgressiveMerkleTree} from "@chainsafe/eth2.0-utils"; +import {IBeaconClock} from "../../../../src/chain/clock/interface"; export class MockBeaconChain extends EventEmitter implements IBeaconChain { public latestState: BeaconState; public forkChoice: ILMDGHOST; public chainId: uint16; + public clock: IBeaconClock; public networkId: uint64; public constructor({genesisTime, chainId, networkId}) { From a3d9848e6da76451b2a92615e6b35aa0add9d541 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Tue, 18 Feb 2020 18:03:58 +0100 Subject: [PATCH 18/24] address Pr comment --- .../src/chain/clock/local/LocalClock.ts | 2 +- .../test/unit/chain/clock/local.test.ts | 42 +++++++++---------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/packages/lodestar/src/chain/clock/local/LocalClock.ts b/packages/lodestar/src/chain/clock/local/LocalClock.ts index 96541728ecb..4132481a5da 100644 --- a/packages/lodestar/src/chain/clock/local/LocalClock.ts +++ b/packages/lodestar/src/chain/clock/local/LocalClock.ts @@ -64,7 +64,7 @@ export class LocalClock extends EventEmitter implements IBeaconClock { this.emit("epoch", currentEpoch); } //recursively invoke update slot - setTimeout( + this.timeoutId = setTimeout( this.updateSlot, this.getDiffTillNextSlot() ); diff --git a/packages/lodestar/test/unit/chain/clock/local.test.ts b/packages/lodestar/test/unit/chain/clock/local.test.ts index 1e6b66964df..eee79ce28db 100644 --- a/packages/lodestar/test/unit/chain/clock/local.test.ts +++ b/packages/lodestar/test/unit/chain/clock/local.test.ts @@ -6,28 +6,28 @@ import {expect} from "chai"; describe("LocalClock", function() { - const sandbox = sinon.createSandbox(); + const sandbox = sinon.createSandbox(); - it("Should notify on new slot", async function () { - const realClock = sandbox.useFakeTimers(); - const clock = new LocalClock(config, Math.round(new Date().getTime() / 1000)); - const spy = sinon.spy(); - clock.onNewSlot(spy); - await clock.start(); - realClock.tick(config.params.SECONDS_PER_SLOT * 1000); - expect(spy.calledOnce).to.be.true; - await clock.stop(); - }); + it("Should notify on new slot", async function () { + const realClock = sandbox.useFakeTimers(); + const clock = new LocalClock(config, Math.round(new Date().getTime() / 1000)); + const spy = sinon.spy(); + clock.onNewSlot(spy); + await clock.start(); + realClock.tick(config.params.SECONDS_PER_SLOT * 1000); + expect(spy.calledOnce).to.be.true; + await clock.stop(); + }); - it("Should notify on new epoch", async function () { - const realClock = sandbox.useFakeTimers(); - const clock = new LocalClock(config, Math.round(new Date().getTime() / 1000)); - const spy = sinon.spy(); - clock.onNewEpoch(spy); - await clock.start(); - realClock.tick(config.params.SLOTS_PER_EPOCH * config.params.SECONDS_PER_SLOT * 1000); - expect(spy.calledOnce).to.be.true; - await clock.stop(); - }) + it("Should notify on new epoch", async function () { + const realClock = sandbox.useFakeTimers(); + const clock = new LocalClock(config, Math.round(new Date().getTime() / 1000)); + const spy = sinon.spy(); + clock.onNewEpoch(spy); + await clock.start(); + realClock.tick(config.params.SLOTS_PER_EPOCH * config.params.SECONDS_PER_SLOT * 1000); + expect(spy.calledOnce).to.be.true; + await clock.stop(); + }); }); \ No newline at end of file From 56b0fb4dd368579a96800b39bdc83d8035fe8fa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Wed, 19 Feb 2020 11:43:06 +0100 Subject: [PATCH 19/24] address Pr comment --- packages/lodestar/src/chain/clock/local/LocalClock.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/lodestar/src/chain/clock/local/LocalClock.ts b/packages/lodestar/src/chain/clock/local/LocalClock.ts index 4132481a5da..b7f0a973bd9 100644 --- a/packages/lodestar/src/chain/clock/local/LocalClock.ts +++ b/packages/lodestar/src/chain/clock/local/LocalClock.ts @@ -22,7 +22,7 @@ export class LocalClock extends EventEmitter implements IBeaconClock { public async start(): Promise { this.isRunning = true; const diffTillNextSlot = this.getDiffTillNextSlot(); - setTimeout( + this.timeoutId = setTimeout( this.updateSlot, diffTillNextSlot ); From 40dacaefb34882f4662a768f83cc553cef9ad9d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Wed, 19 Feb 2020 14:25:41 +0100 Subject: [PATCH 20/24] fix types --- .../lodestar/src/network/gossip/gossip.ts | 28 +++++++++++++------ .../lodestar/src/network/gossip/gossipsub.ts | 6 ++-- packages/lodestar/src/network/gossip/utils.ts | 1 + yarn.lock | 14 ---------- 4 files changed, 23 insertions(+), 26 deletions(-) diff --git a/packages/lodestar/src/network/gossip/gossip.ts b/packages/lodestar/src/network/gossip/gossip.ts index 7a4e18951cf..9550d292ed1 100644 --- a/packages/lodestar/src/network/gossip/gossip.ts +++ b/packages/lodestar/src/network/gossip/gossip.ts @@ -11,17 +11,27 @@ import {ATTESTATION_SUBNET_COUNT} from "../../constants"; import {ILogger, LogLevel} from "@chainsafe/eth2.0-utils/lib/logger"; import {getGossipTopic,} from "./utils"; import {INetworkOptions} from "../options"; -import {GossipEventEmitter, IGossip, IGossipEvents, IGossipSub, IGossipModules, - GossipObject} from "./interface"; +import {GossipEventEmitter, GossipObject, IGossip, IGossipEvents, IGossipModules, IGossipSub} from "./interface"; import {GossipEvent} from "./constants"; -import {publishBlock, handleIncomingBlock} from "./handlers/block"; -import {publishCommiteeAttestation, getCommitteeAttestationHandler, handleIncomingAttestation} - from "./handlers/attestation"; -import {publishAttesterSlashing, handleIncomingAttesterSlashing} from "./handlers/attesterSlashing"; -import {publishProposerSlashing, handleIncomingProposerSlashing} from "./handlers/proposerSlashing"; -import {publishVoluntaryExit, handleIncomingVoluntaryExit} from "./handlers/voluntaryExit"; -import {publishAggregatedAttestation, handleIncomingAggregateAndProof} from "./handlers/aggregateAndProof"; +import {handleIncomingBlock, publishBlock} from "./handlers/block"; +import { + getCommitteeAttestationHandler, + handleIncomingAttestation, + publishCommiteeAttestation +} from "./handlers/attestation"; +import {handleIncomingAttesterSlashing, publishAttesterSlashing} from "./handlers/attesterSlashing"; +import {handleIncomingProposerSlashing, publishProposerSlashing} from "./handlers/proposerSlashing"; +import {handleIncomingVoluntaryExit, publishVoluntaryExit} from "./handlers/voluntaryExit"; +import {handleIncomingAggregateAndProof, publishAggregatedAttestation} from "./handlers/aggregateAndProof"; import {LodestarGossipsub} from "./gossipsub"; +import { + AggregateAndProof, + Attestation, + AttesterSlashing, + ProposerSlashing, + SignedBeaconBlock, + SignedVoluntaryExit +} from "@chainsafe/eth2.0-types"; export type GossipHandlerFn = (this: Gossip, obj: GossipObject ) => void; diff --git a/packages/lodestar/src/network/gossip/gossipsub.ts b/packages/lodestar/src/network/gossip/gossipsub.ts index 6c185d8f577..012d0d098d6 100644 --- a/packages/lodestar/src/network/gossip/gossipsub.ts +++ b/packages/lodestar/src/network/gossip/gossipsub.ts @@ -1,6 +1,6 @@ -import Gossipsub, {IGossipMessage, Registrar, Options} from "libp2p-gossipsub"; -import {IGossipMessageValidator, GossipObject, GossipMessageValidatorFn} from "./interface"; -import {getGossipTopic, isAttestationSubnetTopic, getSubnetFromAttestationSubnetTopic} from "./utils"; +import Gossipsub, {IGossipMessage, Options, Registrar} from "libp2p-gossipsub"; +import {GossipMessageValidatorFn, GossipObject, IGossipMessageValidator} from "./interface"; +import {getGossipTopic, getSubnetFromAttestationSubnetTopic, isAttestationSubnetTopic} from "./utils"; import {GossipEvent} from "./constants"; import {IBeaconConfig} from "@chainsafe/eth2.0-config"; import {utils} from "libp2p-pubsub"; diff --git a/packages/lodestar/src/network/gossip/utils.ts b/packages/lodestar/src/network/gossip/utils.ts index d1506cc6a91..31701fb4172 100644 --- a/packages/lodestar/src/network/gossip/utils.ts +++ b/packages/lodestar/src/network/gossip/utils.ts @@ -6,6 +6,7 @@ import {ATTESTATION_SUBNET_COUNT} from "../../constants"; import {Attestation} from "@chainsafe/eth2.0-types"; import {GossipEvent, AttestationSubnetRegExp} from "./constants"; import assert from "assert"; +import {CommitteeIndex} from "@chainsafe/eth2.0-types/lib"; export function getGossipTopic(event: GossipEvent, encoding = "ssz", params: Map = new Map()): string { let topic = `${event}/${encoding}`; diff --git a/yarn.lock b/yarn.lock index e7bff81a42e..eec8103ec00 100644 --- a/yarn.lock +++ b/yarn.lock @@ -923,20 +923,6 @@ resolved "https://registry.yarnpkg.com/@chainsafe/eth2-spec-tests/-/eth2-spec-tests-0.9.4.tgz#2a250eee523c31c1662eaf9d8def1a6697296fdd" integrity sha512-yA+zvS9Qbxz+Yd6mV+0T2mqGtAs3tvIRIvkLnQb1Yr9+r0mgr1CWmh7u50pJNYMLaElI3oc1MhPB+VPuKyBRMQ== -"@chainsafe/eth2.0-spec-test-util@0.4.2": - version "0.4.2" - resolved "https://registry.yarnpkg.com/@chainsafe/eth2.0-spec-test-util/-/eth2.0-spec-test-util-0.4.2.tgz#69cb01613d5ce1d9dc21c3de95d8660103791215" - integrity sha512-Dve34BLry1ejEz0V13P297zUDmxxFRykBBFFcCFJE9eKjNST6DLdlc63O7NjuSniWoT7EbFVtPH95UsFlbZRdA== - dependencies: - "@chainsafe/eth2.0-utils" "^0.1.0" - "@chainsafe/ssz" "^0.5.2" - camelcase "^5.3.1" - chai "^4.2.0" - deepmerge "^4.0.0" - js-yaml "^3.13.1" - mocha "^6.2.0" - v8-profiler-next "^1.1.1" - "@evocateur/libnpmaccess@^3.1.2": version "3.1.2" resolved "https://registry.yarnpkg.com/@evocateur/libnpmaccess/-/libnpmaccess-3.1.2.tgz#ecf7f6ce6b004e9f942b098d92200be4a4b1c845" From 1ffecd6265b1e05aa15a7d047a1a6d0bdae2b1c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Thu, 20 Feb 2020 09:25:49 +0100 Subject: [PATCH 21/24] address PR comments --- packages/lodestar/src/sync/index.ts | 1 + packages/lodestar/src/tasks/index.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/lodestar/src/sync/index.ts b/packages/lodestar/src/sync/index.ts index 522ba4f974b..6eb9a069b2c 100644 --- a/packages/lodestar/src/sync/index.ts +++ b/packages/lodestar/src/sync/index.ts @@ -95,6 +95,7 @@ export class Sync extends EventEmitter { } private startRegularSync = (): void => { + this.emit("regularSyncStarted"); this.regularSync.start(); }; diff --git a/packages/lodestar/src/tasks/index.ts b/packages/lodestar/src/tasks/index.ts index f69cf20364e..82f9045ed38 100644 --- a/packages/lodestar/src/tasks/index.ts +++ b/packages/lodestar/src/tasks/index.ts @@ -58,7 +58,7 @@ export class TasksService implements IService { }; private handleRegularSyncStartedTasks = async (): Promise => { - new InteropSubnetsJoiningTask(this.config, {network: this.network}); + new InteropSubnetsJoiningTask(this.config, {network: this.network}).run(); }; } \ No newline at end of file From aaa1f1d98c208293badc28e51a590ae62a288919 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Thu, 20 Feb 2020 10:06:23 +0100 Subject: [PATCH 22/24] fix justification test --- packages/lodestar-cli/test/e2e/commands/dev.test.ts | 2 +- packages/lodestar-validator/src/api/abstract.ts | 2 +- packages/lodestar/src/api/impl/validator/duties/proposer.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/lodestar-cli/test/e2e/commands/dev.test.ts b/packages/lodestar-cli/test/e2e/commands/dev.test.ts index 7af922c0e4f..4921ea3c89f 100644 --- a/packages/lodestar-cli/test/e2e/commands/dev.test.ts +++ b/packages/lodestar-cli/test/e2e/commands/dev.test.ts @@ -76,7 +76,7 @@ describe("e2e interop simulation", function() { const libp2p = await createNodeJsLibp2p(peerId, {}); node = new BeaconNode(conf, {config: minimalConfig, logger, eth1: new InteropEth1Notifier(), libp2p}); - const genesisTime = Math.round(Date.now()/1000); + const genesisTime = Math.floor(Date.now()/1000); const tree = ProgressiveMerkleTree.empty(DEPOSIT_CONTRACT_TREE_DEPTH, new MerkleTreeSerialization(minimalConfig)); const state = quickStartState(minimalConfig, tree, genesisTime, VALIDATOR_COUNT); await node.chain.initializeBeaconChain(state, tree); diff --git a/packages/lodestar-validator/src/api/abstract.ts b/packages/lodestar-validator/src/api/abstract.ts index a1cb0a22d53..fb47216e203 100644 --- a/packages/lodestar-validator/src/api/abstract.ts +++ b/packages/lodestar-validator/src/api/abstract.ts @@ -67,7 +67,7 @@ export abstract class AbstractApiClient private async startSlotCounting(): Promise { this.running = true; const genesisTime = await this.beacon.getGenesisTime(); - const diffInSeconds = (Date.now() / 1000) - genesisTime; + const diffInSeconds = (Math.floor(Date.now() / 1000)) - genesisTime; this.currentSlot = getCurrentSlot(this.config, genesisTime); //update slot after remaining seconds until next slot const diffTillNextSlot = diff --git a/packages/lodestar/src/api/impl/validator/duties/proposer.ts b/packages/lodestar/src/api/impl/validator/duties/proposer.ts index a2a9592ab9e..5f20251e830 100644 --- a/packages/lodestar/src/api/impl/validator/duties/proposer.ts +++ b/packages/lodestar/src/api/impl/validator/duties/proposer.ts @@ -18,7 +18,7 @@ export async function getEpochProposers( ): Promise> { const block = await db.block.get(chain.forkChoice.head()); const state = await db.state.get(block.message.stateRoot); - assert(epoch <= computeEpochAtSlot(config, state.slot) + 2); + assert(epoch >= 0 && epoch <= computeEpochAtSlot(config, state.slot) + 2); const startSlot = computeStartSlotAtEpoch(config, epoch); if(state.slot < startSlot) { processSlots(config, state, startSlot); From 5bb041d6486260da5f9fa88bceb5af0c14b6e177 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Thu, 20 Feb 2020 11:07:52 +0100 Subject: [PATCH 23/24] resolves post merge errors --- packages/eth2.0-types/src/ssz/generators/api.ts | 11 +++++++++++ packages/eth2.0-types/src/ssz/generators/index.ts | 2 ++ packages/eth2.0-types/src/ssz/interface.ts | 2 ++ packages/eth2.0-types/src/types/api.ts | 9 +++++++++ packages/eth2.0-types/src/types/index.ts | 1 + .../src/api/impl/rest/validator/validator.ts | 4 ++-- .../routes/validator/subscribeToCommitteeSubnet.ts | 7 +++---- .../lodestar/src/api/rpc/api/validator/validator.ts | 10 +++++----- .../src/db/api/beacon/repositories/blockArchive.ts | 8 +------- packages/lodestar/src/network/gossip/gossipsub.ts | 5 +---- 10 files changed, 37 insertions(+), 22 deletions(-) create mode 100644 packages/eth2.0-types/src/ssz/generators/api.ts create mode 100644 packages/eth2.0-types/src/types/api.ts diff --git a/packages/eth2.0-types/src/ssz/generators/api.ts b/packages/eth2.0-types/src/ssz/generators/api.ts new file mode 100644 index 00000000000..3bc3294ea55 --- /dev/null +++ b/packages/eth2.0-types/src/ssz/generators/api.ts @@ -0,0 +1,11 @@ +import {IBeaconSSZTypes} from "../interface"; +import {ContainerType} from "@chainsafe/ssz"; + +export const SubscribeToCommitteeSubnetPayload = (ssz: IBeaconSSZTypes): ContainerType => new ContainerType({ + fields: { + slot: ssz.Slot, + slotSignature: ssz.BLSSignature, + committeeIndex: ssz.CommitteeIndex, + aggregatorPubkey: ssz.BLSPubkey + }, +}); \ No newline at end of file diff --git a/packages/eth2.0-types/src/ssz/generators/index.ts b/packages/eth2.0-types/src/ssz/generators/index.ts index c019dc30a84..fe38a19e9c2 100644 --- a/packages/eth2.0-types/src/ssz/generators/index.ts +++ b/packages/eth2.0-types/src/ssz/generators/index.ts @@ -10,6 +10,7 @@ import * as block from "./block"; import * as state from "./state"; import * as validator from "./validator"; import * as wire from "./wire"; +import * as api from "./api"; import {IBeaconSSZTypes, typeNames} from "../interface"; @@ -20,6 +21,7 @@ const allGenerators = { ...state, ...validator, ...wire, + ...api, }; export function createIBeaconSSZTypes(params: IBeaconParams): IBeaconSSZTypes { diff --git a/packages/eth2.0-types/src/ssz/interface.ts b/packages/eth2.0-types/src/ssz/interface.ts index cc9836f2360..868492ddb85 100644 --- a/packages/eth2.0-types/src/ssz/interface.ts +++ b/packages/eth2.0-types/src/ssz/interface.ts @@ -73,6 +73,8 @@ export interface IBeaconSSZTypes { BeaconBlocksByRangeResponse: ContainerType; BeaconBlocksByRootRequest: ContainerType; BeaconBlocksByRootResponse: ContainerType; + //api + SubscribeToCommitteeSubnetPayload: ContainerType; } export const typeNames: (keyof IBeaconSSZTypes)[] = [ diff --git a/packages/eth2.0-types/src/types/api.ts b/packages/eth2.0-types/src/types/api.ts new file mode 100644 index 00000000000..916cfad205f --- /dev/null +++ b/packages/eth2.0-types/src/types/api.ts @@ -0,0 +1,9 @@ +/* eslint-disable @typescript-eslint/interface-name-prefix */ +import {BLSPubkey, BLSSignature, CommitteeIndex, Slot} from "./primitive"; + +export interface SubscribeToCommitteeSubnetPayload { + slot: Slot; + slotSignature: BLSSignature; + committeeIndex: CommitteeIndex; + aggregatorPubkey: BLSPubkey; +} \ No newline at end of file diff --git a/packages/eth2.0-types/src/types/index.ts b/packages/eth2.0-types/src/types/index.ts index 995b65faaea..682a875ce67 100644 --- a/packages/eth2.0-types/src/types/index.ts +++ b/packages/eth2.0-types/src/types/index.ts @@ -9,3 +9,4 @@ export * from "./block"; export * from "./state"; export * from "./validator"; export * from "./wire"; +export * from "./api"; diff --git a/packages/lodestar-validator/src/api/impl/rest/validator/validator.ts b/packages/lodestar-validator/src/api/impl/rest/validator/validator.ts index b3188d7dced..99d9a1722d9 100644 --- a/packages/lodestar-validator/src/api/impl/rest/validator/validator.ts +++ b/packages/lodestar-validator/src/api/impl/rest/validator/validator.ts @@ -41,7 +41,7 @@ export class RestValidatorApi implements IValidatorApi { const hexPubKeys = validatorPubKeys.map(toHexString); const url = `/duties/${epoch.toString()}/attester?validator_pubkeys=${JSON.stringify(hexPubKeys)}`; const responseData = await this.client.get(url); - return responseData.map(value =>this.config.types.ValidatorDuty.fromJson(value)); + return responseData.map(value => this.config.types.ValidatorDuty.fromJson(value)); } public async publishAggregatedAttestation( @@ -99,7 +99,7 @@ export class RestValidatorApi implements IValidatorApi { ): Promise { return this.client.post( "/beacon_committee_subscription", - toJson({ + this.config.types.SubscribeToCommitteeSubnetPayload.toJson({ slot, slotSignature, committeeIndex, diff --git a/packages/lodestar/src/api/rest/routes/validator/subscribeToCommitteeSubnet.ts b/packages/lodestar/src/api/rest/routes/validator/subscribeToCommitteeSubnet.ts index 213744c819c..99b1f2fc4c3 100644 --- a/packages/lodestar/src/api/rest/routes/validator/subscribeToCommitteeSubnet.ts +++ b/packages/lodestar/src/api/rest/routes/validator/subscribeToCommitteeSubnet.ts @@ -3,9 +3,8 @@ import fastify, {DefaultBody, DefaultHeaders, DefaultParams, DefaultQuery} from import {IApiModules} from "../../../interface"; import {verify} from "@chainsafe/bls"; import {hexToBuffer} from "../../../../util/hex"; -import {hashTreeRoot} from "@chainsafe/ssz"; -import {getDomain} from "@chainsafe/eth2.0-state-transition"; -import {computeEpochAtSlot, DomainType} from "@chainsafe/lodestar-validator/lib/util"; +import {computeEpochAtSlot, getDomain} from "@chainsafe/eth2.0-state-transition"; +import {DomainType} from "../../../../constants"; const opts: fastify.RouteShorthandOptions = { @@ -44,7 +43,7 @@ export const registerSubscribeToCommitteeSubnet = (fastify: IFastifyServer, modu const slot = Number(request.body.slot); const valid = verify( hexToBuffer(request.body.aggregator_pubkey), - hashTreeRoot(modules.config.types.Slot, slot), + modules.config.types.Slot.hashTreeRoot(slot), hexToBuffer(request.body.slot_signature), getDomain( modules.config, diff --git a/packages/lodestar/src/api/rpc/api/validator/validator.ts b/packages/lodestar/src/api/rpc/api/validator/validator.ts index 2110f318685..b466e977589 100644 --- a/packages/lodestar/src/api/rpc/api/validator/validator.ts +++ b/packages/lodestar/src/api/rpc/api/validator/validator.ts @@ -30,9 +30,9 @@ import {ILogger} from "@chainsafe/eth2.0-utils/lib/logger"; import {INetwork} from "../../../../network"; import {getDomain, isAggregator} from "@chainsafe/eth2.0-state-transition"; import {verify} from "@chainsafe/bls"; -import {hashTreeRoot} from "@chainsafe/ssz"; -import {computeEpochAtSlot, DomainType} from "@chainsafe/lodestar-validator/lib/util"; import {Sync} from "../../../../sync"; +import {DomainType} from "../../../../constants"; +import {computeEpochAtSlot} from "@chainsafe/eth2.0-state-transition/lib"; export class ValidatorApi implements IValidatorApi { @@ -128,9 +128,9 @@ export class ValidatorApi implements IValidatorApi { slot: Slot, slotSignature: BLSSignature, committeeIndex: CommitteeIndex, aggregatorPubkey: BLSPubkey ): Promise { const valid = verify( - aggregatorPubkey, - hashTreeRoot(this.config.types.Slot, slot), - slotSignature, + aggregatorPubkey.valueOf() as Uint8Array, + this.config.types.Slot.hashTreeRoot(slot), + slotSignature.valueOf() as Uint8Array, getDomain( this.config, this.chain.latestState, diff --git a/packages/lodestar/src/db/api/beacon/repositories/blockArchive.ts b/packages/lodestar/src/db/api/beacon/repositories/blockArchive.ts index 32e9491d7bd..bf03f9b74c0 100644 --- a/packages/lodestar/src/db/api/beacon/repositories/blockArchive.ts +++ b/packages/lodestar/src/db/api/beacon/repositories/blockArchive.ts @@ -1,4 +1,4 @@ -import {Root, SignedBeaconBlock, Slot} from "@chainsafe/eth2.0-types"; +import {SignedBeaconBlock, Slot} from "@chainsafe/eth2.0-types"; import {IBeaconConfig} from "@chainsafe/eth2.0-config"; import {BulkRepository} from "../repository"; import {IDatabaseController} from "../../../controller"; @@ -16,12 +16,6 @@ export class BlockArchiveRepository extends BulkRepository { super(config, db, Bucket.blockArchive, config.types.SignedBeaconBlock); } - public async getByRoot(root: Root): Promise { - return this.get( - await this.db.get(encodeKey(Bucket.blockRootRefs, root)) - ); - } - public async addMany(blocks: SignedBeaconBlock[]): Promise { await this.db.batchPut( blocks.map((block) => ({ diff --git a/packages/lodestar/src/network/gossip/gossipsub.ts b/packages/lodestar/src/network/gossip/gossipsub.ts index b7dca03901e..5327b7087c6 100644 --- a/packages/lodestar/src/network/gossip/gossipsub.ts +++ b/packages/lodestar/src/network/gossip/gossipsub.ts @@ -1,13 +1,10 @@ import assert from "assert"; import {utils} from "libp2p-pubsub"; -import Gossipsub, {IGossipMessage, Registrar, Options} from "libp2p-gossipsub"; +import Gossipsub, {IGossipMessage, Options, Registrar} from "libp2p-gossipsub"; import {Type} from "@chainsafe/ssz"; import {IBeaconConfig} from "@chainsafe/eth2.0-config"; import {ILogger} from "@chainsafe/eth2.0-utils/lib/logger"; -import {IGossipMessageValidator, GossipObject, GossipMessageValidatorFn} from "./interface"; -import {getGossipTopic, isAttestationSubnetTopic, getSubnetFromAttestationSubnetTopic} from "./utils"; -import Gossipsub, {IGossipMessage, Options, Registrar} from "libp2p-gossipsub"; import {GossipMessageValidatorFn, GossipObject, IGossipMessageValidator} from "./interface"; import {getGossipTopic, getSubnetFromAttestationSubnetTopic, isAttestationSubnetTopic} from "./utils"; import {GossipEvent} from "./constants"; From 1a09fad5461af6f3ec6e98346f37b02dcc9e8be6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marin=20Petruni=C4=87?= Date: Mon, 24 Feb 2020 09:08:46 +0100 Subject: [PATCH 24/24] fix e2e tests --- packages/lodestar/test/e2e/network/network.test.ts | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/lodestar/test/e2e/network/network.test.ts b/packages/lodestar/test/e2e/network/network.test.ts index 308694aedac..cf69e0fe52b 100644 --- a/packages/lodestar/test/e2e/network/network.test.ts +++ b/packages/lodestar/test/e2e/network/network.test.ts @@ -85,22 +85,21 @@ describe("[network] network", function () { ]); await netA.connect(netB.peerInfo); await connected; - let count = 0; + const spy = sinon.spy(); const received = new Promise((resolve, reject) => { setTimeout(reject, 4000); - netA.gossip.on(GossipEvent.BLOCK, () => { - count ++; - }); - setTimeout(() => resolve(count), 1000); + netA.gossip.subscribeToBlock(spy); + setTimeout(resolve, 1000); }); await new Promise((resolve) => netB.gossip.once("gossipsub:heartbeat", resolve)); validator.isValidIncomingBlock.resolves(true); const block = generateEmptySignedBlock(); block.message.slot = 2020; for (let i = 0; i < 5; i++) { - netB.gossip.publishBlock(block); + await netB.gossip.publishBlock(block); } - expect(await received).to.be.equal(1); + await received; + expect(spy.callCount).to.be.equal(1); }); it("should receive blocks on subscription", async function () { const connected = Promise.all([