Skip to content

Commit

Permalink
Logs sub (#3666)
Browse files Browse the repository at this point in the history
* save

* Add onLogs

* Fix lint

* Add proper logs

* Update go.mod

* goimports

* Add unwind
  • Loading branch information
primalcs authored and Alex Sharp committed Mar 26, 2022
1 parent c6c02fe commit 64a07f0
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 8 deletions.
58 changes: 57 additions & 1 deletion eth/stagedsync/stage_finish.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package stagedsync

import (
"bytes"
"context"
"encoding/binary"
"fmt"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
types2 "github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon-lib/kv"
common2 "github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/ethdb/cbor"
"github.com/ledgerwatch/erigon/params"
"github.com/ledgerwatch/log/v3"
)
Expand Down Expand Up @@ -114,8 +121,10 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishS
}
// Notify all headers we have (either canonical or not) in a maximum range span of 1024
var notifyFrom uint64
var isUnwind bool
if unwindTo != nil && *unwindTo != 0 && (*unwindTo) < finishStageBeforeSync {
notifyFrom = *unwindTo
isUnwind = true
} else {
heightSpan := finishStageAfterSync - finishStageBeforeSync
if heightSpan > 1024 {
Expand All @@ -139,7 +148,54 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishS
return err
}
notifier.OnNewHeader(headersRlp)

log.Info("RPC Daemon notified of new headers", "from", notifyFrom-1, "to", notifyTo)
logs, err := ReadLogs(tx, notifyFrom, isUnwind)
if err != nil {
return err
}
notifier.OnLogs(logs)

return nil
}

