Skip to content

Commit

Permalink
core/filtermaps: guaranteed valid range
Browse files Browse the repository at this point in the history
  • Loading branch information
zsfelfoldi committed Sep 22, 2024
1 parent 32c82f0 commit c6c805f
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 68 deletions.
60 changes: 15 additions & 45 deletions core/filtermaps/filtermaps.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package filtermaps

import (
"context"
"crypto/sha256"
"encoding/binary"
"errors"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)

Expand All @@ -28,6 +28,13 @@ const (
headCacheSize = 8 // maximum number of recent filter maps cached in memory
)

type blockchain interface {
CurrentBlock() *types.Header
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
GetHeader(hash common.Hash, number uint64) *types.Header
GetCanonicalHash(number uint64) common.Hash
}

// FilterMaps is the in-memory representation of the log index structure that is
// responsible for building and updating the index according to the canonical
// chain.
Expand All @@ -38,10 +45,10 @@ type FilterMaps struct {
lock sync.RWMutex
db ethdb.Database
closeCh chan chan struct{}

filterMapsRange
chain *core.BlockChain

chain blockchain
matcherSyncCh chan struct{}
matchers map[*FilterMapsMatcherBackend]struct{}
// filterMapCache caches certain filter maps (headCacheSize most recent maps
// and one tail map) that are expected to be frequently accessed and modified
// while updating the structure. Note that the set of cached maps depends
Expand Down Expand Up @@ -86,7 +93,7 @@ type filterMapsRange struct {

// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
// the structure in sync with the given blockchain.
func NewFilterMaps(db ethdb.Database, chain *core.BlockChain) *FilterMaps {
func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
rs, err := rawdb.ReadFilterMapsRange(db)
if err != nil {
log.Error("Error reading log index range", "error", err)
Expand All @@ -104,6 +111,8 @@ func NewFilterMaps(db ethdb.Database, chain *core.BlockChain) *FilterMaps {
headBlockHash: rs.HeadBlockHash,
tailParentHash: rs.TailParentHash,
},
matcherSyncCh: make(chan struct{},1),
matchers: make(map[*FilterMapsMatcherBackend]struct{}),
filterMapCache: make(map[uint32]*filterMap),
blockPtrCache: lru.NewCache[uint32, uint64](1000),
lvPointerCache: lru.NewCache[uint64, uint64](1000),
Expand All @@ -129,46 +138,6 @@ func (f *FilterMaps) Close() {
<-ch
}

// FilterMapsMatcherBackend implements MatcherBackend.
type FilterMapsMatcherBackend FilterMaps

// GetFilterMapRow returns the given row of the given map. If the row is empty
// then a non-nil zero length row is returned.
// Note that the returned slices should not be modified, they should be copied
// on write.
// GetFilterMapRow implements MatcherBackend.
func (ff *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) {
f := (*FilterMaps)(ff)
return f.getFilterMapRow(mapIndex, rowIndex)
}

// GetBlockLvPointer returns the starting log value index where the log values
// generated by the given block are located. If blockNumber is beyond the current
// head then the first unoccupied log value index is returned.
// GetBlockLvPointer implements MatcherBackend.
func (ff *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) {
f := (*FilterMaps)(ff)
f.lock.RLock()
defer f.lock.RUnlock()

return f.getBlockLvPointer(blockNumber)
}

// GetLogByLvIndex returns the log at the given log value index. If the index does
// not point to the first log value entry of a log then no log and no error are
// returned as this can happen when the log value index was a false positive.
// Note that this function assumes that the log index structure is consistent
// with the canonical chain at the point where the given log value index points.
// If this is not the case then an invalid result or an error may be returned.
// GetLogByLvIndex implements MatcherBackend.
func (ff *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) {
f := (*FilterMaps)(ff)
f.lock.RLock()
defer f.lock.RUnlock()

return f.getLogByLvIndex(lvIndex)
}

// reset un-initializes the FilterMaps structure and removes all related data from
// the database.
// Note that this function assumes that the read/write lock is being held.
Expand Down Expand Up @@ -224,6 +193,7 @@ func (f *FilterMaps) setRange(batch ethdb.Batch, newRange filterMapsRange) {
}
rawdb.WriteFilterMapsRange(batch, rs)
f.updateMapCache()
f.limitMatcherRange()
}

// updateMapCache updates the maps covered by the filterMapCache according to the
Expand Down
56 changes: 42 additions & 14 deletions core/filtermaps/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,34 @@ const (
// updateLoop initializes and updates the log index structure according to the
// canonical chain.
func (f *FilterMaps) updateLoop() {
headEventCh := make(chan core.ChainHeadEvent)
sub := f.chain.SubscribeChainHeadEvent(headEventCh)
defer sub.Unsubscribe()
var (
headEventCh = make(chan core.ChainHeadEvent)
sub = f.chain.SubscribeChainHeadEvent(headEventCh)
head *types.Header
stop, matcherSync bool
)

head := f.chain.CurrentBlock()
if head == nil {
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case ch := <-f.closeCh:
close(ch)
return
defer func() {
sub.Unsubscribe()
if matcherSync {
f.sendMatcherSync(head)
}
}
fmr := f.getRange()
}()

var stop bool
wait := func() {
if matcherSync {
f.sendMatcherSync(head)
matcherSync = false
}
if stop {
return
}
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case <-f.matcherSyncCh:
head = f.chain.CurrentBlock()
matcherSync = true
case <-time.After(time.Second * 20):
// keep updating log index during syncing
head = f.chain.CurrentBlock()
Expand All @@ -54,10 +58,21 @@ func (f *FilterMaps) updateLoop() {
stop = true
}
}
for head == nil {
wait()
if stop {
return
}
}
fmr := f.getRange()

for !stop {
if !fmr.initialized {
f.tryInit(head)
if matcherSync {
f.sendMatcherSync(head)
matcherSync = false
}
fmr = f.getRange()
if !fmr.initialized {
wait()
Expand All @@ -73,12 +88,19 @@ func (f *FilterMaps) updateLoop() {
continue
}
}
if matcherSync {
f.sendMatcherSync(head)
matcherSync = false
}
// log index head is at latest chain head; process tail blocks if possible
f.tryExtendTail(func() bool {
// return true if tail processing needs to be stopped
select {
case ev := <-headEventCh:
head = ev.Block.Header()
case <-f.matcherSyncCh:
head = f.chain.CurrentBlock()
matcherSync = true
case ch := <-f.closeCh:
close(ch)
stop = true
Expand Down Expand Up @@ -549,6 +571,9 @@ func (u *updateBatch) makeRevertPoint() (*revertPoint, error) {
// number from memory cache or from the database if available. If no such revert
// point is available then it returns no result and no error.
func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {
f.lock.RLock()
defer f.lock.RUnlock()

if blockNumber > f.headBlockNumber {
blockNumber = f.headBlockNumber
}
Expand Down Expand Up @@ -577,6 +602,9 @@ func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {

// revertTo reverts the log index to the given revert point.
func (f *FilterMaps) revertTo(rp *revertPoint) error {
f.lock.Lock()
defer f.lock.Unlock()

batch := f.db.NewBatch()
afterLastMap := uint32((f.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
if rp.mapIndex >= afterLastMap {
Expand Down
2 changes: 2 additions & 0 deletions core/filtermaps/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type MatcherBackend interface {
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error)
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
Close()
Sync(ctx context.Context) validRange //TODO exported
}

// GetPotentialMatches returns a list of logs that are potential matches for the
Expand Down
155 changes: 155 additions & 0 deletions core/filtermaps/matcher_backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package filtermaps

import (
"context"

"github.com/ethereum/go-ethereum/core/types"
)

// FilterMapsMatcherBackend implements MatcherBackend on top of a FilterMaps
// instance. Each backend tracks the block range for which matches it has
// collected are still guaranteed valid; the owning FilterMaps shrinks or
// invalidates that range as the index changes (see limitMatcherRange).
// All fields are protected by f.lock.
type FilterMapsMatcherBackend struct {
	f *FilterMaps
	// valid is false once the tracked range has been invalidated or the
	// index is uninitialized; closed is set by Close and prevents further
	// Sync requests.
	valid, closed bool
	// firstValid..lastValid is the inclusive block range whose matches are
	// still valid; only meaningful while valid is true.
	firstValid, lastValid uint64
	// syncCh, when non-nil, is where the indexer delivers the result of a
	// pending Sync call (created by Sync with a buffer of one).
	syncCh chan validRange
}

// validRange is the result of a matcher Sync request: it reports the chain
// head the log index was synced against, the block range currently covered
// by the index, and the block range in which matches collected so far are
// guaranteed valid and complete.
type validRange struct {
	head *types.Header
	// block range indexed according to the given chain head
	indexed                   bool
	firstIndexed, lastIndexed uint64
	// block range where the index has not changed since the last matcher sync
	// and therefore the set of matches found in this region is guaranteed to
	// be valid and complete.
	valid                 bool
	firstValid, lastValid uint64
}

// NewMatcherBackend creates a matcher backend bound to f. The new backend
// initially considers the entire currently indexed block range valid and is
// registered with f so that subsequent index updates can clamp or invalidate
// that range. The caller must release it with Close when finished.
func (f *FilterMaps) NewMatcherBackend() *FilterMapsMatcherBackend {
	f.lock.Lock()
	defer f.lock.Unlock()

	backend := &FilterMapsMatcherBackend{
		f:          f,
		valid:      f.initialized,
		firstValid: f.tailBlockNumber,
		lastValid:  f.headBlockNumber,
	}
	f.matchers[backend] = struct{}{}
	return backend
}

// limitMatcherRange clamps the valid block range of every registered matcher
// backend to the currently indexed range, invalidating any backend whose
// range no longer overlaps it. It is called automatically after the indexed
// range changes (e.g. after a revert); the caller must hold the write lock.
// (Translated from the original Hungarian comment: "called automatically
// after revert, under write lock".)
func (f *FilterMaps) limitMatcherRange() {
	for fm := range f.matchers {
		if !f.initialized {
			// An uninitialized index invalidates every matcher.
			fm.valid = false
		}
		if !fm.valid {
			continue
		}
		first, last := fm.firstValid, fm.lastValid
		if tail := f.tailBlockNumber; first < tail {
			first = tail
		}
		if head := f.headBlockNumber; last > head {
			last = head
		}
		fm.firstValid, fm.lastValid = first, last
		// An empty clamped range means nothing remains valid.
		fm.valid = first <= last
	}
}

// sendMatcherSync delivers the current index state to every matcher backend
// that has a pending Sync request, then marks each such backend as fully in
// sync with the currently indexed range.
// The channel send cannot block even though the lock is held: Sync creates
// syncCh with a buffer of one and at most one result is delivered per
// request (syncCh is reset to nil here after delivery).
func (f *FilterMaps) sendMatcherSync(head *types.Header) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for fm := range f.matchers {
		if fm.syncCh == nil {
			continue
		}
		fm.syncCh <- validRange{
			head:         head,
			valid:        fm.valid,
			firstValid:   fm.firstValid,
			lastValid:    fm.lastValid,
			indexed:      f.initialized,
			firstIndexed: f.tailBlockNumber,
			lastIndexed:  f.headBlockNumber,
		}
		// After delivery the matcher is considered synced up to the whole
		// currently indexed range; reset its valid range accordingly and
		// clear the pending request.
		fm.valid = f.initialized
		fm.firstValid = f.tailBlockNumber
		fm.lastValid = f.headBlockNumber
		fm.syncCh = nil
	}
}

// Close unregisters the matcher backend from its FilterMaps instance and
// wakes up any Sync call blocked on it: a pending Sync observes the closed
// channel and returns an empty validRange.
// Close implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) Close() {
	f := fm.f
	f.lock.Lock()
	defer f.lock.Unlock()

	if ch := fm.syncCh; ch != nil {
		// Unblock a pending Sync; the zero validRange it receives from the
		// closed channel signals shutdown.
		close(ch)
		fm.syncCh = nil
	}
	fm.closed = true
	delete(f.matchers, fm)
}

// GetFilterMapRow returns the requested row of the requested filter map; an
// empty row comes back as a non-nil zero-length slice. The returned slice
// must be treated as read-only and copied before any modification.
// GetFilterMapRow implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) {
	row, err := fm.f.getFilterMapRow(mapIndex, rowIndex)
	return row, err
}

// GetBlockLvPointer returns the first log value index occupied by the log
// values of the given block. If blockNumber is beyond the current head then
// the first unoccupied log value index is returned instead.
// GetBlockLvPointer implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) {
	f := fm.f
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.getBlockLvPointer(blockNumber)
}

// GetLogByLvIndex returns the log located at the given log value index. If
// the index does not point at the first log value entry of a log then no log
// and no error are returned, since the index may have been a false positive.
// Note that this function assumes the log index structure is consistent with
// the canonical chain at the point the index refers to; otherwise an invalid
// result or an error may be returned.
// GetLogByLvIndex implements MatcherBackend.
func (fm *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) {
	f := fm.f
	f.lock.RLock()
	defer f.lock.RUnlock()

	return f.getLogByLvIndex(lvIndex)
}

// Sync requests a synchronization point from the indexer and blocks until the
// indexer delivers the resulting validRange (or until ctx is cancelled or the
// backend is closed, in which case an empty validRange is returned).
// The result channel is registered under the lock, then the indexer loop is
// woken with a non-blocking send on matcherSyncCh (buffered, capacity one, so
// a wakeup already in flight is sufficient).
// NOTE(review): on ctx cancellation fm.syncCh stays set, so a later
// sendMatcherSync will deposit a result into the abandoned buffered channel
// and reset the backend's valid range without a consumer — presumably
// harmless since the matcher retries Sync, but worth confirming.
func (fm *FilterMapsMatcherBackend) Sync(ctx context.Context) validRange {
	syncCh := make(chan validRange, 1)
	fm.f.lock.Lock()
	closed := fm.closed
	if !closed {
		fm.syncCh = syncCh
	}
	fm.f.lock.Unlock()
	if closed {
		return validRange{}
	}
	// Wake the indexer loop; drop the signal if one is already pending.
	select {
	case fm.f.matcherSyncCh <- struct{}{}:
	default:
	}
	// Wait for delivery via sendMatcherSync, a Close (closed channel yields
	// the zero validRange), or context cancellation.
	select {
	case vr := <-syncCh:
		return vr
	case <-ctx.Done():
		return validRange{}
	}
}
Loading

0 comments on commit c6c805f

Please sign in to comment.