From 61c0b8c3db61ccb1d97fe05fd856c74fda0d0451 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Sep 2020 10:14:28 +0300 Subject: [PATCH 01/11] properly close streams in blocksync we were leaking streams right and left... --- chain/exchange/client.go | 7 +++++++ chain/exchange/server.go | 5 ++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/chain/exchange/client.go b/chain/exchange/client.go index 22f7a945742..57563d5b215 100644 --- a/chain/exchange/client.go +++ b/chain/exchange/client.go @@ -7,6 +7,7 @@ import ( "math/rand" "time" + "github.com/libp2p/go-libp2p-core/helpers" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -357,6 +358,12 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque return nil, xerrors.Errorf("failed to open stream to peer: %w", err) } + defer func() { + // Note: this will become just stream.Close once we've completed the go-libp2p migration to + // go-libp2p-core 0.7.0 + go helpers.FullClose(stream) //nolint:errcheck + }() + // Write request. _ = stream.SetWriteDeadline(time.Now().Add(WriteReqDeadline)) if err := cborutil.WriteCborRPC(stream, req); err != nil { diff --git a/chain/exchange/server.go b/chain/exchange/server.go index 54e169b3f19..dcdb5b3a5bf 100644 --- a/chain/exchange/server.go +++ b/chain/exchange/server.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/helpers" inet "github.com/libp2p/go-libp2p-core/network" ) @@ -39,7 +40,9 @@ func (s *server) HandleStream(stream inet.Stream) { ctx, span := trace.StartSpan(context.Background(), "chainxchg.HandleStream") defer span.End() - defer stream.Close() //nolint:errcheck + // Note: this will become just stream.Close once we've completed the go-libp2p migration to + // go-libp2p-core 0.7.0 + defer helpers.FullClose(stream) //nolint:errcheck var req Request if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil { From 35f6e1064620f05f29f8ec775f453b2e56c3e9ae Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Sep 2020 21:04:44 +0300 Subject: [PATCH 02/11] parallel chain sync --- chain/sync.go | 116 +++++++++++++++++++++++++++----------------------- 1 file changed, 63 insertions(+), 53 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index b5716a34308..882fb1d9597 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -7,7 +7,6 @@ import ( "fmt" "os" "sort" - "strconv" "strings" "sync" "time" @@ -63,20 +62,12 @@ var ( // where the Syncer publishes candidate chain heads to be synced. LocalIncoming = "incoming" - log = logging.Logger("chain") - defaultMessageFetchWindowSize = 200 -) + log = logging.Logger("chain") -func init() { - if s := os.Getenv("LOTUS_BSYNC_MSG_WINDOW"); s != "" { - val, err := strconv.Atoi(s) - if err != nil { - log.Errorf("failed to parse LOTUS_BSYNC_MSG_WINDOW: %s", err) - return - } - defaultMessageFetchWindowSize = val - } -} + concurrentSyncRequests = 16 + syncRequestBatchSize = 4 + syncRequestRetries = 5 +) // Syncer is in charge of running the chain synchronization logic. As such, it // is tasked with these functions, amongst others: @@ -132,8 +123,6 @@ type Syncer struct { verifier ffiwrapper.Verifier - windowSize int - tickerCtxCancel context.CancelFunc checkptLk sync.Mutex @@ -175,7 +164,6 @@ func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.C receiptTracker: newBlockReceiptTracker(), connmgr: connmgr, verifier: verifier, - windowSize: defaultMessageFetchWindowSize, incoming: pubsub.New(50), } @@ -1483,8 +1471,6 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers)))) - windowSize := syncer.windowSize -mainLoop: for i := len(headers) - 1; i >= 0; { fts, err := syncer.store.TryFillTipSet(headers[i]) if err != nil { @@ -1498,35 +1484,73 @@ mainLoop: continue } - batchSize := windowSize + batchSize := concurrentSyncRequests * syncRequestBatchSize if i < batchSize { - batchSize = i + if i == 0 { + batchSize = 1 + } else { + batchSize = i + } } - nextI := (i + 1) - batchSize // want to fetch batchSize values, 'i' points to last one we want to fetch, so its 'inclusive' of our request, thus we need to add one to our request start index - ss.SetStage(api.StageFetchingMessages) - var bstout []*exchange.CompactedMessages - for len(bstout) < batchSize { - next := headers[nextI] - - nreq := batchSize - len(bstout) - bstips, err := syncer.Exchange.GetChainMessages(ctx, next, uint64(nreq)) - if err != nil { - // TODO check errors for temporary nature - if windowSize > 1 { - windowSize /= 2 - log.Infof("error fetching messages: %s; reducing window size to %d and trying again", err, windowSize) - continue mainLoop + bstout := make([]*exchange.CompactedMessages, batchSize) + var wg sync.WaitGroup + var mx sync.Mutex + var batchErr error + for j := 0; j < batchSize; j += syncRequestBatchSize { + wg.Add(1) + go func(j int) { + defer wg.Done() + + nreq := syncRequestBatchSize + if j*syncRequestBatchSize+nreq > batchSize { + nreq = batchSize - j*syncRequestBatchSize } - return xerrors.Errorf("message processing failed: %w", err) - } - bstout = append(bstout, bstips...) - nextI += len(bstips) + failed := false + for offset := 0; !failed && offset < nreq; { + nextI := (i + 1) - batchSize + j*syncRequestBatchSize + offset + nextHeader := headers[nextI] + + var requestErr error + var requestResult []*exchange.CompactedMessages + for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ { + if retry > 0 { + log.Infof("fetching messages at %d (retry %d)", nextI, retry) + } else { + log.Infof("fetching messages at %d", nextI) + } + + result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset)) + if err != nil { + requestErr = multierror.Append(requestErr, err) + } else { + requestResult = result + } + } + + mx.Lock() + if requestResult == nil { + // we failed! + log.Errorf("error fetching messages at %d: %s", nextI, requestErr) + batchErr = multierror.Append(batchErr, requestErr) + failed = true + } else { + copy(bstout[j*syncRequestBatchSize+offset:], requestResult) + offset += len(requestResult) + } + mx.Unlock() + } + }(j) } + wg.Wait() ss.SetStage(api.StageMessages) + if batchErr != nil { + return xerrors.Errorf("failed to fetch messages: %w", err) + } + for bsi := 0; bsi < len(bstout); bsi++ { // temp storage so we don't persist data we dont want to bs := bstore.NewTemporary() @@ -1555,23 +1579,9 @@ mainLoop: } } - if i >= windowSize { - newWindowSize := windowSize + 10 - if newWindowSize > int(exchange.MaxRequestLength) { - newWindowSize = int(exchange.MaxRequestLength) - } - if newWindowSize > windowSize { - windowSize = newWindowSize - log.Infof("successfully fetched %d messages; increasing window size to %d", len(bstout), windowSize) - } - } - i -= batchSize } - // remember our window size - syncer.windowSize = windowSize - return nil } From 05a233f84d6de0916b1135f46eef3a61c7aa02ae Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Sep 2020 21:43:35 +0300 Subject: [PATCH 03/11] add some more logging --- chain/sync.go | 1 + 1 file changed, 1 insertion(+) diff --git a/chain/sync.go b/chain/sync.go index 882fb1d9597..87d2cf6f4e2 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1537,6 +1537,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS batchErr = multierror.Append(batchErr, requestErr) failed = true } else { + log.Infof("fetched messages for %d tipsets", len(requestResult)) copy(bstout[j*syncRequestBatchSize+offset:], requestResult) offset += len(requestResult) } From b984e94a87237b14ef1eb90df95d5979b4c665bf Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Sep 2020 21:52:08 +0300 Subject: [PATCH 04/11] fix bug --- chain/sync.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index 87d2cf6f4e2..80a5ad423a0 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1504,13 +1504,13 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS defer wg.Done() nreq := syncRequestBatchSize - if j*syncRequestBatchSize+nreq > batchSize { - nreq = batchSize - j*syncRequestBatchSize + if j+nreq > batchSize { + nreq = batchSize - j } failed := false for offset := 0; !failed && offset < nreq; { - nextI := (i + 1) - batchSize + j*syncRequestBatchSize + offset + nextI := (i + 1) - batchSize + j + offset nextHeader := headers[nextI] var requestErr error @@ -1537,8 +1537,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS batchErr = multierror.Append(batchErr, requestErr) failed = true } else { - log.Infof("fetched messages for %d tipsets", len(requestResult)) - copy(bstout[j*syncRequestBatchSize+offset:], requestResult) + copy(bstout[j+offset:], requestResult) offset += len(requestResult) } mx.Unlock() From 8a4b629f407a9b0ff1e39e9eeb5ba7d541c4adfc Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Sep 2020 21:55:51 +0300 Subject: [PATCH 05/11] increase sync request batch size to 8 --- chain/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sync.go b/chain/sync.go index 80a5ad423a0..554f81ee99e 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -65,7 +65,7 @@ var ( log = logging.Logger("chain") concurrentSyncRequests = 16 - syncRequestBatchSize = 4 + syncRequestBatchSize = 8 syncRequestRetries = 5 ) From 2a428f09e67f01bdd927f09e9e40f758b2bd60e1 Mon Sep 17 00:00:00 2001 From: vyzo Date: Wed, 16 Sep 2020 22:09:36 +0300 Subject: [PATCH 06/11] increase exchange ShufflePeersPrefix to 16, use that as the value of concurrent sync requests --- chain/exchange/protocol.go | 2 +- chain/sync.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/chain/exchange/protocol.go b/chain/exchange/protocol.go index ca4b6183662..ac02cf60f31 100644 --- a/chain/exchange/protocol.go +++ b/chain/exchange/protocol.go @@ -40,7 +40,7 @@ const ( WriteReqDeadline = 5 * time.Second ReadResDeadline = WriteReqDeadline ReadResMinSpeed = 50 << 10 - ShufflePeersPrefix = 5 + ShufflePeersPrefix = 16 WriteResDeadline = 60 * time.Second ) diff --git a/chain/sync.go b/chain/sync.go index 554f81ee99e..0ab8ac183f8 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -64,7 +64,7 @@ var ( log = logging.Logger("chain") - concurrentSyncRequests = 16 + concurrentSyncRequests = exchange.ShufflePeersPrefix syncRequestBatchSize = 8 syncRequestRetries = 5 ) From fb605f6d7fedea4704882a631d3155b0c280c599 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 17 Sep 2020 17:21:26 +0300 Subject: [PATCH 07/11] refactor parallel fetch logic into a separate function --- chain/sync.go | 118 ++++++++++++++++++++++++++++---------------------- 1 file changed, 67 insertions(+), 51 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index 0ab8ac183f8..95c2e2e8428 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1494,57 +1494,8 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS } ss.SetStage(api.StageFetchingMessages) - bstout := make([]*exchange.CompactedMessages, batchSize) - var wg sync.WaitGroup - var mx sync.Mutex - var batchErr error - for j := 0; j < batchSize; j += syncRequestBatchSize { - wg.Add(1) - go func(j int) { - defer wg.Done() - - nreq := syncRequestBatchSize - if j+nreq > batchSize { - nreq = batchSize - j - } - - failed := false - for offset := 0; !failed && offset < nreq; { - nextI := (i + 1) - batchSize + j + offset - nextHeader := headers[nextI] - - var requestErr error - var requestResult []*exchange.CompactedMessages - for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ { - if retry > 0 { - log.Infof("fetching messages at %d (retry %d)", nextI, retry) - } else { - log.Infof("fetching messages at %d", nextI) - } - - result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset)) - if err != nil { - requestErr = multierror.Append(requestErr, err) - } else { - requestResult = result - } - } - - mx.Lock() - if requestResult == nil { - // we failed! - log.Errorf("error fetching messages at %d: %s", nextI, requestErr) - batchErr = multierror.Append(batchErr, requestErr) - failed = true - } else { - copy(bstout[j+offset:], requestResult) - offset += len(requestResult) - } - mx.Unlock() - } - }(j) - } - wg.Wait() + startOffset := i + 1 - batchSize + bstout, batchErr := syncer.fetchMessages(ctx, headers[startOffset:startOffset+batchSize], startOffset) ss.SetStage(api.StageMessages) if batchErr != nil { @@ -1585,6 +1536,71 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS return nil } +func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet, startOffset int) ([]*exchange.CompactedMessages, error) { + batchSize := len(headers) + batch := make([]*exchange.CompactedMessages, batchSize) + + var wg sync.WaitGroup + var mx sync.Mutex + var batchErr error + + start := build.Clock.Now() + + for j := 0; j < batchSize; j += syncRequestBatchSize { + wg.Add(1) + go func(j int) { + defer wg.Done() + + nreq := syncRequestBatchSize + if j+nreq > batchSize { + nreq = batchSize - j + } + + failed := false + for offset := 0; !failed && offset < nreq; { + nextI := j + offset + nextHeader := headers[nextI] + + var requestErr error + var requestResult []*exchange.CompactedMessages + for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ { + if retry > 0 { + log.Infof("fetching messages at %d (retry %d)", startOffset+nextI, retry) + } else { + log.Infof("fetching messages at %d", startOffset+nextI) + } + + result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset)) + if err != nil { + requestErr = multierror.Append(requestErr, err) + } else { + requestResult = result + } + } + + mx.Lock() + if requestResult != nil { + copy(batch[j+offset:], requestResult) + offset += len(requestResult) + } else { + log.Errorf("error fetching messages at %d: %s", nextI, requestErr) + batchErr = multierror.Append(batchErr, requestErr) + failed = true + } + mx.Unlock() + } + }(j) + } + wg.Wait() + + if batchErr != nil { + dt := build.Clock.Since(start) + log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, dt) + } + + return batch, batchErr +} + func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error { for _, m := range bst.Bls { //log.Infof("putting BLS message: %s", m.Cid()) From d7948fcbcd687961f746d88ed55a2afc13024aeb Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 17 Sep 2020 17:33:52 +0300 Subject: [PATCH 08/11] fix log; we want to log time when we succeed! --- chain/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sync.go b/chain/sync.go index 95c2e2e8428..d09ba84defb 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1593,7 +1593,7 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet } wg.Wait() - if batchErr != nil { + if batchErr == nil { dt := build.Clock.Since(start) log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, dt) } From 2946561decdb7bd7b9056cc256c6e32294a13279 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 17 Sep 2020 17:35:40 +0300 Subject: [PATCH 09/11] clean up return code --- chain/sync.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index d09ba84defb..9c97144472a 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1593,11 +1593,12 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet } wg.Wait() - if batchErr == nil { - dt := build.Clock.Since(start) - log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, dt) + if batchErr != nil { + return nil, batchErr } + log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, build.Clock.Since(start)) + return batch, batchErr } From 6dfc40abc1c9152d3fb59061f2008b54b0b63872 Mon Sep 17 00:00:00 2001 From: vyzo Date: Thu, 17 Sep 2020 18:23:50 +0300 Subject: [PATCH 10/11] error is nil at end, so return batch, nil --- chain/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chain/sync.go b/chain/sync.go index 9c97144472a..03ae1cd4fd9 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1599,7 +1599,7 @@ func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, build.Clock.Since(start)) - return batch, batchErr + return batch, nil } func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error { From f135ec84682f072626e90b9b43f65ad535ddaa33 Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 21 Sep 2020 09:21:25 +0300 Subject: [PATCH 11/11] fix handling of end of sync --- chain/sync.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/chain/sync.go b/chain/sync.go index 03ae1cd4fd9..74dc5aa1aa3 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -1486,11 +1486,7 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS batchSize := concurrentSyncRequests * syncRequestBatchSize if i < batchSize { - if i == 0 { - batchSize = 1 - } else { - batchSize = i - } + batchSize = i + 1 } ss.SetStage(api.StageFetchingMessages)