Skip to content

Commit

Permalink
avoid re-requesting finalized blocks during sync (#3461)
Browse files Browse the repository at this point in the history
When a `beaconBlocksByRange` response advances the `safeSlot`, but later
has errors, the sync queue keeps repeating that same request until it is
fulfilled without errors. Data up through `safeSlot` is considered to be
immutable, i.e., finalized, so re-requesting that data is not useful.
By advancing the sync progress in that scenario, those redundant query
portions can be avoided. Note, the finalized block _itself_ is always
requested, even in the initial request. This behaviour is kept same.
  • Loading branch information
etan-status authored Mar 15, 2022
1 parent 7256925 commit 6d1d31d
Show file tree
Hide file tree
Showing 3 changed files with 392 additions and 64 deletions.
8 changes: 6 additions & 2 deletions AllTests-mainnet.md
Original file line number Diff line number Diff line change
Expand Up @@ -402,13 +402,17 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
+ Process all unviable blocks OK
+ [SyncQueue#Backward] Async unordered push test OK
+ [SyncQueue#Backward] Async unordered push with rewind test OK
+ [SyncQueue#Backward] Good response with missing values towards end OK
+ [SyncQueue#Backward] Handle out-of-band sync progress advancement OK
+ [SyncQueue#Backward] Pass through established limits test OK
+ [SyncQueue#Backward] Smoke test OK
+ [SyncQueue#Backward] Start and finish slots equal OK
+ [SyncQueue#Backward] Two full requests success/fail OK
+ [SyncQueue#Backward] getRewindPoint() test OK
+ [SyncQueue#Forward] Async unordered push test OK
+ [SyncQueue#Forward] Async unordered push with rewind test OK
+ [SyncQueue#Forward] Good response with missing values towards end OK
+ [SyncQueue#Forward] Handle out-of-band sync progress advancement OK
+ [SyncQueue#Forward] Pass through established limits test OK
+ [SyncQueue#Forward] Smoke test OK
+ [SyncQueue#Forward] Start and finish slots equal OK
Expand All @@ -419,7 +423,7 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
+ [SyncQueue] getLastNonEmptySlot() test OK
+ [SyncQueue] hasEndGap() test OK
```
OK: 19/19 Fail: 0/19 Skip: 0/19
OK: 23/23 Fail: 0/23 Skip: 0/23
## Zero signature sanity checks
```diff
+ SSZ serialization roundtrip of SignedBeaconBlockHeader OK
Expand Down Expand Up @@ -521,4 +525,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1

---TOTAL---
OK: 285/291 Fail: 0/291 Skip: 6/291
OK: 289/295 Fail: 0/295 Skip: 6/295
183 changes: 141 additions & 42 deletions beacon_chain/sync/sync_queue.nim
Original file line number Diff line number Diff line change
Expand Up @@ -511,9 +511,38 @@ proc advanceInput[T](sq: SyncQueue[T], number: uint64) =
proc notInRange[T](sq: SyncQueue[T], sr: SyncRequest[T]): bool =
case sq.kind
of SyncQueueKind.Forward:
(sq.queueSize > 0) and (sr.slot != sq.outSlot)
(sq.queueSize > 0) and (sr.slot > sq.outSlot)
of SyncQueueKind.Backward:
(sq.queueSize > 0) and (sr.slot + sr.count - 1'u64 != sq.outSlot)
(sq.queueSize > 0) and (sr.lastSlot < sq.outSlot)

func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 =
## Compute the number of slots covered by a given `SyncRequest` that are
## already known and, hence, no longer relevant for sync progression.
let
outSlot = sq.outSlot
lowSlot = sr.slot
highSlot = sr.lastSlot
case sq.kind
of SyncQueueKind.Forward:
if outSlot > highSlot:
# Entire request is no longer relevant.
sr.count
elif outSlot > lowSlot:
# Request is only partially relevant.
outSlot - lowSlot
else:
# Entire request is still relevant.
0
of SyncQueueKind.Backward:
if lowSlot > outSlot:
# Entire request is no longer relevant.
sr.count
elif highSlot > outSlot:
# Request is only partially relevant.
highSlot - outSlot
else:
# Entire request is still relevant.
0

proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[ref ForkedSignedBeaconBlock],
Expand Down Expand Up @@ -552,14 +581,14 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
case sq.kind
of SyncQueueKind.Forward:
let minSlot = sq.readyQueue[0].request.slot
if sq.outSlot != minSlot:
if sq.outSlot < minSlot:
none[SyncResult[T]]()
else:
some(sq.readyQueue.pop())
of SyncQueueKind.Backward:
let maxSlot = sq.readyQueue[0].request.slot +
(sq.readyQueue[0].request.count - 1'u64)
if sq.outSlot != maxSlot:
if sq.outSlot > maxSlot:
none[SyncResult[T]]()
else:
some(sq.readyQueue.pop())
Expand Down Expand Up @@ -625,14 +654,16 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
let retryRequest =
hasInvalidBlock or unviableBlock.isSome() or missingParentSlot.isSome()
if not retryRequest:
sq.advanceOutput(item.request.count)
let numSlotsAdvanced = item.request.count - sq.numAlreadyKnownSlots(sr)
sq.advanceOutput(numSlotsAdvanced)

if hasOkBlock:
# If there no error and response was not empty we should reward peer
# with some bonus score - not for duplicate blocks though.
item.request.item.updateScore(PeerScoreGoodBlocks)

sq.wakeupWaiters()
if numSlotsAdvanced > 0:
sq.wakeupWaiters()
else:
debug "Block pool rejected peer's response", request = item.request,
blocks_map = getShortMap(item.request, item.data),
Expand Down Expand Up @@ -721,9 +752,74 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
sq.pending.del(sr.index)
sq.toDebtsQueue(sr)

proc handlePotentialSafeSlotAdvancement[T](sq: SyncQueue[T]) =
# It may happen that sync progress advanced to a newer `safeSlot`, either
# by a response that started with good values and only had errors late, or
# through an out-of-band mechanism, e.g., VC / REST.
# If that happens, advance to the new `safeSlot` to avoid repeating requests
# for data that is considered immutable and no longer relevant.
let
safeSlot = sq.getSafeSlot()
numSlotsAdvanced: uint64 =
case sq.kind
of SyncQueueKind.Forward:
if safeSlot > sq.outSlot:
safeSlot - sq.outSlot
else:
0
of SyncQueueKind.Backward:
if sq.outSlot > safeSlot:
sq.outSlot - safeSlot
else:
0
if numSlotsAdvanced != 0:
debug "Sync progress advanced out-of-band",
slot_before = sq.outSlot, slot_after = safeSlot
sq.advanceOutput(numSlotsAdvanced)
sq.wakeupWaiters()

func updateRequestForNewSafeSlot[T](sq: SyncQueue[T], sr: var SyncRequest[T]) =
# Requests may have originated before the latest `safeSlot` advancement.
# Update it to not request any data prior to `safeSlot`.
let
outSlot = sq.outSlot
lowSlot = sr.slot
highSlot = sr.lastSlot
case sq.kind
of SyncQueueKind.Forward:
if outSlot <= lowSlot:
# Entire request is still relevant.
discard
elif outSlot <= highSlot:
# Request is only partially relevant.
let
numSlotsDone = outSlot - lowSlot
numStepsDone = (numSlotsDone + sr.step - 1) div sr.step
sr.slot += numStepsDone * sr.step
sr.count -= numStepsDone
else:
# Entire request is no longer relevant.
sr.step = 0
sr.count = 0
of SyncQueueKind.Backward:
if outSlot >= highSlot:
# Entire request is still relevant.
discard
elif outSlot >= lowSlot:
# Request is only partially relevant.
let
numSlotsDone = highSlot - outSlot
numStepsDone = (numSlotsDone + sr.step - 1) div sr.step
sr.count -= numStepsDone
else:
# Entire request is no longer relevant.
sr.step = 0
sr.count = 0

proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =
## Create new request according to current SyncQueue parameters.
if len(sq.debtsQueue) > 0:
sq.handlePotentialSafeSlotAdvancement()
while len(sq.debtsQueue) > 0:
if maxSlot < sq.debtsQueue[0].slot:
# Peer's latest slot is less than starting request's slot.
return SyncRequest.empty(sq.kind, T)
Expand All @@ -732,44 +828,47 @@ proc pop*[T](sq: SyncQueue[T], maxslot: Slot, item: T): SyncRequest[T] =
return SyncRequest.empty(sq.kind, T)
var sr = sq.debtsQueue.pop()
sq.debtsCount = sq.debtsCount - sr.count
sq.updateRequestForNewSafeSlot(sr)
if sr.isEmpty:
continue
sr.setItem(item)
sq.makePending(sr)
return sr

case sq.kind
of SyncQueueKind.Forward:
if maxSlot < sq.inpSlot:
# Peer's latest slot is less than queue's input slot.
return SyncRequest.empty(sq.kind, T)
if sq.inpSlot > sq.finalSlot:
# Queue's input slot is bigger than queue's final slot.
return SyncRequest.empty(sq.kind, T)
let lastSlot = min(maxslot, sq.finalSlot)
let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot)
var sr = SyncRequest.init(sq.kind, sq.inpSlot, count, item)
sq.advanceInput(count)
sq.makePending(sr)
sr
of SyncQueueKind.Backward:
if sq.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64:
return SyncRequest.empty(sq.kind, T)
if sq.inpSlot < sq.finalSlot:
return SyncRequest.empty(sq.kind, T)
let (slot, count) =
block:
let baseSlot = sq.inpSlot + 1'u64
if baseSlot - sq.finalSlot < sq.chunkSize:
let count = uint64(baseSlot - sq.finalSlot)
(baseSlot - count, count)
else:
(baseSlot - sq.chunkSize, sq.chunkSize)
if (maxSlot + 1'u64) < slot + count:
# Peer's latest slot is less than queue's input slot.
return SyncRequest.empty(sq.kind, T)
var sr = SyncRequest.init(sq.kind, slot, count, item)
sq.advanceInput(count)
sq.makePending(sr)
sr
else:
case sq.kind
of SyncQueueKind.Forward:
if maxSlot < sq.inpSlot:
# Peer's latest slot is less than queue's input slot.
return SyncRequest.empty(sq.kind, T)
if sq.inpSlot > sq.finalSlot:
# Queue's input slot is bigger than queue's final slot.
return SyncRequest.empty(sq.kind, T)
let lastSlot = min(maxslot, sq.finalSlot)
let count = min(sq.chunkSize, lastSlot + 1'u64 - sq.inpSlot)
var sr = SyncRequest.init(sq.kind, sq.inpSlot, count, item)
sq.advanceInput(count)
sq.makePending(sr)
sr
of SyncQueueKind.Backward:
if sq.inpSlot == 0xFFFF_FFFF_FFFF_FFFF'u64:
return SyncRequest.empty(sq.kind, T)
if sq.inpSlot < sq.finalSlot:
return SyncRequest.empty(sq.kind, T)
let (slot, count) =
block:
let baseSlot = sq.inpSlot + 1'u64
if baseSlot - sq.finalSlot < sq.chunkSize:
let count = uint64(baseSlot - sq.finalSlot)
(baseSlot - count, count)
else:
(baseSlot - sq.chunkSize, sq.chunkSize)
if (maxSlot + 1'u64) < slot + count:
# Peer's latest slot is less than queue's input slot.
return SyncRequest.empty(sq.kind, T)
var sr = SyncRequest.init(sq.kind, slot, count, item)
sq.advanceInput(count)
sq.makePending(sr)
sr

proc debtLen*[T](sq: SyncQueue[T]): uint64 =
sq.debtsCount
Expand Down
Loading

0 comments on commit 6d1d31d

Please sign in to comment.