Skip to content

Commit

Permalink
fix(sync): fix syncing issue on prune mode (pactus-project#1415)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f authored Jul 13, 2024
1 parent 2e2fd0a commit 1f7d2c3
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 47 deletions.
18 changes: 10 additions & 8 deletions sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@ type Config struct {

// Private configs
MaxSessions int `toml:"-"`
LatestBlockInterval uint32 `toml:"-"`
BlockPerSession uint32 `toml:"-"`
BlockPerMessage uint32 `toml:"-"`
PruneWindow uint32 `toml:"-"`
LatestSupportingVer version.Version `toml:"-"`
Services service.Services `toml:"-"`
}

func DefaultConfig() *Config {
return &Config{
SessionTimeout: time.Second * 10,
Services: service.New(service.PrunedNode),
BlockPerMessage: 60,
MaxSessions: 8,
LatestBlockInterval: 10 * 8640, // 10 days, same as default retention blocks in prune node
Firewall: firewall.DefaultConfig(),
SessionTimeout: time.Second * 10,
Services: service.New(service.PrunedNode),
MaxSessions: 8,
BlockPerSession: 720,
BlockPerMessage: 60,
PruneWindow: 86_400, // Default retention blocks in prune mode
Firewall: firewall.DefaultConfig(),
LatestSupportingVer: version.Version{
Major: 1,
Minor: 1,
Expand All @@ -45,5 +47,5 @@ func (conf *Config) BasicCheck() error {

func (conf *Config) CacheSize() int {
return util.LogScale(
int(conf.BlockPerMessage * conf.LatestBlockInterval))
int(conf.BlockPerMessage * conf.BlockPerSession))
}
27 changes: 9 additions & 18 deletions sync/handler_blocks_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, pid peer.ID
msg := m.(*message.BlocksRequestMessage)
handler.logger.Trace("parsing BlocksRequest message", "msg", msg)

status := handler.peerSet.GetPeerStatus(pid)
if status.IsUnknown() {
p := handler.peerSet.GetPeer(pid)
if p == nil {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("unknown peer (%s)", pid.String()), msg.SessionID, 0, nil, nil)

Expand All @@ -33,39 +33,30 @@ func (handler *blocksRequestHandler) ParseMessage(m message.Message, pid peer.ID
return
}

if !status.IsKnown() {
if !p.Status.IsKnown() {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("not handshaked (%s)", status.String()), msg.SessionID, 0, nil, nil)
fmt.Sprintf("not handshaked (%s)", p.Status.String()), msg.SessionID, 0, nil, nil)

handler.respond(response, pid)

return
}

ourHeight := handler.state.LastBlockHeight()
if !handler.config.Services.IsFullNode() {
if ourHeight > handler.config.LatestBlockInterval && msg.From < ourHeight-handler.config.LatestBlockInterval {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("the request height is not acceptable: %v", msg.From), msg.SessionID, 0, nil, nil)

handler.respond(response, pid)

return
}
}

if msg.From > ourHeight {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("don't have requested blocks: %v", msg.From), msg.SessionID, 0, nil, nil)
fmt.Sprintf("requested blocks from %v exceed current height %v",
msg.From, ourHeight), msg.SessionID, 0, nil, nil)

handler.respond(response, pid)

return
}

if msg.Count > handler.config.LatestBlockInterval {
if msg.Count > handler.config.BlockPerSession {
response := message.NewBlocksResponseMessage(message.ResponseCodeRejected,
fmt.Sprintf("too many blocks requested: %v-%v", msg.From, msg.Count), msg.SessionID, 0, nil, nil)
fmt.Sprintf("requested block range %v-%v exceeds the allowed %v blocks per session",
msg.From, msg.To(), handler.config.BlockPerSession), msg.SessionID, 0, nil, nil)

handler.respond(response, pid)

Expand Down
20 changes: 5 additions & 15 deletions sync/handler_blocks_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,30 +53,20 @@ func TestBlocksRequestMessages(t *testing.T) {
bdl := td.shouldPublishMessageWithThisType(t, message.TypeBlocksResponse)
res := bdl.Message.(*message.BlocksResponseMessage)
assert.Equal(t, message.ResponseCodeRejected, res.ResponseCode)
assert.Contains(t, res.Reason, "don't have requested block")
assert.Contains(t, res.Reason, "requested blocks from 32 exceed current height 31")
})

t.Run("Reject requests not within `LatestBlockInterval`", func(t *testing.T) {
msg := message.NewBlocksRequestMessage(sid, curHeight-config.LatestBlockInterval-1, 1)
t.Run("Request blocks more than `BlockPerSession`", func(t *testing.T) {
msg := message.NewBlocksRequestMessage(sid, 10, config.BlockPerSession+1)
td.receivingNewMessage(td.sync, msg, pid)

bdl := td.shouldPublishMessageWithThisType(t, message.TypeBlocksResponse)
res := bdl.Message.(*message.BlocksResponseMessage)
assert.Equal(t, message.ResponseCodeRejected, res.ResponseCode)
assert.Contains(t, res.Reason, "the request height is not acceptable")
assert.Contains(t, res.Reason, "requested block range 10-33 exceeds the allowed 23 blocks per session")
})

t.Run("Request blocks more than `LatestBlockInterval`", func(t *testing.T) {
msg := message.NewBlocksRequestMessage(sid, 10, config.LatestBlockInterval+1)
td.receivingNewMessage(td.sync, msg, pid)

bdl := td.shouldPublishMessageWithThisType(t, message.TypeBlocksResponse)
res := bdl.Message.(*message.BlocksResponseMessage)
assert.Equal(t, message.ResponseCodeRejected, res.ResponseCode)
assert.Contains(t, res.Reason, "too many blocks requested")
})

t.Run("Accept request within `LatestBlockInterval`", func(t *testing.T) {
t.Run("Accept request within `BlockPerSession`", func(t *testing.T) {
t.Run("Peer needs more block", func(t *testing.T) {
msg := message.NewBlocksRequestMessage(sid, curHeight-config.BlockPerMessage, config.BlockPerMessage)
td.receivingNewMessage(td.sync, msg, pid)
Expand Down
2 changes: 1 addition & 1 deletion sync/handler_blocks_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func TestSyncing(t *testing.T) {

// Perform block syncing
assert.Equal(t, uint32(11), td.syncAlice.config.BlockPerMessage)
assert.Equal(t, uint32(23), td.syncAlice.config.LatestBlockInterval)
assert.Equal(t, uint32(23), td.syncAlice.config.BlockPerSession)

shouldNotPublishMessageWithThisType(t, td.networkBob, message.TypeBlocksRequest)
shouldPublishBlockRequest(t, td.networkAlice, 1)
Expand Down
6 changes: 4 additions & 2 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,9 @@ func (sync *synchronizer) updateBlockchain() {

sync.logger.Info("start syncing with the network",
"numOfBlocks", numOfBlocks, "height", downloadHeight)
if numOfBlocks > sync.config.LatestBlockInterval {

if numOfBlocks > sync.config.PruneWindow {
// Don't have blocks for mre than 10 days
sync.downloadBlocks(downloadHeight, true)
} else {
sync.downloadBlocks(downloadHeight, false)
Expand All @@ -445,7 +447,7 @@ func (sync *synchronizer) downloadBlocks(from uint32, onlyFullNodes bool) {
sync.logger.Debug("downloading blocks", "from", from)

for i := sync.peerSet.NumberOfSessions(); i < sync.config.MaxSessions; i++ {
count := sync.config.LatestBlockInterval
count := sync.config.BlockPerSession
sent := sync.sendBlockRequestToRandomPeer(from, count, onlyFullNodes)
if !sent {
return
Expand Down
5 changes: 3 additions & 2 deletions sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ func testConfig() *Config {
Moniker: "test",
SessionTimeout: time.Second * 1,
BlockPerMessage: 11,
MaxSessions: 8,
LatestBlockInterval: 23,
MaxSessions: 4,
BlockPerSession: 23,
PruneWindow: 13,
Firewall: firewall.DefaultConfig(),
LatestSupportingVer: DefaultConfig().LatestSupportingVer,
Services: service.New(service.FullNode, service.PrunedNode),
Expand Down
2 changes: 1 addition & 1 deletion tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestMain(m *testing.M) {
tConfigs[i].Logger.Levels["_network"] = "info"
tConfigs[i].Logger.Levels["_pool"] = "info"
tConfigs[i].Sync.Firewall.BannedNets = make([]string, 0)
tConfigs[i].Sync.LatestBlockInterval = 10
tConfigs[i].Sync.BlockPerSession = 10
tConfigs[i].Network.EnableMdns = true
tConfigs[i].Network.EnableRelay = false
tConfigs[i].Network.DefaultBootstrapAddrStrings = []string{}
Expand Down

0 comments on commit 1f7d2c3

Please sign in to comment.