Merge pull request #3887 from filecoin-project/feat/parallel-sync
Parallel fetch for chain sync
magik6k committed Sep 23, 2020
2 parents b28b8b9 + f135ec8 commit a2278e2
Showing 4 changed files with 88 additions and 55 deletions.
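
At a glance, the change drops the adaptive LOTUS_BSYNC_MSG_WINDOW fetch window and instead fans message requests out across a fixed pool of goroutines. A minimal sketch of the resulting fetch geometry, using the constant values introduced below (the `headersPerIteration` helper is illustrative, not part of the diff):

```go
package main

import "fmt"

// Constants introduced by this change (chain/sync.go and chain/exchange/protocol.go).
const (
	concurrentSyncRequests = 16 // set equal to exchange.ShufflePeersPrefix in this diff
	syncRequestBatchSize   = 8  // tipsets covered by a single GetChainMessages call
	syncRequestRetries     = 5  // attempts per tipset slot before the batch is failed
)

// headersPerIteration is how many tipset headers one pass of iterFullTipsets
// now tries to fill with messages: 16 goroutines * 8 tipsets each.
func headersPerIteration() int {
	return concurrentSyncRequests * syncRequestBatchSize
}

func main() {
	fmt.Println(headersPerIteration()) // 128
}
```
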
7 changes: 7 additions & 0 deletions chain/exchange/client.go
@@ -7,6 +7,7 @@ import (
"math/rand"
"time"

"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
@@ -357,6 +358,12 @@ func (c *client) sendRequestToPeer(ctx context.Context, peer peer.ID, req *Reque
return nil, xerrors.Errorf("failed to open stream to peer: %w", err)
}

defer func() {
// Note: this will become just stream.Close once we've completed the go-libp2p migration to
// go-libp2p-core 0.7.0
go helpers.FullClose(stream) //nolint:errcheck
}()

// Write request.
_ = stream.SetWriteDeadline(time.Now().Add(WriteReqDeadline))
if err := cborutil.WriteCborRPC(stream, req); err != nil {
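
The new defer in sendRequestToPeer hands the stream to helpers.FullClose on a separate goroutine, so the caller can return as soon as the response has been read instead of waiting for the remote side to finish closing. A small sketch of that pattern, assuming go-libp2p-core's helpers package (the closeAsync wrapper is a name of our own, not part of the diff):

```go
import (
	"github.com/libp2p/go-libp2p-core/helpers"
	"github.com/libp2p/go-libp2p-core/network"
)

// closeAsync flushes and closes our side of the stream in the background,
// mirroring the defer added above. Errors are deliberately dropped, matching
// the //nolint:errcheck in the diff.
func closeAsync(s network.Stream) {
	go func() {
		_ = helpers.FullClose(s)
	}()
}
```
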
2 changes: 1 addition & 1 deletion chain/exchange/protocol.go
@@ -40,7 +40,7 @@ const (
WriteReqDeadline = 5 * time.Second
ReadResDeadline = WriteReqDeadline
ReadResMinSpeed = 50 << 10
ShufflePeersPrefix = 5
ShufflePeersPrefix = 16
WriteResDeadline = 60 * time.Second
)
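
Raising ShufflePeersPrefix from 5 to 16 keeps it in step with concurrentSyncRequests in chain/sync.go below, so each of the 16 concurrent message requests can be routed to a different well-ranked peer. A rough sketch of what a prefix shuffle over a peer list looks like (illustrative only; the exchange client has its own peer-selection code):

```go
import (
	"math/rand"

	"github.com/libp2p/go-libp2p-core/peer"
)

// shufflePrefix randomizes only the first `prefix` entries of a ranked peer
// list, leaving the tail in order; callers would pass ShufflePeersPrefix.
func shufflePrefix(peers []peer.ID, prefix int) {
	if len(peers) < prefix {
		prefix = len(peers)
	}
	rand.Shuffle(prefix, func(i, j int) {
		peers[i], peers[j] = peers[j], peers[i]
	})
}
```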

5 changes: 4 additions & 1 deletion chain/exchange/server.go
@@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/helpers"
inet "github.com/libp2p/go-libp2p-core/network"
)

@@ -39,7 +40,9 @@ func (s *server) HandleStream(stream inet.Stream) {
ctx, span := trace.StartSpan(context.Background(), "chainxchg.HandleStream")
defer span.End()

defer stream.Close() //nolint:errcheck
// Note: this will become just stream.Close once we've completed the go-libp2p migration to
// go-libp2p-core 0.7.0
defer helpers.FullClose(stream) //nolint:errcheck

var req Request
if err := cborutil.ReadCborRPC(bufio.NewReader(stream), &req); err != nil {
129 changes: 76 additions & 53 deletions chain/sync.go
@@ -7,7 +7,6 @@ import (
"fmt"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
@@ -62,20 +61,12 @@
// where the Syncer publishes candidate chain heads to be synced.
LocalIncoming = "incoming"

log = logging.Logger("chain")
defaultMessageFetchWindowSize = 200
)
log = logging.Logger("chain")

func init() {
if s := os.Getenv("LOTUS_BSYNC_MSG_WINDOW"); s != "" {
val, err := strconv.Atoi(s)
if err != nil {
log.Errorf("failed to parse LOTUS_BSYNC_MSG_WINDOW: %s", err)
return
}
defaultMessageFetchWindowSize = val
}
}
concurrentSyncRequests = exchange.ShufflePeersPrefix
syncRequestBatchSize = 8
syncRequestRetries = 5
)

// Syncer is in charge of running the chain synchronization logic. As such, it
// is tasked with these functions, amongst others:
@@ -131,8 +122,6 @@ type Syncer struct {

verifier ffiwrapper.Verifier

windowSize int

tickerCtxCancel context.CancelFunc

checkptLk sync.Mutex
@@ -174,7 +163,6 @@ func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.C
receiptTracker: newBlockReceiptTracker(),
connmgr: connmgr,
verifier: verifier,
windowSize: defaultMessageFetchWindowSize,

incoming: pubsub.New(50),
}
@@ -1481,8 +1469,6 @@ func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipS

span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers))))

windowSize := syncer.windowSize
mainLoop:
for i := len(headers) - 1; i >= 0; {
fts, err := syncer.store.TryFillTipSet(headers[i])
if err != nil {
@@ -1496,34 +1482,19 @@ mainLoop:
continue
}

batchSize := windowSize
batchSize := concurrentSyncRequests * syncRequestBatchSize
if i < batchSize {
batchSize = i
batchSize = i + 1
}

nextI := (i + 1) - batchSize // want to fetch batchSize values; 'i' points to the last one we want to fetch, so it's 'inclusive' of our request, thus we need to add one to our request start index

ss.SetStage(api.StageFetchingMessages)
var bstout []*exchange.CompactedMessages
for len(bstout) < batchSize {
next := headers[nextI]

nreq := batchSize - len(bstout)
bstips, err := syncer.Exchange.GetChainMessages(ctx, next, uint64(nreq))
if err != nil {
// TODO check errors for temporary nature
if windowSize > 1 {
windowSize /= 2
log.Infof("error fetching messages: %s; reducing window size to %d and trying again", err, windowSize)
continue mainLoop
}
return xerrors.Errorf("message processing failed: %w", err)
}
startOffset := i + 1 - batchSize
bstout, batchErr := syncer.fetchMessages(ctx, headers[startOffset:startOffset+batchSize], startOffset)
ss.SetStage(api.StageMessages)

bstout = append(bstout, bstips...)
nextI += len(bstips)
if batchErr != nil {
return xerrors.Errorf("failed to fetch messages: %w", err)
}
ss.SetStage(api.StageMessages)

for bsi := 0; bsi < len(bstout); bsi++ {
// temp storage so we don't persist data we don't want to
@@ -1553,24 +1524,76 @@
}
}

if i >= windowSize {
newWindowSize := windowSize + 10
if newWindowSize > int(exchange.MaxRequestLength) {
newWindowSize = int(exchange.MaxRequestLength)
i -= batchSize
}

return nil
}

func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet, startOffset int) ([]*exchange.CompactedMessages, error) {
batchSize := len(headers)
batch := make([]*exchange.CompactedMessages, batchSize)

var wg sync.WaitGroup
var mx sync.Mutex
var batchErr error

start := build.Clock.Now()

for j := 0; j < batchSize; j += syncRequestBatchSize {
wg.Add(1)
go func(j int) {
defer wg.Done()

nreq := syncRequestBatchSize
if j+nreq > batchSize {
nreq = batchSize - j
}
if newWindowSize > windowSize {
windowSize = newWindowSize
log.Infof("successfully fetched %d messages; increasing window size to %d", len(bstout), windowSize)

failed := false
for offset := 0; !failed && offset < nreq; {
nextI := j + offset
nextHeader := headers[nextI]

var requestErr error
var requestResult []*exchange.CompactedMessages
for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ {
if retry > 0 {
log.Infof("fetching messages at %d (retry %d)", startOffset+nextI, retry)
} else {
log.Infof("fetching messages at %d", startOffset+nextI)
}

result, err := syncer.Exchange.GetChainMessages(ctx, nextHeader, uint64(nreq-offset))
if err != nil {
requestErr = multierror.Append(requestErr, err)
} else {
requestResult = result
}
}

mx.Lock()
if requestResult != nil {
copy(batch[j+offset:], requestResult)
offset += len(requestResult)
} else {
log.Errorf("error fetching messages at %d: %s", nextI, requestErr)
batchErr = multierror.Append(batchErr, requestErr)
failed = true
}
mx.Unlock()
}
}
}(j)
}
wg.Wait()

i -= batchSize
if batchErr != nil {
return nil, batchErr
}

// remember our window size
syncer.windowSize = windowSize
log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, build.Clock.Since(start))

return nil
return batch, nil
}

func persistMessages(bs bstore.Blockstore, bst *exchange.CompactedMessages) error {
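
Inside each goroutine, fetchMessages retries a slot up to syncRequestRetries times and folds every failure into a single error before giving up on the batch. A stripped-down sketch of that retry-and-accumulate pattern (the fetchWithRetries helper and its callback are hypothetical; the real code calls syncer.Exchange.GetChainMessages inline):

```go
import (
	"github.com/hashicorp/go-multierror"

	"github.com/filecoin-project/lotus/chain/exchange"
)

// fetchWithRetries attempts fetch up to `retries` times, returning the first
// successful result or the accumulated multierror once all attempts fail.
func fetchWithRetries(retries int, fetch func() ([]*exchange.CompactedMessages, error)) ([]*exchange.CompactedMessages, error) {
	var errs error
	for attempt := 0; attempt < retries; attempt++ {
		result, err := fetch()
		if err == nil {
			return result, nil
		}
		errs = multierror.Append(errs, err)
	}
	return nil, errs
}
```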
