Skip to content

Commit

Permalink
serve libp2p protocol for light client sync
Browse files Browse the repository at this point in the history
This extends the `--serve-light-client-data` launch option to serve
locally collected light client data via libp2p.
See ethereum/consensus-specs#2802

Backfill of historic best `LightClientUpdate` data is not implemented.

To test, in `conf.nim` change `serveLightClientData`'s `defaultValue` to
`true`, then run this command:
```
scripts/launch_local_testnet.sh --kill-old-processes --preset minimal \
    --nodes 4 --disable-htop --stop-at-epoch 7
```
The log files of the beacon nodes will be in the `local_testnet_data`
directory. They are named `log0.txt` through `log3.txt`. The logs can be
browsed for light client related messages.
  • Loading branch information
etan-status committed Jan 27, 2022
1 parent 37d344a commit 8b340dd
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 12 deletions.
24 changes: 24 additions & 0 deletions beacon_chain/gossip_processing/eth2_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ declareCounter beacon_sync_committee_contributions_received,
"Number of valid sync committee contributions processed by this node"
declareCounter beacon_sync_committee_contributions_dropped,
"Number of invalid sync committee contributions dropped by this node", labels = ["reason"]
declareCounter beacon_optimistic_light_client_updates_received,
"Number of valid optimistic light client updates processed by this node"
declareCounter beacon_optimistic_light_client_updates_dropped,
"Number of invalid optimistic light client updates dropped by this node", labels = ["reason"]

const delayBuckets = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, Inf]

Expand Down Expand Up @@ -528,3 +532,23 @@ proc contributionValidator*(
beacon_sync_committee_contributions_dropped.inc(1, [$v.error[0]])

err(v.error())

proc optimisticLightClientUpdateValidator*(
self: var Eth2Processor, src: MsgSource,
optimistic_update: altair.OptimisticLightClientUpdate
): Result[void, ValidationError] =
logScope:
optimisticUpdate = shortLog(optimistic_update)

debug "Optimistic light client update received"

let v = self.dag.validateOptimisticLightClientUpdate(optimistic_update)
if v.isOk():
trace "Optimistic light client update validated"

beacon_optimistic_light_client_updates_received.inc()
else:
debug "Dropping optimistic light client update", error = v.error
beacon_optimistic_light_client_updates_dropped.inc(1, [$v.error[0]])

v
17 changes: 17 additions & 0 deletions beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -984,3 +984,20 @@ proc validateContribution*(
sig.get()

return ok((sig, participants))

# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/sync-protocol.md#optimistic_light_client_update
proc validateOptimisticLightClientUpdate*(
dag: ChainDAGRef, optimistic_update: altair.OptimisticLightClientUpdate):
Result[void, ValidationError] =
template latest_local_update(): auto = dag.optimisticLightClientUpdate

if optimistic_update != latest_local_update:
# [IGNORE] The optimistic update is not attesting to the latest block's
# parent block.
if optimistic_update.attested_header != latest_local_update.attested_header:
return errIgnore("OptimisticLightClientUpdate: different attested block")

# [REJECT] The optimistic update does not match the expected value.
return errReject("OptimisticLightClientUpdate: update does not match block")

ok()
14 changes: 14 additions & 0 deletions beacon_chain/networking/eth2_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2225,3 +2225,17 @@ proc broadcastSignedContributionAndProof*(
node: Eth2Node, msg: SignedContributionAndProof) =
let topic = getSyncCommitteeContributionAndProofTopic(node.forkDigests.altair)
node.broadcast(topic, msg)

proc broadcastOptimisticLightClientUpdate*(
node: Eth2Node, msg: altair.OptimisticLightClientUpdate) =
let
forkDigest =
if msg.fork_version == node.cfg.SHARDING_FORK_VERSION:
node.forkDigests.sharding
elif msg.fork_version == node.cfg.BELLATRIX_FORK_VERSION:
node.forkDigests.bellatrix
else:
doAssert msg.fork_version == node.cfg.ALTAIR_FORK_VERSION
node.forkDigests.altair
topic = getOptimisticLightClientUpdateTopic(forkDigest)
node.broadcast(topic, msg)
19 changes: 19 additions & 0 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,10 @@ proc addAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest, slot: Sl
getSyncCommitteeContributionAndProofTopic(forkDigest), basicParams)
node.network.updateSyncnetsMetadata(syncnets)

if node.config.serveLightClientData:
node.network.subscribe(
getOptimisticLightClientUpdateTopic(forkDigest), basicParams)

proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.removePhase0MessageHandlers(forkDigest)

Expand All @@ -742,6 +746,9 @@ proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) =
node.network.unsubscribe(
getSyncCommitteeContributionAndProofTopic(forkDigest))

