Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the performance of eth_getLogs #569

Closed
wants to merge 8 commits into from
68 changes: 43 additions & 25 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@ type SimulatedBackend struct {
database ethdb.Database // In memory database to store our testing data
blockchain *core.BlockChain // Ethereum blockchain to handle the consensus

mu sync.Mutex
pendingBlock *types.Block // Currently pending block that will be imported on request
pendingState *state.StateDB // Currently pending state that will be the active on on request
mu sync.Mutex
pendingBlock *types.Block // Currently pending block that will be imported on request
pendingState *state.StateDB // Currently pending state that will be the active on request
pendingReceipts types.Receipts // Currently receipts for the pending block

events *filters.EventSystem // Event system for filtering log events live
events *filters.EventSystem // for filtering log events live
filterSystem *filters.FilterSystem // for filtering database logs

config *params.ChainConfig
}
Expand Down Expand Up @@ -94,9 +96,7 @@ func SimulateWalletAddressAndSignFn() (common.Address, func(account accounts.Acc

// XDC simulated backend for testing purpose.
func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfig *params.ChainConfig) *SimulatedBackend {
// database := ethdb.NewMemDatabase()
database := rawdb.NewMemoryDatabase()

genesis := core.Genesis{
GasLimit: gasLimit, // need this big, support initial smart contract
Config: chainConfig,
Expand Down Expand Up @@ -126,8 +126,12 @@ func NewXDCSimulatedBackend(alloc core.GenesisAlloc, gasLimit uint64, chainConfi
database: database,
blockchain: blockchain,
config: genesis.Config,
events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false),
}

filterBackend := &filterBackend{database, blockchain, backend}
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
backend.events = filters.NewEventSystem(backend.filterSystem, false)

blockchain.Client = backend
backend.rollback()
return backend
Expand All @@ -146,8 +150,12 @@ func NewSimulatedBackend(alloc core.GenesisAlloc) *SimulatedBackend {
database: database,
blockchain: blockchain,
config: genesis.Config,
events: filters.NewEventSystem(new(event.TypeMux), &filterBackend{database, blockchain}, false),
}

filterBackend := &filterBackend{database, blockchain, backend}
backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
backend.events = filters.NewEventSystem(backend.filterSystem, false)

backend.rollback()
return backend
}
Expand Down Expand Up @@ -421,7 +429,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt
var filter *filters.Filter
if query.BlockHash != nil {
// Block filter requested, construct a single-shot filter
filter = filters.NewBlockFilter(&filterBackend{b.database, b.blockchain}, *query.BlockHash, query.Addresses, query.Topics)
filter = b.filterSystem.NewBlockFilter(*query.BlockHash, query.Addresses, query.Topics)
} else {
// Initialize unset filter boundaried to run from genesis to chain head
from := int64(0)
Expand All @@ -433,7 +441,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query XDPoSChain.Filt
to = query.ToBlock.Int64()
}
// Construct the range filter
filter = filters.NewRangeFilter(&filterBackend{b.database, b.blockchain}, from, to, query.Addresses, query.Topics)
filter = b.filterSystem.NewRangeFilter(from, to, query.Addresses, query.Topics)
}
// Run the filter and return all the logs
logs, err := filter.Logs(ctx)
Expand Down Expand Up @@ -523,8 +531,9 @@ func (m callMsg) AccessList() types.AccessList { return m.CallMsg.AccessList }
// filterBackend implements filters.Backend to support filtering for logs without
// taking bloom-bits acceleration structures into account.
type filterBackend struct {
db ethdb.Database
bc *core.BlockChain
db ethdb.Database
bc *core.BlockChain
backend *SimulatedBackend
}

func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }
Expand All @@ -545,35 +554,44 @@ func (fb *filterBackend) GetReceipts(ctx context.Context, hash common.Hash) (typ
return core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash)), nil
}

func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
receipts := core.GetBlockReceipts(fb.db, hash, core.GetBlockNumber(fb.db, hash))
if receipts == nil {
return nil, nil
}
logs := make([][]*types.Log, len(receipts))
for i, receipt := range receipts {
logs[i] = receipt.Logs
}
func (fb *filterBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
return fb.backend.pendingBlock, fb.backend.pendingReceipts
}

func (fb *filterBackend) GetLogs(ctx context.Context, hash common.Hash, number uint64) ([][]*types.Log, error) {
logs := rawdb.ReadLogs(fb.db, hash, number)
return logs, nil
}

func (fb *filterBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
})
return nullSubscription()
}

func (fb *filterBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
return fb.bc.SubscribeChainEvent(ch)
}

func (fb *filterBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return fb.bc.SubscribeRemovedLogsEvent(ch)
}

func (fb *filterBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
return fb.bc.SubscribeLogsEvent(ch)
}

func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
return nullSubscription()
}

func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }

