Skip to content

Commit

Permalink
feat(blocksync): set the max number of (concurrently) downloaded bloc…
Browse files Browse the repository at this point in the history
…ks (backport #2467) (#2515)

Manual backport of #2467
  • Loading branch information
melekes authored Mar 11, 2024
1 parent d27a96e commit 6d606ce
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[blocksync]` make the max number of downloaded blocks dynamic.
Previously it was a const 600. Now it's `peersCount * maxPendingRequestsPerPeer (20)`
[\#2467](https://github.com/cometbft/cometbft/pull/2467)
47 changes: 18 additions & 29 deletions blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ eg, L = latency = 0.1s

const (
requestIntervalMS = 2
maxTotalRequesters = 600
maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 20
requestRetrySeconds = 30

Expand All @@ -41,9 +39,6 @@ const (
// Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s,
// sending data across atlantic ~ 7.5 KB/s.
minRecvRate = 7680

// Maximum difference between current and new block's height.
maxDiffBetweenCurrentAndReceivedBlockHeight = 100
)

var peerTimeout = 15 * time.Second // not const so we can override with tests
Expand Down Expand Up @@ -108,24 +103,27 @@ func (pool *BlockPool) OnStart() error {
func (pool *BlockPool) makeRequestersRoutine() {
for {
if !pool.IsRunning() {
break
return
}

_, numPending, lenRequesters := pool.GetStatus()
pool.mtx.Lock()
var (
maxRequestersCreated = len(pool.requesters) >= len(pool.peers)*maxPendingRequestsPerPeer

nextHeight = pool.height + int64(len(pool.requesters))
maxPeerHeightReached = nextHeight > pool.maxPeerHeight
)
pool.mtx.Unlock()

switch {
case numPending >= maxPendingRequests:
// sleep for a bit.
case maxRequestersCreated: // If we have enough requesters, wait for them to finish.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
case lenRequesters >= maxTotalRequesters:
// sleep for a bit.
case maxPeerHeightReached: // If we're caught up, wait for a bit so reactor could finish or a higher height is reported.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
default:
// request for more blocks.
pool.makeNextRequester()
pool.makeNextRequester(nextHeight)
}
}
}
Expand Down Expand Up @@ -277,7 +275,8 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, extCommit *ty
if diff < 0 {
diff *= -1
}
if diff > maxDiffBetweenCurrentAndReceivedBlockHeight {
const maxDiff = 100 // maximum difference between current and received block height
if diff > maxDiff {
pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
}
return fmt.Errorf("peer sent us a block we didn't expect (peer: %s, current height: %d, block height: %d)", peerID, pool.height, block.Height)
Expand Down Expand Up @@ -391,30 +390,20 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
return nil
}

func (pool *BlockPool) makeNextRequester() {
func (pool *BlockPool) makeNextRequester(nextHeight int64) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

nextHeight := pool.height + pool.requestersLen()
if nextHeight > pool.maxPeerHeight {
return
}

request := newBPRequester(pool, nextHeight)

pool.requesters[nextHeight] = request
atomic.AddInt32(&pool.numPending, 1)

err := request.Start()
if err != nil {
if err := request.Start(); err != nil {
request.Logger.Error("Error starting request", "err", err)
}
}

func (pool *BlockPool) requestersLen() int64 {
return int64(len(pool.requesters))
}

func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
if !pool.IsRunning() {
return
Expand All @@ -437,7 +426,7 @@ func (pool *BlockPool) debug() string {
defer pool.mtx.Unlock()

str := ""
nextHeight := pool.height + pool.requestersLen()
nextHeight := pool.height + int64(len(pool.requesters))
for h := pool.height; h < nextHeight; h++ {
if pool.requesters[h] == nil {
str += fmt.Sprintf("H(%v):X ", h)
Expand Down
21 changes: 13 additions & 8 deletions blocksync/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@ func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
}

func TestBlockPoolBasic(t *testing.T) {
start := int64(42)
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
var (
start = int64(42)
peers = makePeers(10, start, 1000)
errorsCh = make(chan peerError)
requestsCh = make(chan BlockRequest)
)
pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())

Expand Down Expand Up @@ -141,10 +143,13 @@ func TestBlockPoolBasic(t *testing.T) {
}

func TestBlockPoolTimeout(t *testing.T) {
start := int64(42)
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
var (
start = int64(42)
peers = makePeers(10, start, 1000)
errorsCh = make(chan peerError)
requestsCh = make(chan BlockRequest)
)

pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
err := pool.Start()
Expand Down
5 changes: 4 additions & 1 deletion blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch, stores were left in an inconsistent state", state.LastBlockHeight,
storeHeight))
}
requestsCh := make(chan BlockRequest, maxTotalRequesters)

// It's okay to block since sendRequest is called from a separate goroutine
// (bpRequester#requestRoutine; 1 per each peer).
requestsCh := make(chan BlockRequest)

const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
Expand Down

0 comments on commit 6d606ce

Please sign in to comment.