diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f5bdb3c23457..9c19543a46de 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -219,7 +219,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, stateBloom: stateBloom, mux: mux, checkpoint: checkpoint, - queue: newQueue(blockCacheItems), + queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), peers: newPeerSet(), rttEstimate: uint64(rttMaxEstimate), rttConfidence: uint64(1000000), @@ -379,7 +379,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode d.stateBloom.Close() } // Reset the queue, peer set and wake channels to clean any internal leftover state - d.queue.Reset(blockCacheItems) + d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems) d.peers.Reset() for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 7c165c63c3a6..51d485761b6c 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -39,7 +39,7 @@ import ( func init() { fullMaxForkAncestry = 10000 lightMaxForkAncestry = 10000 - blockCacheItems = 1024 + blockCacheMaxItems = 1024 fsHeaderContCheck = 500 * time.Millisecond } @@ -544,7 +544,7 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) tester.newPeer("peer", protocol, chain) // Synchronise with the peer and make sure all relevant data was retrieved @@ -607,8 +607,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { } tester.lock.Unlock() - if cached == blockCacheItems || - cached == blockCacheItems-reorgProtHeaderDelay || + if cached == blockCacheMaxItems || + cached == blockCacheMaxItems-reorgProtHeaderDelay || retrieved+cached+frozen == targetBlocks+1 || retrieved+cached+frozen == targetBlocks+1-reorgProtHeaderDelay { break @@ -619,8 +619,8 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) { tester.lock.RLock() retrieved = len(tester.ownBlocks) tester.lock.RUnlock() - if cached != blockCacheItems && cached != blockCacheItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay { - t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheItems, retrieved, frozen, targetBlocks+1) + if cached != blockCacheMaxItems && cached != blockCacheMaxItems-reorgProtHeaderDelay && retrieved+cached+frozen != targetBlocks+1 && retrieved+cached+frozen != targetBlocks+1-reorgProtHeaderDelay { + t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheMaxItems, retrieved, frozen, targetBlocks+1) } // Permit the blocked blocks to import @@ -873,7 +873,7 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) // Create peers of every type tester.newPeer("peer 63", 63, chain) @@ -965,7 +965,7 @@ func testMissingHeaderAttack(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) brokenChain := chain.shorten(chain.len()) delete(brokenChain.headerm, brokenChain.chain[brokenChain.len()/2]) tester.newPeer("attack", protocol, brokenChain) @@ -997,7 +997,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) // Attempt a full sync with an attacker feeding shifted headers brokenChain := chain.shorten(chain.len()) @@ -1202,7 +1202,7 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) // Set a sync init hook to catch progress changes starting := make(chan struct{}) @@ -1362,7 +1362,7 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) // Set a sync init hook to catch progress changes starting := make(chan struct{}) @@ -1435,7 +1435,7 @@ func testFakedSyncProgress(t *testing.T, protocol int, mode SyncMode) { tester := newTester() defer tester.terminate() - chain := testChainBase.shorten(blockCacheItems - 15) + chain := testChainBase.shorten(blockCacheMaxItems - 15) // Set a sync init hook to catch progress changes starting := make(chan struct{}) diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 7e88e10b0185..745f7c7480f7 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -40,9 +40,10 @@ const ( ) var ( - blockCacheItems = 8192 // Maximum number of blocks to cache before throttling the download - blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching - blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones + blockCacheMaxItems = 8192 // Maximum number of blocks to cache before throttling the download + blockCacheInitialItems = 2048 // Initial number of blocks to start fetching, before we know the sizes of the blocks + blockCacheMemory = 64 * 1024 * 1024 // Maximum amount of memory to use for block caching + blockCacheSizeWeight = 0.1 // Multiplier to approximate the average block size based on past ones ) var ( @@ -142,7 +143,7 @@ type queue struct { } // newQueue creates a new download queue for scheduling block retrieval. -func newQueue(blockCacheLimit int) *queue { +func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue { lock := new(sync.RWMutex) q := &queue{ headerContCh: make(chan bool), @@ -151,12 +152,12 @@ func newQueue(blockCacheLimit int) *queue { active: sync.NewCond(lock), lock: lock, } - q.Reset(blockCacheLimit) + q.Reset(blockCacheLimit, thresholdInitialSize) return q } // Reset clears out the queue contents. -func (q *queue) Reset(blockCacheLimit int) { +func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) { q.lock.Lock() defer q.lock.Unlock() @@ -175,6 +176,7 @@ func (q *queue) Reset(blockCacheLimit int) { q.receiptPendPool = make(map[string]*fetchRequest) q.resultCache = newResultStore(blockCacheLimit) + q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize)) } // Close marks the end of the sync, unblocking Results. diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go index 07862390f942..aedfba456525 100644 --- a/eth/downloader/queue_test.go +++ b/eth/downloader/queue_test.go @@ -97,7 +97,7 @@ func dummyPeer(id string) *peerConnection { } func TestBasics(t *testing.T) { - q := newQueue(10) + q := newQueue(10, 10) if !q.Idle() { t.Errorf("new queue should be idle") } @@ -174,7 +174,7 @@ func TestBasics(t *testing.T) { } func TestEmptyBlocks(t *testing.T) { - q := newQueue(10) + q := newQueue(10, 10) q.Prepare(1, FastSync) // Schedule a batch of headers @@ -244,7 +244,7 @@ func XTestDelivery(t *testing.T) { log.Root().SetHandler(log.StdoutHandler) } - q := newQueue(10) + q := newQueue(10, 10) var wg sync.WaitGroup q.Prepare(1, FastSync) wg.Add(1) diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go index 26b6b6a460d7..66376502c55f 100644 --- a/eth/downloader/testchain_test.go +++ b/eth/downloader/testchain_test.go @@ -39,7 +39,7 @@ var ( ) // The common prefix of all test chains: -var testChainBase = newTestChain(blockCacheItems+200, testGenesis) +var testChainBase = newTestChain(blockCacheMaxItems+200, testGenesis) // Different forks on top of the base chain: var testChainForkLightA, testChainForkLightB, testChainForkHeavy *testChain