Skip to content

Commit

Permalink
Discovery: throttle node DB commits (#3581)
Browse files Browse the repository at this point in the history
UpdateFindFails/UpdateLastPingReceived/UpdateLastPongReceived events
are causing bursty DB commits (100 per minute).

This optimization throttles the commits to happen at most once in a few seconds,
because this info doesn't need to be persisted immediately.
  • Loading branch information
battlmonstr committed Mar 7, 2022
1 parent 0715561 commit a4ba877
Showing 1 changed file with 104 additions and 12 deletions.
116 changes: 104 additions & 12 deletions p2p/enode/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ const (
dbNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
dbCleanupCycle = time.Hour // Time period for running the expiration task.
dbVersion = 9

// Timeout for throttling committing events to the DB.
dbEventsCommitThrottle = 2 * time.Second
)

var (
Expand All @@ -70,22 +73,52 @@ var (

var zeroIP = make(net.IP, 16)

type fetchEvent struct {
key []byte
resultChan chan<- []byte
}

type storeEvent struct {
key []byte
blob []byte
}

// DB is the node database, storing previously seen nodes and any collected metadata about
// them for QoS purposes.
type DB struct {
kv kv.RwDB // Interface to the database itself
runner sync.Once // Ensures we can start at most one expirer
quit chan struct{} // Channel to signal the expiring thread to stop

// Channel for fetching Ping/Pong/FindFail events
fetchEventChan chan<- fetchEvent
// Channel for storing Ping/Pong/FindFail events
storeEventChan chan<- storeEvent
}

// OpenDB opens a node database for storing and retrieving infos about known peers in the
// network. If no path is given an in-memory, temporary database is constructed.
func OpenDB(path string) (*DB, error) {
logger := log.New() //TODO: move higher

var db *DB
var err error
if path == "" {
return newMemoryDB(logger)
db, err = newMemoryDB(logger)
} else {
db, err = newPersistentDB(logger, path)
}
return newPersistentDB(logger, path)
if err != nil {
return nil, err
}

fetchEventChan := make(chan fetchEvent)
storeEventChan := make(chan storeEvent)
db.fetchEventChan = fetchEventChan
db.storeEventChan = storeEventChan
go db.eventsTransactionLoop(fetchEventChan, storeEventChan, logger.New("context", "nodedb.eventsTransactionLoop"))

return db, nil
}

var bucketsConfig = func(defaultBuckets kv.TableCfg) kv.TableCfg {
Expand Down Expand Up @@ -216,18 +249,22 @@ func localItemKey(id ID, field string) []byte {
// fetchInt64 retrieves an integer associated with a particular key.
func (db *DB) fetchInt64(key []byte) int64 {
var val int64
if err := db.kv.View(context.Background(), func(tx kv.Tx) error {
blob, errGet := tx.GetOne(kv.Inodes, key)
if errGet != nil {
return errGet
}

resultChan := make(chan []byte)

select {
case db.fetchEventChan <- fetchEvent{key, resultChan}:
case <-db.quit:
}

select {
case blob := <-resultChan:
if blob != nil {
if v, read := binary.Varint(blob); read > 0 {
val = v
}
}
return nil
}); err != nil {
case <-db.quit:
return 0
}

Expand All @@ -238,9 +275,64 @@ func (db *DB) fetchInt64(key []byte) int64 {
func (db *DB) storeInt64(key []byte, n int64) error {
blob := make([]byte, binary.MaxVarintLen64)
blob = blob[:binary.PutVarint(blob, n)]
return db.kv.Update(context.Background(), func(tx kv.RwTx) error {
return tx.Put(kv.Inodes, common.CopyBytes(key), blob)
})
select {
case db.storeEventChan <- storeEvent{common.CopyBytes(key), blob}:
case <-db.quit:
return errors.New("node DB is closed")
}
return nil
}

// eventsTransactionLoop manages a transaction for storing and fetching Ping/Pong/FindFail events.
func (db *DB) eventsTransactionLoop(fetchEventChan <-chan fetchEvent, storeEventChan <-chan storeEvent, logger log.Logger) {
tx, err := db.kv.BeginRw(context.Background())
if err != nil {
logger.Error("failed to begin tx", "err", err)
return
}
defer tx.Rollback()
isCommitNeeded := false

tick := time.NewTicker(dbEventsCommitThrottle)
defer tick.Stop()

for {
select {
case event := <-storeEventChan:
err := tx.Put(kv.Inodes, event.key, event.blob)
if err != nil {
logger.Error("failed to handle a store event", "err", err)
}
isCommitNeeded = true
case event := <-fetchEventChan:
blob, err := tx.GetOne(kv.Inodes, event.key)
if err != nil {
logger.Error("failed to handle a fetch event", "err", err)
}
select {
case event.resultChan <- blob:
case <-db.quit:
return
}
case <-tick.C:
if !isCommitNeeded {
break
}
isCommitNeeded = false
err := tx.Commit()
if err != nil {
logger.Error("failed to commit events", "err", err)
tx.Rollback()
}
tx, err = db.kv.BeginRw(context.Background())
if err != nil {
logger.Error("failed to begin tx", "err", err)
return
}
case <-db.quit:
return
}
}
}

// fetchUint64 retrieves an integer associated with a particular key.
Expand Down

0 comments on commit a4ba877

Please sign in to comment.