Skip to content

Commit

Permalink
add index recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
unclezoro committed Jul 30, 2019
1 parent 0b63fbb commit b291b1b
Show file tree
Hide file tree
Showing 12 changed files with 357 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
### FEATURES:

### IMPROVEMENTS:
- [index] [\#106](https://github.com/binance-chain/bnc-tendermint/pull/106) index service recover from data lost

### BUG FIXES:
79 changes: 48 additions & 31 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ import (
ctypes "github.com/tendermint/tendermint/rpc/core/types"
grpccore "github.com/tendermint/tendermint/rpc/grpc"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/snapshot"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/blockindex"
bkv "github.com/tendermint/tendermint/state/blockindex/kv"
nullblk "github.com/tendermint/tendermint/state/blockindex/null"
Expand Down Expand Up @@ -160,21 +160,22 @@ type Node struct {
isListening bool

// services
eventBus *types.EventBus // pub/sub for services
stateDB dbm.DB
blockStore *bc.BlockStore // store the blockchain to disk
bcReactor *bc.BlockchainReactor // for fast-syncing
mempoolReactor *mempl.MempoolReactor // for gossipping transactions
consensusState *cs.ConsensusState // latest consensus state
consensusReactor *cs.ConsensusReactor // for participating in the consensus
evidencePool *evidence.EvidencePool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer
blockIndexer blockindex.BlockIndexer
indexerService *txindex.IndexerService
eventBus *types.EventBus // pub/sub for services
stateDB dbm.DB
blockStore *bc.BlockStore // store the blockchain to disk
bcReactor *bc.BlockchainReactor // for fast-syncing
mempoolReactor *mempl.MempoolReactor // for gossipping transactions
consensusState *cs.ConsensusState // latest consensus state
consensusReactor *cs.ConsensusReactor // for participating in the consensus
evidencePool *evidence.EvidencePool // tracking evidence
proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer
blockIndexer blockindex.BlockIndexer
indexerService *txindex.IndexerService
blockIndexService *blockindex.IndexerService
prometheusSrv *http.Server
indexHub *sm.IndexHub
prometheusSrv *http.Server
}

// NewNode returns a new, ready to go, Tendermint Node.
Expand Down Expand Up @@ -239,7 +240,7 @@ func NewNode(config *cfg.Config,

// Transaction indexing
var txIndexer txindex.TxIndexer
var txDB dbm.DB // TODO: remove by refactor defaultdbprovider to cache the created db instaces
var txDB dbm.DB // TODO: remove by refactor defaultdbprovider to cache the created db instaces
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
Expand Down Expand Up @@ -286,6 +287,21 @@ func NewNode(config *cfg.Config,
return nil, err
}

csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)

indexHubStore, err := dbProvider(&DBContext{"indexer_hub", config})
if err != nil {
return nil, err
}
indexHub := sm.NewIndexHub(state.LastBlockHeight, indexHubStore, stateDB, blockStore, eventBus, sm.IndexHubWithMetrics(smMetrics))
indexHub.RegisterIndexSvc(blockIndexerService)
indexHub.RegisterIndexSvc(txIndexerService)
indexHub.SetLogger(logger.With("module", "indexer_hub"))
err = indexHub.Start()
if err != nil {
return nil, err
}

// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
Expand Down Expand Up @@ -346,8 +362,6 @@ func NewNode(config *cfg.Config,
consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
}

csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)

// Make MempoolReactor
mempool := mempl.NewMempool(
config.Mempool,
Expand Down Expand Up @@ -583,19 +597,20 @@ func NewNode(config *cfg.Config,
nodeInfo: nodeInfo,
nodeKey: nodeKey,

stateDB: stateDB,
blockStore: blockStore,
bcReactor: bcReactor,
mempoolReactor: mempoolReactor,
consensusState: consensusState,
consensusReactor: consensusReactor,
evidencePool: evidencePool,
proxyApp: proxyApp,
txIndexer: txIndexer,
blockIndexer: blockIndexer,
indexerService: txIndexerService,
stateDB: stateDB,
blockStore: blockStore,
bcReactor: bcReactor,
mempoolReactor: mempoolReactor,
consensusState: consensusState,
consensusReactor: consensusReactor,
evidencePool: evidencePool,
proxyApp: proxyApp,
txIndexer: txIndexer,
blockIndexer: blockIndexer,
indexerService: txIndexerService,
blockIndexService: blockIndexerService,
eventBus: eventBus,
indexHub: indexHub,
eventBus: eventBus,
}
node.BaseService = *cmn.NewBaseService(logger, "Node", node)
return node, nil
Expand Down Expand Up @@ -666,6 +681,7 @@ func (n *Node) OnStop() {
n.eventBus.Stop()
n.indexerService.Stop()
n.blockIndexService.Stop()
n.indexHub.Stop()

// now stop the reactors
// TODO: gracefully disconnect from peers.
Expand Down Expand Up @@ -719,6 +735,7 @@ func (n *Node) ConfigureRPC() {
rpccore.SetProxyAppQuery(n.proxyApp.Query())
rpccore.SetTxIndexer(n.txIndexer)
rpccore.SetBlockIndexer(n.blockIndexer)
rpccore.SetIndexHub(n.indexHub)
rpccore.SetConsensusReactor(n.consensusReactor)
rpccore.SetEventBus(n.eventBus)
rpccore.SetLogger(n.Logger.With("module", "rpc"))
Expand All @@ -738,7 +755,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
// we may expose the rpc over both a unix and tcp socket
listeners := make([]net.Listener, len(listenAddrs))
var wsWorkerPool *gopool.Pool
if n.config.RPC.WebsocketPoolMaxSize > 1{
if n.config.RPC.WebsocketPoolMaxSize > 1 {
wsWorkerPool = gopool.NewPool(n.config.RPC.WebsocketPoolMaxSize, n.config.RPC.WebsocketPoolQueueSize, n.config.RPC.WebsocketPoolSpawnSize)
wsWorkerPool.SetLogger(n.Logger.With("module", "routine-pool"))
}
Expand Down
5 changes: 5 additions & 0 deletions rpc/core/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var (
consensusReactor *consensus.ConsensusReactor
eventBus *types.EventBus // thread safe
mempool *mempl.Mempool
indexerHub *sm.IndexHub

logger log.Logger

Expand Down Expand Up @@ -133,6 +134,10 @@ func SetBlockIndexer(indexer blockindex.BlockIndexer) {
blockIndexer = indexer
}

func SetIndexHub(hub *sm.IndexHub) {
indexerHub = hub
}

func SetConsensusReactor(conR *consensus.ConsensusReactor) {
consensusReactor = conR
}
Expand Down
4 changes: 3 additions & 1 deletion rpc/core/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ import (
// "latest_app_hash": "0000000000000000",
// "latest_block_height": "18",
// "latest_block_time": "2018-09-17T11:42:19.149920551Z",
// "catching_up": false
// "catching_up": false,
// "index_height": "18"
// },
// "validator_info": {
// "address": "D9F56456D7C5793815D0E9AF07C3A355D0FC64FD",
Expand Down Expand Up @@ -106,6 +107,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
LatestBlockHeight: latestHeight,
LatestBlockTime: latestBlockTime,
CatchingUp: consensusReactor.FastSync(),
IndexHeight: indexerHub.GetHeight(),
},
ValidatorInfo: ctypes.ValidatorInfo{
Address: pubKey.Address(),
Expand Down
1 change: 1 addition & 0 deletions rpc/core/types/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type SyncInfo struct {
LatestBlockHeight int64 `json:"latest_block_height"`
LatestBlockTime time.Time `json:"latest_block_time"`
CatchingUp bool `json:"catching_up"`
IndexHeight int64 `json:"index_height"`
}

// Info about the node's validator
Expand Down
9 changes: 9 additions & 0 deletions state/blockindex/indexer_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type IndexerService struct {

idr BlockIndexer
eventBus *types.EventBus

onIndex func(int64)
}

// NewIndexerService returns a new service instance.
Expand All @@ -28,6 +30,10 @@ func NewIndexerService(idr BlockIndexer, eventBus *types.EventBus) *IndexerServi
return is
}

func (is *IndexerService) SetOnIndex(callback func(int64)) {
is.onIndex = callback
}

// OnStart implements cmn.Service by subscribing for blocks and indexing them by hash.
func (is *IndexerService) OnStart() error {
blockHeadersSub, err := is.eventBus.SubscribeUnbuffered(context.Background(), subscriber, types.EventQueryNewBlockHeader)
Expand All @@ -45,6 +51,9 @@ func (is *IndexerService) OnStart() error {
} else {
is.Logger.Info("Indexed block", "height", header.Height, "hash", header.LastBlockID.Hash)
}
if is.onIndex != nil {
is.onIndex(header.Height)
}
}
}()
return nil
Expand Down
14 changes: 7 additions & 7 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
mempool Mempool, evpool EvidencePool, withAppState bool, options ...BlockExecutorOption) *BlockExecutor {
res := &BlockExecutor{
db: db,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
metrics: NopMetrics(),
db: db,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
metrics: NopMetrics(),
withAppState: withAppState,
}

Expand Down
Loading

0 comments on commit b291b1b

Please sign in to comment.