if node.config.serveLightClientData:
node.network.unsubscribe(getOptimisticLightClientUpdateTopic(forkDigest))

proc trackSyncCommitteeTopics*(node: BeaconNode) =
# TODO
discard
Expand Down Expand Up @@ -1149,6 +1156,18 @@ proc installMessageValidators(node: BeaconNode) =
installSyncCommitteeeValidators(node.dag.forkDigests.altair)
installSyncCommitteeeValidators(node.dag.forkDigests.bellatrix)

if node.config.serveLightClientData:
template installOptimisticLightClientUpdateValidator(digest: auto) =
node.network.addValidator(
getOptimisticLightClientUpdateTopic(digest),
proc(msg: altair.OptimisticLightClientUpdate): ValidationResult =
toValidationResult(
node.processor[].optimisticLightClientUpdateValidator(
MsgSource.gossip, msg)))

installOptimisticLightClientUpdateValidator(node.dag.forkDigests.altair)
installOptimisticLightClientUpdateValidator(node.dag.forkDigests.bellatrix)

proc stop(node: BeaconNode) =
bnStatus = BeaconNodeStatus.Stopping
notice "Graceful shutdown"
Expand Down
7 changes: 6 additions & 1 deletion beacon_chain/spec/beacon_time.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand Down Expand Up @@ -146,6 +146,9 @@ const
# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/validator.md#broadcast-sync-committee-contribution
syncContributionSlotOffset* = TimeDiff(nanoseconds:
NANOSECONDS_PER_SLOT.int64 * 2 div INTERVALS_PER_SLOT)
# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/sync-protocol.md#block-proposal
optimisticLightClientUpdateSlotOffset* = TimeDiff(nanoseconds:
NANOSECONDS_PER_SLOT.int64 div INTERVALS_PER_SLOT)

func toFloatSeconds*(t: TimeDiff): float =
float(t.nanoseconds) / 1_000_000_000.0
Expand All @@ -167,6 +170,8 @@ func sync_committee_message_deadline*(s: Slot): BeaconTime =
s.start_beacon_time + syncCommitteeMessageSlotOffset
func sync_contribution_deadline*(s: Slot): BeaconTime =
s.start_beacon_time + syncContributionSlotOffset
func optimistic_light_client_update_deadline*(s: Slot): BeaconTime =
s.start_beacon_time + optimisticLightClientUpdateSlotOffset

func slotOrZero*(time: BeaconTime): Slot =
let exSlot = time.toSlot
Expand Down
17 changes: 16 additions & 1 deletion beacon_chain/spec/datatypes/altair.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import
std/[typetraits, sets, hashes],
chronicles,
stew/[assign2, bitops2],
stew/[assign2, bitops2, objects],
"."/[base, phase0]

export base, sets
Expand Down Expand Up @@ -601,9 +601,24 @@ chronicles.formatIt SyncCommitteeContribution: shortLog(it)
chronicles.formatIt ContributionAndProof: shortLog(it)
chronicles.formatIt SignedContributionAndProof: shortLog(it)

func shortLog*(v: LightClientUpdate): auto =
(
attested_header: shortLog(v.attested_header),
finalized_header: shortLog(v.finalized_header),
num_active_participants: countOnes(v.sync_aggregate.sync_committee_bits),
is_signed_by_next: v.next_sync_committee.isZeroMemory
)

template hash*(x: LightClientUpdate): Hash =
hash(x.header)

func shortLog*(v: OptimisticLightClientUpdate): auto =
(
attested_header: shortLog(v.attested_header),
num_active_participants: countOnes(v.sync_aggregate.sync_committee_bits),
is_signed_by_next: v.is_signed_by_next_sync_committee
)

