diff --git a/p2p/enode/nodedb.go b/p2p/enode/nodedb.go index 621159c6da1..6582c5c0110 100644 --- a/p2p/enode/nodedb.go +++ b/p2p/enode/nodedb.go @@ -62,6 +62,9 @@ const ( dbNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped. dbCleanupCycle = time.Hour // Time period for running the expiration task. dbVersion = 9 + + // Timeout for throttling committing events to the DB. + dbEventsCommitThrottle = 2 * time.Second ) var ( @@ -70,22 +73,52 @@ var ( var zeroIP = make(net.IP, 16) +type fetchEvent struct { + key []byte + resultChan chan<- []byte +} + +type storeEvent struct { + key []byte + blob []byte +} + // DB is the node database, storing previously seen nodes and any collected metadata about // them for QoS purposes. type DB struct { kv kv.RwDB // Interface to the database itself runner sync.Once // Ensures we can start at most one expirer quit chan struct{} // Channel to signal the expiring thread to stop + + // Channel for fetching Ping/Pong/FindFail events + fetchEventChan chan<- fetchEvent + // Channel for storing Ping/Pong/FindFail events + storeEventChan chan<- storeEvent } // OpenDB opens a node database for storing and retrieving infos about known peers in the // network. If no path is given an in-memory, temporary database is constructed. func OpenDB(path string) (*DB, error) { logger := log.New() //TODO: move higher + + var db *DB + var err error if path == "" { - return newMemoryDB(logger) + db, err = newMemoryDB(logger) + } else { + db, err = newPersistentDB(logger, path) } - return newPersistentDB(logger, path) + if err != nil { + return nil, err + } + + fetchEventChan := make(chan fetchEvent) + storeEventChan := make(chan storeEvent) + db.fetchEventChan = fetchEventChan + db.storeEventChan = storeEventChan + go db.eventsTransactionLoop(fetchEventChan, storeEventChan, logger.New("context", "nodedb.eventsTransactionLoop")) + + return db, nil } var bucketsConfig = func(defaultBuckets kv.TableCfg) kv.TableCfg { @@ -216,18 +249,22 @@ func localItemKey(id ID, field string) []byte { // fetchInt64 retrieves an integer associated with a particular key. func (db *DB) fetchInt64(key []byte) int64 { var val int64 - if err := db.kv.View(context.Background(), func(tx kv.Tx) error { - blob, errGet := tx.GetOne(kv.Inodes, key) - if errGet != nil { - return errGet - } + + resultChan := make(chan []byte) + + select { + case db.fetchEventChan <- fetchEvent{key, resultChan}: + case <-db.quit: + } + + select { + case blob := <-resultChan: if blob != nil { if v, read := binary.Varint(blob); read > 0 { val = v } } - return nil - }); err != nil { + case <-db.quit: return 0 } @@ -238,9 +275,64 @@ func (db *DB) fetchInt64(key []byte) int64 { func (db *DB) storeInt64(key []byte, n int64) error { blob := make([]byte, binary.MaxVarintLen64) blob = blob[:binary.PutVarint(blob, n)] - return db.kv.Update(context.Background(), func(tx kv.RwTx) error { - return tx.Put(kv.Inodes, common.CopyBytes(key), blob) - }) + select { + case db.storeEventChan <- storeEvent{common.CopyBytes(key), blob}: + case <-db.quit: + return errors.New("node DB is closed") + } + return nil +} + +// eventsTransactionLoop manages a transaction for storing and fetching Ping/Pong/FindFail events. +func (db *DB) eventsTransactionLoop(fetchEventChan <-chan fetchEvent, storeEventChan <-chan storeEvent, logger log.Logger) { + tx, err := db.kv.BeginRw(context.Background()) + if err != nil { + logger.Error("failed to begin tx", "err", err) + return + } + defer tx.Rollback() + isCommitNeeded := false + + tick := time.NewTicker(dbEventsCommitThrottle) + defer tick.Stop() + + for { + select { + case event := <-storeEventChan: + err := tx.Put(kv.Inodes, event.key, event.blob) + if err != nil { + logger.Error("failed to handle a store event", "err", err) + } + isCommitNeeded = true + case event := <-fetchEventChan: + blob, err := tx.GetOne(kv.Inodes, event.key) + if err != nil { + logger.Error("failed to handle a fetch event", "err", err) + } + select { + case event.resultChan <- blob: + case <-db.quit: + return + } + case <-tick.C: + if !isCommitNeeded { + break + } + isCommitNeeded = false + err := tx.Commit() + if err != nil { + logger.Error("failed to commit events", "err", err) + tx.Rollback() + } + tx, err = db.kv.BeginRw(context.Background()) + if err != nil { + logger.Error("failed to begin tx", "err", err) + return + } + case <-db.quit: + return + } + } } // fetchUint64 retrieves an integer associated with a particular key.