catchup: avoid requesting blocks that aren't needed by the ledger #3089

Merged
30 changes: 28 additions & 2 deletions catchup/service.go
@@ -18,6 +18,7 @@ package catchup

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
@@ -155,8 +156,19 @@ func (s *Service) SynchronizingTime() time.Duration {
return time.Duration(timeInNS - startNS)
}

// errLedgerAlreadyHasBlock is returned by innerFetch in case the local ledger already has the requested block.
var errLedgerAlreadyHasBlock = errors.New("ledger already has block")

// function scope to make a bunch of defer statements better
func (s *Service) innerFetch(r basics.Round, peer network.Peer) (blk *bookkeeping.Block, cert *agreement.Certificate, ddur time.Duration, err error) {
ledgerWaitCh := s.ledger.Wait(r)
select {
case <-ledgerWaitCh:
// if our ledger already has this block, no need to attempt to fetch it.
return nil, nil, time.Duration(0), errLedgerAlreadyHasBlock
default:
}

ctx, cf := context.WithCancel(s.ctx)
fetcher := makeUniversalBlockFetcher(s.log, s.net, s.cfg)
defer cf()
@@ -165,11 +177,21 @@ func (s *Service) innerFetch(r basics.Round, peer network.Peer) (blk *bookkeepin
go func() {
select {
case <-stopWaitingForLedgerRound:
- case <-s.ledger.Wait(r):
+ case <-ledgerWaitCh:
Contributor

Ideally, don't we want errLedgerAlreadyHasBlock returned when ledgerWaitCh fires?

Contributor Author

Sounds like a good idea. I'll make it so.

cf()
}
}()
- return fetcher.fetchBlock(ctx, r, peer)
+ blk, cert, ddur, err = fetcher.fetchBlock(ctx, r, peer)
// check to see if we aborted due to ledger.
if err != nil {
select {
case <-ledgerWaitCh:
// yes, we aborted since the ledger received this round.
err = errLedgerAlreadyHasBlock
default:
}
}
return
}

// fetchAndWrite fetches a block, checks the cert, and writes it to the ledger. Cert checking and ledger writing both wait for the ledger to advance if necessary.
@@ -218,6 +240,10 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
block, cert, blockDownloadDuration, err := s.innerFetch(r, peer)

if err != nil {
if err == errLedgerAlreadyHasBlock {
Contributor

This error is reported only when the ledger already has the block by the time innerFetch's first statements are executed.
However, if the ledger receives the block after those first statements have run, and innerFetch then returns an error for that reason, the error will not be reported as errLedgerAlreadyHasBlock.

Is this the expected behavior?

// ledger already has the block, no need to request this block from anyone.
return true
}
s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i)
peerSelector.rankPeer(psp, peerRankDownloadFailed)
// we've just failed to retrieve a block; wait until the previous block is fetched before trying again