func clear*(info: var EpochInfo) =
info.validators.setLen(0)
info.balances = UnslashedParticipatingBalances()
Expand Down
5 changes: 5 additions & 0 deletions beacon_chain/spec/network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func getSyncCommitteeContributionAndProofTopic*(forkDigest: ForkDigest): string
## For subscribing and unsubscribing to/from a subnet.
eth2Prefix(forkDigest) & "sync_committee_contribution_and_proof/ssz_snappy"

# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/altair/sync-protocol.md#optimistic_light_client_update
func getOptimisticLightClientUpdateTopic*(forkDigest: ForkDigest): string =
## For broadcasting the latest `OptimisticLightClientUpdate` to light clients.
eth2Prefix(forkDigest) & "optimistic_light_client_update/ssz_snappy"

func getENRForkID*(cfg: RuntimeConfig,
epoch: Epoch,
genesis_validators_root: Eth2Digest): ENRForkID =
Expand Down
7 changes: 5 additions & 2 deletions beacon_chain/spec/ssz_codec.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Copyright (c) 2018-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand All @@ -17,7 +17,7 @@ export codec, base, typetraits

# Coding and decoding of SSZ to spec-specific types

template toSszType*(v: Slot|Epoch): auto = uint64(v)
template toSszType*(v: Slot|Epoch|SyncCommitteePeriod): auto = uint64(v)
template toSszType*(v: BlsCurveType): auto = toRaw(v)
template toSszType*(v: ForkDigest|GraffitiBytes): auto = distinctBase(v)
template toSszType*(v: Version): auto = distinctBase(v)
Expand All @@ -34,6 +34,9 @@ template fromSszBytes*(T: type Slot, bytes: openArray[byte]): T =
template fromSszBytes*(T: type Epoch, bytes: openArray[byte]): T =
T fromSszBytes(uint64, bytes)

template fromSszBytes*(T: type SyncCommitteePeriod, bytes: openArray[byte]): T =
T fromSszBytes(uint64, bytes)

func fromSszBytes*(T: type ForkDigest, bytes: openArray[byte]): T {.raisesssz.} =
if bytes.len != sizeof(result):
raiseIncorrectSize T
Expand Down
42 changes: 42 additions & 0 deletions beacon_chain/sync/sync_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ logScope:

const
MAX_REQUEST_BLOCKS = 1024
MAX_REQUEST_LIGHT_CLIENT_UPDATES = 128

blockByRootLookupCost = allowedOpsPerSecondCost(50)
blockResponseCost = allowedOpsPerSecondCost(100)
blockByRangeLookupCost = allowedOpsPerSecondCost(20)
lightClientUpdateResponseCost = allowedOpsPerSecondCost(100)
lightClientUpdateByRangeLookupCost = allowedOpsPerSecondCost(20)

