Skip to content

Commit

Permalink
Merge pull request #1324 from bysomeone/feat-block-finalize
Browse files Browse the repository at this point in the history
Feature block finalize
  • Loading branch information
vipwzw authored May 31, 2024
2 parents ca0c2c6 + cf859e9 commit a85e11c
Show file tree
Hide file tree
Showing 67 changed files with 6,302 additions and 1,312 deletions.
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,3 @@ jobs:
run: go test ./... -covermode=atomic
env:
GOARCH: 386

259 changes: 259 additions & 0 deletions blockchain/blockfinalize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
package blockchain

import (
"encoding/hex"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
"sync"
"sync/atomic"
"time"
)

var (
snowChoiceKey = []byte("blockchain-snowchoice")
)

type finalizer struct {
chain *BlockChain
choice types.SnowChoice
lock sync.RWMutex
healthNotify chan struct{}
resetRunning atomic.Bool
}

func (f *finalizer) Init(chain *BlockChain) {

f.healthNotify = make(chan struct{}, 1)
f.chain = chain
raw, err := chain.blockStore.db.Get(snowChoiceKey)

if err == nil {
err = types.Decode(raw, &f.choice)
if err != nil {
chainlog.Error("newFinalizer", "decode err", err)
panic(err)
}
chainlog.Info("newFinalizer", "height", f.choice.Height, "hash", hex.EncodeToString(f.choice.Hash))
go f.healthCheck()
} else if chain.client.GetConfig().GetModuleConfig().Consensus.Finalizer != "" {
f.choice.Height = chain.cfg.BlockFinalizeEnableHeight
chainlog.Info("newFinalizer", "enableHeight", f.choice.Height, "gapHeight", chain.cfg.BlockFinalizeGapHeight)
go f.waitFinalizeStartBlock(f.choice.Height)
}
go f.lazyStart(chain.cfg.BlockFinalizeGapHeight, MaxRollBlockNum)
}

// 基于最大区块回滚深度, 快速收敛, 主要针对同步节点, 减少历史数据共识流程
func (f *finalizer) lazyStart(gapHeight, maxRollbackNum int64) {

ticker := time.NewTicker(time.Minute * 2)
minPeerCount := 10
defer ticker.Stop()
for {
select {

case <-f.chain.quit:
return
case <-ticker.C:
finalized, _ := f.getLastFinalized()
peerNum := f.chain.GetPeerCount()
height := f.chain.GetBlockHeight() - gapHeight
// 连接节点过少||未达到使能高度, 等待连接及区块同步
if finalized >= height || peerNum < minPeerCount {
chainlog.Debug("lazyStart wait", "peerNum", peerNum,
"finalized", finalized, "height", height)
continue
}

maxPeerHeight := f.chain.GetPeerMaxBlkHeight()
// 已经最终化高度在回滚范围内, 无需加速
if finalized+maxRollbackNum > maxPeerHeight {
chainlog.Debug("lazyStart return", "peerNum", peerNum, "finalized", finalized,
"maxHeight", maxPeerHeight)
return
}

peers := f.chain.getActivePeersByHeight(height + maxRollbackNum)
chainlog.Debug("lazyStart peer", "peerNum", peerNum, "peersLen", len(peers),
"finalized", finalized, "height", height)
// 超过半数节点高度超过该高度, 说明当前链回滚最低高度大于height, 可设为已最终化高度
if len(peers) < peerNum/2 {
continue
}

detail, err := f.chain.GetBlock(height)
if err != nil {
chainlog.Error("lazyStart err", "height", height, "get block err", err)
continue
}
_ = f.reset(height, detail.GetBlock().Hash(f.chain.client.GetConfig()))
}
}

}

func (f *finalizer) healthCheck() {

ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
healthy := false
for {
select {

case <-f.chain.quit:
return
case <-f.healthNotify:
healthy = true
case <-ticker.C:
maxPeerHeight := f.chain.GetPeerMaxBlkHeight()
// 节点高度落后较多情况不处理, 等待同步
height := f.chain.GetBlockHeight()
if height < maxPeerHeight-128 || healthy {
chainlog.Debug("healthCheck not sync", "healthy", healthy, "height", height, "maxHeight", maxPeerHeight)
healthy = false
continue
}
finalized, hash := f.getLastFinalized()
chainlog.Debug("healthCheck timeout", "lastFinalize", finalized,
"hash", hex.EncodeToString(hash), "chainHeight", height)
if finalized >= height {
continue
}
// 重新设置高度, 哈希值
detail, err := f.chain.GetBlock(finalized)
if err != nil {
chainlog.Error("finalizer tiemout", "height", finalized, "get block err", err)
continue
}
_ = f.reset(finalized, detail.GetBlock().Hash(f.chain.client.GetConfig()))
}
}
}

