Skip to content

Commit

Permalink
111
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Sep 19, 2024
1 parent 3ca2602 commit 5a12b94
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 21 deletions.
18 changes: 16 additions & 2 deletions core/filtermaps/filtermaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)

Expand All @@ -28,6 +29,13 @@ const (
headCacheSize = 8 // maximum number of recent filter maps cached in memory
)

type blockchain interface {
CurrentBlock() *types.Header
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
GetHeader(hash common.Hash, number uint64) *types.Header
GetCanonicalHash(number uint64) common.Hash
}

// FilterMaps is the in-memory representation of the log index structure that is
// responsible for building and updating the index according to the canonical
// chain.
Expand All @@ -40,7 +48,12 @@ type FilterMaps struct {
closeCh chan chan struct{}

filterMapsRange
chain *core.BlockChain
chain blockchain

chainHeadLock sync.Mutex
chainHeadCh chan *types.Header
chainHead *types.Header
chainHeadCount uint64

// filterMapCache caches certain filter maps (headCacheSize most recent maps
// and one tail map) that are expected to be frequently accessed and modified
Expand Down Expand Up @@ -86,7 +99,7 @@ type filterMapsRange struct {

// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
// the structure in sync with the given blockchain.
func NewFilterMaps(db ethdb.Database, chain *core.BlockChain) *FilterMaps {
func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
rs, err := rawdb.ReadFilterMapsRange(db)
if err != nil {
log.Error("Error reading log index range", "error", err)
Expand All @@ -104,6 +117,7 @@ func NewFilterMaps(db ethdb.Database, chain *core.BlockChain) *FilterMaps {
headBlockHash: rs.HeadBlockHash,
tailParentHash: rs.TailParentHash,
},
chainHeadCh: make(chan *types.Header, 10),
filterMapCache: make(map[uint32]*filterMap),
blockPtrCache: lru.NewCache[uint32, uint64](1000),
lvPointerCache: lru.NewCache[uint64, uint64](1000),
Expand Down
68 changes: 49 additions & 19 deletions core/filtermaps/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,67 @@ const (
cachedRevertPoints = 64 // revert points for most recent blocks in memory
)

// updateLoop initializes and updates the log index structure according to the
// canonical chain.
func (f *FilterMaps) updateLoop() {
headEventCh := make(chan core.ChainHeadEvent)
sub := f.chain.SubscribeChainHeadEvent(headEventCh)
defer sub.Unsubscribe()
func (f *FilterMaps) UpdateHead() bool {
f.chainHeadLock.Lock()
defer f.chainHeadLock.Unlock()

head := f.chain.CurrentBlock()
if head == nil {
return false
}
if f.chainHead == nil || head.Hash() != f.chainHead.Hash() {
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case ch := <-f.closeCh:
close(ch)
return
case f.chainHeadCh <- head:
f.chainHead = head
f.chainHeadCount++
return true
default:
}
}
fmr := f.getRange()
return false
}

var stop bool
// updateLoop initializes and updates the log index structure according to the
// canonical chain.
func (f *FilterMaps) updateLoop() {
headEventCh := make(chan core.ChainHeadEvent)
sub := f.chain.SubscribeChainHeadEvent(headEventCh)
defer sub.Unsubscribe()

var (
head *types.Header
stop bool
)
wait := func() {
if stop {
return
}
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case <-headEventCh:
if f.UpdateHead() {
head = <-f.chainHeadCh
}
case head = <-f.chainHeadCh:
case <-time.After(time.Second * 20):
// keep updating log index during syncing
head = f.chain.CurrentBlock()
if f.UpdateHead() {
head = <-f.chainHeadCh
}
case ch := <-f.closeCh:
close(ch)
stop = true
}
}

f.UpdateHead()
for head == nil {
wait()
if stop {
return
}
}
fmr := f.getRange()

for !stop {
if !fmr.initialized {
f.tryInit(head)
Expand All @@ -77,14 +102,19 @@ func (f *FilterMaps) updateLoop() {
f.tryExtendTail(func() bool {
// return true if tail processing needs to be stopped
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case <-headEventCh:
if f.UpdateHead() {
head = <-f.chainHeadCh
}
case head = <-f.chainHeadCh:
case ch := <-f.closeCh:
close(ch)
stop = true
return true
default:
head = f.chain.CurrentBlock()
if f.UpdateHead() {
head = <-f.chainHeadCh
}
}
// stop if there is a new chain head (always prioritize head updates)
return fmr.headBlockHash != head.Hash()
Expand Down

0 comments on commit 5a12b94

Please sign in to comment.