diff --git a/chain/exchange/client.go b/chain/exchange/client.go
index 22f7a945742..57563d5b215 100644
--- a/chain/exchange/client.go
+++ b/chain/exchange/client.go
@@ -7,6 +7,7 @@ import (
 	"math/rand"
 	"time"
 
+	"github.com/libp2p/go-libp2p-core/helpers"
 	"github.com/libp2p/go-libp2p-core/host"
 	"github.com/libp2p/go-libp2p-core/network"
 	"github.com/libp2p/go-libp2p-core/peer"
@@ -357,6 +358,12 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque
 		return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
 	}
 
+	defer func() {
+		// Note: this will become just stream.Close once we've completed the go-libp2p migration to
+		// go-libp2p-core 0.7.0
+		go helpers.FullClose(stream) //nolint:errcheck
+	}()
+
 	// Write request.
 	_ = stream.SetWriteDeadline(time.Now().Add(WriteReqDeadline))
 	if err := cborutil.WriteCborRPC(stream, req); err != nil {
diff --git a/chain/exchange/protocol.go b/chain/exchange/protocol.go
index ca4b6183662..ac02cf60f31 100644
--- a/chain/exchange/protocol.go
+++ b/chain/exchange/protocol.go
@@ -40,7 +40,7 @@ const (
 	WriteReqDeadline    = 5 * time.Second
 	ReadResDeadline     = WriteReqDeadline
 	ReadResMinSpeed     = 50 << 10
-	ShufflePeersPrefix  = 5
+	ShufflePeersPrefix  = 16
 	WriteResDeadline    = 60 * time.Second
 )
 
diff --git a/chain/exchange/server.go b/chain/exchange/server.go
index 54e169b3f19..dcdb5b3a5bf 100644
--- a/chain/exchange/server.go
+++ b/chain/exchange/server.go
@@ -15,6 +15,7 @@ import (
 	"github.com/filecoin-project/lotus/chain/types"
 
 	"github.com/ipfs/go-cid"
+	"github.com/libp2p/go-libp2p-core/helpers"
 	inet "github.com/libp2p/go-libp2p-core/network"
 )
 
@@ -39,7 +40,9 @@ func (s *server) HandleStream(stream inet.Stream) {
 	ctx, span := trace.StartSpan(context.Background(), "chainxchg.HandleStream")
 	defer span.End()
 
-	defer stream.Close() //nolint:errcheck
+	// Note: this will become just stream.Close once we've completed the go-libp2p migration to
+	// go-libp2p-core 0.7.0
+	defer helpers.FullClose(stream) //nolint:errcheck
 
 	var req Request
 	if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil {
diff --git a/chain/sync.go b/chain/sync.go
index b5716a34308..74dc5aa1aa3 100644
--- a/chain/sync.go
+++ b/chain/sync.go
@@ -7,7 +7,6 @@ import (
 	"fmt"
 	"os"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -63,20 +62,12 @@ var (
 	// where the Syncer publishes candidate chain heads to be synced.
 	LocalIncoming = "incoming"
 
-	log                           = logging.Logger("chain")
-	defaultMessageFetchWindowSize = 200
-)
+	log = logging.Logger("chain")
 
-func init() {
-	if s := os.Getenv("LOTUS_BSYNC_MSG_WINDOW"); s != "" {
-		val, err := strconv.Atoi(s)
-		if err != nil {
-			log.Errorf("failed to parse LOTUS_BSYNC_MSG_WINDOW: %s", err)
-			return
-		}
-		defaultMessageFetchWindowSize = val
-	}
-}
+	concurrentSyncRequests = exchange.ShufflePeersPrefix
+	syncRequestBatchSize   = 8
+	syncRequestRetries     = 5
+)
 
 // Syncer is in charge of running the chain synchronization logic. As such, it
 // is tasked with these functions, amongst others:
@@ -132,8 +123,6 @@ type Syncer struct {
 
 	verifier ffiwrapper.Verifier
 
-	windowSize int
-
 	tickerCtxCancel context.CancelFunc
 
 	checkptLk sync.Mutex
@@ -175,7 +164,6 @@ func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.C
 		receiptTracker: newBlockReceiptTracker(),
 		connmgr:        connmgr,
 		verifier:       verifier,
-		windowSize:     defaultMessageFetchWindowSize,
 		incoming:       pubsub.New(50),
 	}
 
@@ -1483,8 +1471,6 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS
 
 	span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers))))
 