func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
panic("not supported")
}

func nullSubscription() event.Subscription {
return event.NewSubscription(func(quit <-chan struct{}) error {
<-quit
return nil
})
}
1 change: 1 addition & 0 deletions cmd/XDC/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ var (
//utils.CacheDatabaseFlag,
//utils.CacheGCFlag,
//utils.TrieCacheGenFlag,
utils.CacheLogSizeFlag,
utils.ListenPortFlag,
utils.MaxPeersFlag,
utils.MaxPendingPeersFlag,
Expand Down
24 changes: 24 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ import (
"github.com/XinFinOrg/XDPoSChain/crypto"
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
"github.com/XinFinOrg/XDPoSChain/eth/ethconfig"
"github.com/XinFinOrg/XDPoSChain/eth/filters"
"github.com/XinFinOrg/XDPoSChain/eth/gasprice"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/internal/ethapi"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"github.com/XinFinOrg/XDPoSChain/metrics/exp"
Expand All @@ -52,6 +54,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/p2p/nat"
"github.com/XinFinOrg/XDPoSChain/p2p/netutil"
"github.com/XinFinOrg/XDPoSChain/params"
"github.com/XinFinOrg/XDPoSChain/rpc"
whisper "github.com/XinFinOrg/XDPoSChain/whisper/whisperv6"
"gopkg.in/urfave/cli.v1"
)
Expand Down Expand Up @@ -313,6 +316,11 @@ var (
Usage: "Percentage of cache memory allowance to use for trie pruning",
Value: 25,
}
CacheLogSizeFlag = &cli.IntFlag{
Name: "cache.blocklogs",
Usage: "Size (in number of blocks) of the log cache for filtering",
Value: ethconfig.Defaults.FilterLogCacheSize,
}
// Miner settings
StakingEnabledFlag = cli.BoolFlag{
Name: "mine",
Expand Down Expand Up @@ -1208,6 +1216,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(GasPriceFlag.Name) {
cfg.GasPrice = GlobalBig(ctx, GasPriceFlag.Name)
}
if ctx.IsSet(CacheLogSizeFlag.Name) {
cfg.FilterLogCacheSize = ctx.Int(CacheLogSizeFlag.Name)
}
if ctx.GlobalIsSet(VMEnableDebugFlag.Name) {
// TODO(fjl): force-enable this in --dev mode
cfg.EnablePreimageRecording = ctx.GlobalBool(VMEnableDebugFlag.Name)
Expand Down Expand Up @@ -1407,6 +1418,19 @@ func WalkMatch(root, pattern string) ([]string, error) {
return matches, nil
}

// RegisterFilterAPI adds the eth log filtering RPC API to the node.
func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconfig.Config) *filters.FilterSystem {
isLightClient := ethcfg.SyncMode == downloader.LightSync
filterSystem := filters.NewFilterSystem(backend, filters.Config{
LogCacheSize: ethcfg.FilterLogCacheSize,
})
stack.RegisterAPIs([]rpc.API{{
Namespace: "eth",
Service: filters.NewFilterAPI(filterSystem, isLightClient),
}})
return filterSystem
}

func SetupMetrics(ctx *cli.Context) {
if metrics.Enabled {
log.Info("Enabling metrics collection")
Expand Down
3 changes: 1 addition & 2 deletions cmd/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ func RegisterShhService(stack *node.Node, cfg *whisper.Config) {
}
}

// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to
// th egiven node.
// RegisterEthStatsService configures the Ethereum Stats daemon and adds it to the node.
func RegisterEthStatsService(stack *node.Node, url string) {
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
// Retrieve both eth and les services
Expand Down
9 changes: 9 additions & 0 deletions core/chain_makers.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ func (b *BlockGen) AddTxWithChain(bc *BlockChain, tx *types.Transaction) {
}
}

// AddUncheckedTx forcefully adds a transaction to the block without any
// validation.
//
// AddUncheckedTx will cause consensus failures when used during real
// chain processing. This is best used in conjunction with raw block insertion.
func (b *BlockGen) AddUncheckedTx(tx *types.Transaction) {
b.txs = append(b.txs, tx)
}

// Number returns the block number of the block being generated.
func (b *BlockGen) Number() *big.Int {
return new(big.Int).Set(b.header.Number)
Expand Down
5 changes: 0 additions & 5 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ type OrderTxPreEvent struct{ Tx *types.OrderTransaction }
// LendingTxPreEvent is posted when a order transaction enters the order transaction pool.
type LendingTxPreEvent struct{ Tx *types.LendingTransaction }

// PendingLogsEvent is posted pre mining and notifies of pending logs.
type PendingLogsEvent struct {
Logs []*types.Log
}

// PendingStateEvent is posted pre mining and notifies of pending state changes.
type PendingStateEvent struct{}

Expand Down
Loading