const defaultFinalizeGapHeight = 128

func (f *finalizer) waitFinalizeStartBlock(beginHeight int64) {

waitHeight := f.chain.cfg.BlockFinalizeGapHeight
for f.chain.blockStore.Height() < beginHeight+waitHeight {
time.Sleep(time.Second * 5)
}

detail, err := f.chain.GetBlock(beginHeight)
if err != nil {
chainlog.Error("waitFinalizeStartBlock", "height", beginHeight, "waitHeight", waitHeight, "get block err", err)
panic(err)
}
go f.healthCheck()
_ = f.setFinalizedBlock(detail.GetBlock().Height, detail.GetBlock().Hash(f.chain.client.GetConfig()), false)

}

func (f *finalizer) snowmanPreferBlock(msg *queue.Message) {
//req := (msg.Data).(*types.ReqBytes)
return

}

func (f *finalizer) snowmanAcceptBlock(msg *queue.Message) {

req := (msg.Data).(*types.SnowChoice)
chainlog.Debug("snowmanAcceptBlock", "height", req.Height, "hash", hex.EncodeToString(req.Hash))

// 已经最终化区块不在当前最佳链中, 即可能在侧链上, 最终化记录不更新
if !f.chain.bestChain.HaveBlock(req.GetHash(), req.GetHeight()) {
chainHeight := f.chain.bestChain.Height()
chainlog.Debug("snowmanAcceptBlock not in bestChain", "height", req.Height,
"hash", hex.EncodeToString(req.GetHash()), "chainHeight", chainHeight)
if f.resetRunning.CompareAndSwap(false, true) {
go f.resetEngine(chainHeight, req, time.Second*10)
}
return
}

err := f.setFinalizedBlock(req.GetHeight(), req.GetHash(), true)
if err == nil {
f.healthNotify <- struct{}{}
}
}

const consensusTopic = "consensus"

func (f *finalizer) resetEngine(chainHeight int64, sc *types.SnowChoice, duration time.Duration) {

defer f.resetRunning.Store(false)
ticker := time.NewTicker(duration)
defer ticker.Stop()
for {

select {

case <-f.chain.quit:
return

case <-ticker.C:

currHeight := f.chain.bestChain.Height()
if f.chain.bestChain.HaveBlock(sc.GetHash(), sc.GetHeight()) {
chainlog.Debug("resetEngine accept", "chainHeight", chainHeight,
"currHeight", currHeight, "sc.height", sc.GetHeight(), "sc.hash", hex.EncodeToString(sc.GetHash()))
return
}
// 最终化区块不在主链上且主链高度正常增长, 重置最终化引擎, 尝试对该高度重新共识
if currHeight > chainHeight && currHeight > sc.GetHeight()+12 {
chainlog.Debug("resetEngine reject", "chainHeight", chainHeight,
"currHeight", currHeight, "sc.height", sc.GetHeight(), "sc.hash", hex.EncodeToString(sc.GetHash()))
_ = f.chain.client.Send(queue.NewMessage(types.EventSnowmanResetEngine, consensusTopic, types.EventForFinalizer, nil), true)
return
}
}
}
}

func (f *finalizer) reset(height int64, hash []byte) error {

chainlog.Debug("finalizer reset", "height", height, "hash", hex.EncodeToString(hash))
err := f.setFinalizedBlock(height, hash, false)
if err != nil {
chainlog.Error("finalizer reset", "setFinalizedBlock err", err)
return err
}
err = f.chain.client.Send(queue.NewMessage(types.EventSnowmanResetEngine, consensusTopic, types.EventForFinalizer, nil), true)
if err != nil {
chainlog.Error("finalizer reset", "send msg err", err)
}
return err
}

func (f *finalizer) setFinalizedBlock(height int64, hash []byte, mustInorder bool) error {

chainlog.Debug("setFinalizedBlock", "height", height, "hash", hex.EncodeToString(hash))
f.lock.Lock()
defer f.lock.Unlock()
if mustInorder && height <= f.choice.Height {
chainlog.Debug("setFinalizedBlock disorder", "height", height, "currHeight", f.choice.Height)
return types.ErrInvalidParam
}
f.choice.Height = height
f.choice.Hash = hash
err := f.chain.blockStore.db.Set(snowChoiceKey, types.Encode(&f.choice))
if err != nil {
chainlog.Error("setFinalizedBlock", "height", height, "hash", hex.EncodeToString(hash), "err", err)
return err
}
return nil
}