-	windowSize := syncer.windowSize
-mainLoop:
 	for i := len(headers) - 1; i >= 0; {
 		fts, err := syncer.store.TryFillTipSet(headers[i])
 		if err != nil {
@@ -1498,34 +1484,19 @@ mainLoop:
 			continue
 		}
 
-		batchSize := windowSize
+		batchSize := concurrentSyncRequests * syncRequestBatchSize
 		if i < batchSize {
-			batchSize = i
+			batchSize = i + 1
 		}
 
-		nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index
-
 		ss.SetStage(api.StageFetchingMessages)
-		var bstout []*exchange.CompactedMessages
-		for len(bstout) < batchSize {
-			next := headers[nextI]
-
-			nreq := batchSize - len(bstout)
-			bstips, err := syncer.Exchange.GetChainMessages(ctx, next, uint64(nreq))
-			if err != nil {
-				// TODO check errors for temporary nature
-				if windowSize > 1 {
-					windowSize /= 2
-					log.Infof("error fetching messages: %s; reducing window size to %d and trying again", err, windowSize)
-					continue mainLoop
-				}
-				return xerrors.Errorf("message processing failed: %w", err)
-			}
+		startOffset := i + 1 - batchSize
+		bstout, batchErr := syncer.fetchMessages(ctx, headers[startOffset:startOffset+batchSize], startOffset)
+		ss.SetStage(api.StageMessages)
 
-			bstout = append(bstout, bstips...)
-			nextI += len(bstips)
+		if batchErr != nil {
+			return xerrors.Errorf("failed to fetch messages: %w", batchErr)
 		}
 
-		ss.SetStage(api.StageMessages)
 		for bsi := 0; bsi < len(bstout); bsi++ {
 			// temp storage so we don't persist data we dont want to
@@ -1555,24 +1526,76 @@
 			}
 		}
 
-		if i >= windowSize {
-			newWindowSize := windowSize + 10
-			if newWindowSize > int(exchange.MaxRequestLength) {
-				newWindowSize = int(exchange.MaxRequestLength)
+		i -= batchSize
+	}
+
+	return nil
+}
+
+func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet, startOffset int) ([]*exchange.CompactedMessages, error) {
+	batchSize := len(headers)
+	batch := make([]*exchange.CompactedMessages, batchSize)
+
+	var wg sync.WaitGroup
+	var mx sync.Mutex
+	var batchErr error
+
+	start := build.Clock.Now()
+
+	for j := 0; j < batchSize; j += syncRequestBatchSize {
+		wg.Add(1)
+		go func(j int) {
+			defer wg.Done()
+
+			nreq := syncRequestBatchSize
+			if j+nreq > batchSize {
+				nreq = batchSize - j
 			}
-			if newWindowSize > windowSize {
-				windowSize = newWindowSize
-				log.Infof("successfully fetched %d messages; increasing window size to %d", len(bstout), windowSize)
+
+			failed := false
+			for offset := 0; !failed && offset < nreq; {
+				nextI := j + offset
+				nextHeader := headers[nextI]
+
+				var requestErr error
+				var requestResult []*exchange.CompactedMessages
+				for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ {
+					if retry > 0 {
+						log.Infof("fetching messages at %d (retry %d)", startOffset+nextI, retry)
+					} else {
+						log.Infof("fetching messages at %d", startOffset+nextI)
+					}
+
+					result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset))
+					if err != nil {
+						requestErr = multierror.Append(requestErr, err)
+					} else {
+						requestResult = result
+					}
+				}
+
+				mx.Lock()
+				if requestResult != nil {
+					copy(batch[j+offset:], requestResult)
+					offset += len(requestResult)
+				} else {
+					log.Errorf("error fetching messages at %d: %s", nextI, requestErr)
+					batchErr = multierror.Append(batchErr, requestErr)
+					failed = true
+				}
+				mx.Unlock()
 			}
-		}
+		}(j)
+	}
+	wg.Wait()
 
-		i -= batchSize
+	if batchErr != nil {
+		return nil, batchErr
 	}
 
-	// remember our window size
-	syncer.windowSize = windowSize
+	log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, build.Clock.Since(start))
 
-	return nil
+	return batch, nil
 }
 
 func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error {