func ReadLogs(tx kv.Tx, from uint64, isUnwind bool) ([]*remote.SubscribeLogsReply, error) {
logs, err := tx.Cursor(kv.Log)
if err != nil {
return nil, err
}
defer logs.Close()
reply := make([]*remote.SubscribeLogsReply, 0)
reader := bytes.NewReader(nil)

for k, v, err := logs.Seek(dbutils.LogKey(from, 0)); k != nil; k, v, err = logs.Next() {
if err != nil {
return nil, err
}
blockNum := binary.BigEndian.Uint64(k[:8])
var ll types.Logs
reader.Reset(v)
if err := cbor.Unmarshal(&ll, reader); err != nil {
return nil, fmt.Errorf("receipt unmarshal failed: %w, blocl=%d", err, blockNum)
}

for _, l := range ll {
r := &remote.SubscribeLogsReply{
Address: gointerfaces.ConvertAddressToH160(l.Address),
BlockHash: gointerfaces.ConvertHashToH256(l.BlockHash),
BlockNumber: l.BlockNumber,
Data: l.Data,
LogIndex: uint64(l.Index),
Topics: make([]*types2.H256, 0),
TransactionHash: gointerfaces.ConvertHashToH256(l.TxHash),
TransactionIndex: uint64(l.TxIndex),
Removed: isUnwind,
}
for _, topic := range l.Topics {
r.Topics = append(r.Topics, gointerfaces.ConvertHashToH256(topic))
}
reply = append(reply, r)
}
}

return reply, nil
}
2 changes: 2 additions & 0 deletions eth/stagedsync/stagebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stagedsync
import (
"context"

"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
Expand All @@ -13,6 +14,7 @@ import (
type ChainEventNotifier interface {
OnNewHeader(newHeadersRlp [][]byte)
OnNewPendingLogs(types.Logs)
OnLogs([]*remote.SubscribeLogsReply)
}

type Notifications struct {
Expand Down
23 changes: 17 additions & 6 deletions ethdb/privateapi/ethbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package privateapi

import (
"context"
"fmt"

"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
Expand All @@ -16,14 +17,15 @@ import (
// 2.0.0 - move all mining-related methods to 'txpool/mining' server
// 2.1.0 - add NetPeerCount function
// 2.2.0 - add NodesInfo function
var EthBackendAPIVersion = &types2.VersionReply{Major: 2, Minor: 2, Patch: 0}
// 2.3.0 - add Subscribe to logs
var EthBackendAPIVersion = &types2.VersionReply{Major: 2, Minor: 3, Patch: 0}

type EthBackendServer struct {
remote.UnimplementedETHBACKENDServer // must be embedded to have forward compatible implementations.

ctx context.Context
eth EthBackend
events *Events
ctx context.Context
eth EthBackend
events *Events
logsFilter *LogsFilterAggregator
}

type EthBackend interface {
Expand All @@ -34,7 +36,9 @@ type EthBackend interface {
}

func NewEthBackendServer(ctx context.Context, eth EthBackend, events *Events) *EthBackendServer {
return &EthBackendServer{ctx: ctx, eth: eth, events: events}
s := &EthBackendServer{ctx: ctx, eth: eth, events: events, logsFilter: NewLogsFilterAggregator()}
s.events.AddLogsSubscription(s.logsFilter.distributeLogs)
return s
}

func (s *EthBackendServer) Version(context.Context, *emptypb.Empty) (*types2.VersionReply, error) {
Expand Down Expand Up @@ -116,3 +120,10 @@ func (s *EthBackendServer) NodeInfo(_ context.Context, r *remote.NodesInfoReques
}
return nodesInfo, nil
}

func (s *EthBackendServer) SubscribeLogs(server remote.ETHBACKEND_SubscribeLogsServer) error {
if s.logsFilter != nil {
return s.logsFilter.subscribeLogs(server)
}
return fmt.Errorf("no logs filter available")
}
20 changes: 20 additions & 0 deletions ethdb/privateapi/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package privateapi
import (
"sync"

"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon/core/types"
)

Expand All @@ -12,6 +13,7 @@ type HeaderSubscription func(headerRLP []byte) error
type PendingLogsSubscription func(types.Logs) error
type PendingBlockSubscription func(*types.Block) error
type PendingTxsSubscription func([]types.Transaction) error
type LogsSubscription func([]*remote.SubscribeLogsReply) error

// Events manages event subscriptions and dissimination. Thread-safe
type Events struct {
Expand All @@ -20,6 +22,7 @@ type Events struct {
pendingLogsSubscriptions map[int]PendingLogsSubscription
pendingBlockSubscriptions map[int]PendingBlockSubscription
pendingTxsSubscriptions map[int]PendingTxsSubscription
logsSubscriptions map[int]LogsSubscription
lock sync.RWMutex
}

Expand All @@ -29,6 +32,7 @@ func NewEvents() *Events {
pendingLogsSubscriptions: map[int]PendingLogsSubscription{},
pendingBlockSubscriptions: map[int]PendingBlockSubscription{},
pendingTxsSubscriptions: map[int]PendingTxsSubscription{},
logsSubscriptions: map[int]LogsSubscription{},
}
}

Expand Down Expand Up @@ -57,6 +61,12 @@ func (e *Events) AddPendingBlockSubscription(s PendingBlockSubscription) {
e.pendingBlockSubscriptions[len(e.pendingBlockSubscriptions)] = s
}

func (e *Events) AddLogsSubscription(s LogsSubscription) {
e.lock.Lock()
defer e.lock.Unlock()
e.logsSubscriptions[len(e.logsSubscriptions)] = s
}

func (e *Events) OnNewHeader(newHeadersRlp [][]byte) {
e.lock.Lock()
defer e.lock.Unlock()
Expand Down Expand Up @@ -84,3 +94,13 @@ func (e *Events) OnNewPendingLogs(logs types.Logs) {
}
}
}

func (e *Events) OnLogs(logs []*remote.SubscribeLogsReply) {
e.lock.Lock()
defer e.lock.Unlock()
for i, sub := range e.logsSubscriptions {
if err := sub(logs); err != nil {
delete(e.logsSubscriptions, i)
}
}
}
170 changes: 170 additions & 0 deletions ethdb/privateapi/logsfilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package privateapi

import (
"fmt"
"io"
"sync"

"github.com/ledgerwatch/erigon-lib/gointerfaces"
"github.com/ledgerwatch/erigon-lib/gointerfaces/remote"
"github.com/ledgerwatch/erigon-lib/gointerfaces/types"
"github.com/ledgerwatch/erigon/common"
)

type LogsFilterAggregator struct {
aggLogsFilter LogsFilter // Aggregation of all current log filters
logsFilters map[uint64]*LogsFilter // Filter for each subscriber, keyed by filterID
logsFilterLock sync.Mutex
nextFilterId uint64
}

// LogsFilter is used for both representing log filter for a specific subscriber (RPC daemon usually)
// and "aggregated" log filter representing a union of all subscribers. Therefore, the values in
// the mappings are counters (of type int) and they get deleted when counter goes back to 0
// Also, addAddr and allTopic are int instead of bool because they are also counter, counting
// how many subscribers have this set on
type LogsFilter struct {
allAddrs int
addrs map[common.Address]int
allTopics int
topics map[common.Hash]int
sender remote.ETHBACKEND_SubscribeLogsServer // nil for aggregate subscriber, for appropriate stream server otherwise
}

func NewLogsFilterAggregator() *LogsFilterAggregator {
return &LogsFilterAggregator{
aggLogsFilter: LogsFilter{
addrs: make(map[common.Address]int),
topics: make(map[common.Hash]int),
},
logsFilters: make(map[uint64]*LogsFilter),
nextFilterId: 0,
}
}

func (a *LogsFilterAggregator) insertLogsFilter(sender remote.ETHBACKEND_SubscribeLogsServer) (uint64, *LogsFilter) {
a.logsFilterLock.Lock()
defer a.logsFilterLock.Unlock()
filterId := a.nextFilterId
a.nextFilterId++
filter := &LogsFilter{addrs: make(map[common.Address]int), topics: make(map[common.Hash]int), sender: sender}
a.logsFilters[filterId] = filter
return filterId, filter
}

func (a *LogsFilterAggregator) removeLogsFilter(filterId uint64, filter *LogsFilter) {
a.logsFilterLock.Lock()
defer a.logsFilterLock.Unlock()
a.subtractLogFilters(filter)
delete(a.logsFilters, filterId)
}

func (a *LogsFilterAggregator) updateLogsFilter(filter *LogsFilter, filterReq *remote.LogsFilterRequest) {
a.logsFilterLock.Lock()
defer a.logsFilterLock.Unlock()
a.subtractLogFilters(filter)
filter.addrs = make(map[common.Address]int)
if filterReq.GetAllAddresses() {
filter.allAddrs = 1
} else {
filter.allAddrs = 0
for _, addr := range filterReq.GetAddresses() {
filter.addrs[gointerfaces.ConvertH160toAddress(addr)] = 1
}
}
filter.topics = make(map[common.Hash]int)
if filterReq.GetAllTopics() {
filter.allTopics = 1
} else {
filter.allTopics = 0
for _, topic := range filterReq.GetTopics() {
filter.topics[gointerfaces.ConvertH256ToHash(topic)] = 1
}
}
a.addLogsFilters(filter)
}

func (a *LogsFilterAggregator) subtractLogFilters(f *LogsFilter) {
a.aggLogsFilter.allAddrs -= f.allAddrs
for addr, count := range f.addrs {
a.aggLogsFilter.addrs[addr] -= count
if a.aggLogsFilter.addrs[addr] == 0 {
delete(a.aggLogsFilter.addrs, addr)
}
}
a.aggLogsFilter.allTopics -= f.allTopics
for topic, count := range f.topics {
a.aggLogsFilter.topics[topic] -= count
if a.aggLogsFilter.topics[topic] == 0 {
delete(a.aggLogsFilter.topics, topic)
}
}
}

func (a *LogsFilterAggregator) addLogsFilters(f *LogsFilter) {
a.aggLogsFilter.allAddrs += f.allAddrs
for addr, count := range f.addrs {
a.aggLogsFilter.addrs[addr] += count
}
a.aggLogsFilter.allTopics += f.allTopics
for topic, count := range f.topics {
a.aggLogsFilter.topics[topic] += count
}
}

// SubscribeLogs
// Only one subscription is needed to serve all the users, LogsFilterRequest allows to dynamically modifying the subscription
func (a *LogsFilterAggregator) subscribeLogs(server remote.ETHBACKEND_SubscribeLogsServer) error {
filterId, filter := a.insertLogsFilter(server)
defer a.removeLogsFilter(filterId, filter)
// Listen to filter updates and modify the filters, until terminated
var filterReq *remote.LogsFilterRequest
var recvErr error
for filterReq, recvErr = server.Recv(); recvErr != nil; filterReq, recvErr = server.Recv() {
a.updateLogsFilter(filter, filterReq)
}
if recvErr != nil && recvErr != io.EOF { // termination
return fmt.Errorf("receiving log filter request: %w", recvErr)
}
return nil
}

func (a *LogsFilterAggregator) distributeLogs(logs []*remote.SubscribeLogsReply) error {
a.logsFilterLock.Lock()
defer a.logsFilterLock.Unlock()
filtersToDelete := make(map[uint64]*LogsFilter)
filterLoop:
for filterId, filter := range a.logsFilters {
for _, log := range logs {
_, addrOk := filter.addrs[gointerfaces.ConvertH160toAddress(log.Address)]
if !addrOk {
continue
}
if filter.allTopics == 0 {
if !a.chooseTopics(filter.topics, log.GetTopics()) {
continue
}
}
if err := filter.sender.Send(log); err != nil {
filtersToDelete[filterId] = filter
continue filterLoop
}
}
}
// remove malfunctioned filters
for filterId, filter := range filtersToDelete {
a.subtractLogFilters(filter)
delete(a.logsFilters, filterId)
}

return nil
}

func (a *LogsFilterAggregator) chooseTopics(filterTopics map[common.Hash]int, logTopics []*types.H256) bool {
for _, logTopic := range logTopics {
if _, ok := filterTopics[gointerfaces.ConvertH256ToHash(logTopic)]; ok {
return true
}
}
return false
}
1 change: 0 additions & 1 deletion interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"math/big"

"github.com/holiman/uint256"

"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/types"
)
Expand Down

0 comments on commit 64a07f0

Please sign in to comment.