func (f *finalizer) getLastFinalized() (int64, []byte) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.choice.Height, f.choice.Hash
}

func (f *finalizer) snowmanLastChoice(msg *queue.Message) {

height, hash := f.getLastFinalized()
//chainlog.Debug("snowmanLastChoice", "height", height, "hash", hex.EncodeToString(hash))
msg.Reply(f.chain.client.NewMessage(msg.Topic,
types.EventSnowmanLastChoice, &types.SnowChoice{Height: height, Hash: hash}))
}
97 changes: 97 additions & 0 deletions blockchain/blockfinalize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package blockchain

import (
dbm "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/require"
"os"
"testing"
"time"
)

func newTestChain(t *testing.T) (*BlockChain, string) {

chain := InitEnv()
chain.client.GetConfig().GetModuleConfig().Consensus.Finalizer = "snowman"
dir, err := os.MkdirTemp("", "finalize")
require.Nil(t, err)
blockStoreDB := dbm.NewDB("blockchain", "leveldb", dir, 64)
chain.blockStore = NewBlockStore(chain, blockStoreDB, nil)
node := newPreGenBlockNode()
node.parent = nil
chain.bestChain = newChainView(node)
return chain, dir
}

func TestFinalizer(t *testing.T) {

chain, dir := newTestChain(t)
defer os.RemoveAll(dir)
defer chain.Close()
f := &finalizer{}
f.Init(chain)

hash := []byte("testhash")
choice := &types.SnowChoice{Height: 1, Hash: hash}
msg := queue.NewMessage(0, "test", 0, choice)

f.snowmanPreferBlock(msg)
f.snowmanAcceptBlock(msg)
height, hash1 := f.getLastFinalized()
require.Equal(t, 0, int(height))
require.Equal(t, 0, len(hash1))
node := &blockNode{
parent: chain.bestChain.Tip(),
height: 1,
hash: hash,
}
chain.bestChain.SetTip(node)
f.snowmanAcceptBlock(msg)
height, hash1 = f.getLastFinalized()
require.Equal(t, 1, int(height))
require.Equal(t, hash, hash1)

f.snowmanLastChoice(msg)
msg1, err := chain.client.Wait(msg)
require.Nil(t, err)
require.Equal(t, msg1.Data, choice)

}

func TestResetEngine(t *testing.T) {

chain, dir := newTestChain(t)
defer os.RemoveAll(dir)
defer chain.Close()
f := &finalizer{}
f.Init(chain)

hash := []byte("testhash")
choice := &types.SnowChoice{Height: 1, Hash: hash}
node := &blockNode{
parent: chain.bestChain.Tip(),
height: 14,
hash: hash,
}
chain.bestChain.SetTip(node)
chain.client.Sub(consensusTopic)
go f.resetEngine(1, choice, time.Second)
recvMsg := func() {
select {
case msg := <-chain.client.Recv():
require.Equal(t, "consensus", msg.Topic)
require.Equal(t, int64(types.EventSnowmanResetEngine), msg.ID)
require.Equal(t, int64(types.EventForFinalizer), msg.Ty)
case <-time.After(time.Second * 5):
t.Error("resetEngine timeout")
}
}
recvMsg()
f.reset(choice.Height, choice.Hash)
recvMsg()
height, hash1 := f.getLastFinalized()
require.Equal(t, choice.Height, height)
require.Equal(t, hash, hash1)

}
6 changes: 4 additions & 2 deletions blockchain/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package blockchain

import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -865,7 +866,7 @@ func (bs *BlockStore) GetTdByBlockHash(hash []byte) (*big.Int, error) {
blocktd, err := bs.db.Get(calcHashToTdKey(hash))
if blocktd == nil || err != nil {
if err != dbm.ErrNotFoundInDb {
storeLog.Error("GetTdByBlockHash ", "error", err)
storeLog.Error("GetTdByBlockHash ", "hash", hex.EncodeToString(hash), "err", err)
}
return nil, types.ErrHashNotExist
}
Expand Down Expand Up @@ -944,7 +945,8 @@ func (bs *BlockStore) dbMaybeStoreBlock(blockdetail *types.BlockDetail, sync boo
} else {
parenttd, err := bs.GetTdByBlockHash(parentHash)
if err != nil {
chainlog.Error("dbMaybeStoreBlock GetTdByBlockHash", "height", height, "parentHash", common.ToHex(parentHash))
chainlog.Error("dbMaybeStoreBlock GetTdByBlockHash",
"height", height, "parentHash", common.ToHex(parentHash), "err", err)
return err
}
blocktd = new(big.Int).Add(difficulty, parenttd)
Expand Down
Loading

0 comments on commit a85e11c

Please sign in to comment.