type
StatusMsg* = object
Expand Down Expand Up @@ -356,6 +359,45 @@ p2pProtocol BeaconSync(version = 1,
debug "Block root request done",
peer, roots = blockRoots.len, count, found

proc bestLightClientUpdatesByRange(
peer: Peer,
startPeriod: SyncCommitteePeriod,
reqCount: uint64,
reqStep: uint64,
response: MultipleChunksResponse[altair.LightClientUpdate])
{.async, libp2pProtocol("best_light_client_updates_by_range", 1).} =
trace "Received BestLightClientUpdatesByRange request",
peer, startPeriod, count = reqCount, step = reqStep
if reqCount > 0'u64 and reqStep > 0'u64:
let
dag = peer.networkState.dag
headPeriod = dag.head.slot.sync_committee_period
# Limit number of updates in response
count =
if startPeriod < headPeriod:
0'u64
else:
min(reqCount,
min(1 + (headPeriod - startPeriod) div reqStep,
MAX_REQUEST_LIGHT_CLIENT_UPDATES))
onePastPeriod = startPeriod + reqStep * count
peer.updateRequestQuota(
lightClientUpdateByRangeLookupCost +
count.float * lightClientUpdateResponseCost)
peer.awaitNonNegativeRequestQuota()

var found = 0
for period in startPeriod..<onePastPeriod:
let update = dag.getBestLightClientUpdateForPeriod(period)
if update.isSome():
await response.write(update.get)
inc found

debug "BestLightClientUpdatesByRange request done",
peer, startPeriod, count, reqStep, found
else:
raise newException(InvalidInputsError, "Empty range requested")

proc goodbye(peer: Peer,
reason: uint64)
{.async, libp2pProtocol("goodbye", 1).} =
Expand Down
37 changes: 29 additions & 8 deletions beacon_chain/validators/validator_duties.nim
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,18 @@ proc handleSyncCommitteeMessages(node: BeaconNode, head: BlockRef, slot: Slot) =
asyncSpawn createAndSendSyncCommitteeMessage(node, slot, validator,
subcommitteeIdx, head)

proc handleOptimisticLightClientUpdates(
node: BeaconNode, head: BlockRef, slot: Slot, didPropose: bool) =
if not didPropose:
return
let msg = node.dag.optimisticLightClientUpdate
if msg.attested_header.slot != head.parent.bid.slot:
notice "No optimistic light client update for proposed block",
slot = slot, block_root = shortLog(head.root)
return
node.network.broadcastOptimisticLightClientUpdate(msg)
notice "Sent optimistic light client update", message = shortLog(msg)

proc signAndSendContribution(node: BeaconNode,
validator: AttachedValidator,
contribution: SyncCommitteeContribution,
Expand Down Expand Up @@ -841,14 +853,14 @@ proc handleSyncCommitteeContributions(node: BeaconNode,
slot, head, subnet_id = candidateAggregators[i].subcommitteeIdx

proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
Future[BlockRef] {.async.} =
Future[tuple[head: BlockRef, didPropose: bool]] {.async.} =
## Perform the proposal for the given slot, iff we have a validator attached
## that is supposed to do so, given the shuffling at that slot for the given
## head - to compute the proposer, we need to advance a state to the given
## slot
let proposer = node.dag.getProposer(head, slot)
if proposer.isNone():
return head
return (head: head, didPropose: false)

let
proposerKey = node.dag.validatorKey(proposer.get).get().toPubKey
Expand All @@ -862,9 +874,12 @@ proc handleProposal(node: BeaconNode, head: BlockRef, slot: Slot):
proposer_index = proposer.get(),
proposer = shortLog(proposerKey)

head
(head: head, didPropose: false)
else:
await proposeBlock(node, validator, proposer.get(), head, slot)
(
head: await proposeBlock(node, validator, proposer.get(), head, slot),
didPropose: true
)

proc makeAggregateAndProof*(
pool: var AttestationPool, epochRef: EpochRef, slot: Slot, index: CommitteeIndex,
Expand Down Expand Up @@ -1052,6 +1067,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =

# Start by checking if there's work we should have done in the past that we
# can still meaningfully do
var didPropose = false
while curSlot < slot:
notice "Catching up on validator duties",
curSlot = shortLog(curSlot),
Expand All @@ -1061,7 +1077,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
# For every slot we're catching up, we'll propose then send
# attestations - head should normally be advancing along the same branch
# in this case
head = await handleProposal(node, head, curSlot)
(head, didPropose) = await handleProposal(node, head, curSlot)

# For each slot we missed, we need to send out attestations - if we were
# proposing during this time, we'll use the newly proposed head, else just
Expand All @@ -1071,7 +1087,7 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =

curSlot += 1

head = await handleProposal(node, head, slot)
(head, didPropose) = await handleProposal(node, head, slot)

let
# The latest point in time when we'll be sending out attestations
Expand Down Expand Up @@ -1116,11 +1132,16 @@ proc handleValidatorDuties*(node: BeaconNode, lastSlot, slot: Slot) {.async.} =
node.consensusManager[].updateHead(slot)
head = node.dag.head

static: doAssert attestationSlotOffset == syncCommitteeMessageSlotOffset

static:
doAssert attestationSlotOffset == syncCommitteeMessageSlotOffset
handleAttestations(node, head, slot)
handleSyncCommitteeMessages(node, head, slot)

if node.config.serveLightClientData:
static:
doAssert attestationSlotOffset == optimisticLightClientUpdateSlotOffset
handleOptimisticLightClientUpdates(node, head, slot, didPropose)

updateValidatorMetrics(node) # the important stuff is done, update the vanity numbers

# https://github.com/ethereum/consensus-specs/blob/v1.1.8/specs/phase0/validator.md#broadcast-aggregate
Expand Down

0 comments on commit 8b340dd

Please sign in to comment.