Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p/discover: add Table configuration and Nodes method #27387

Merged
merged 3 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions p2p/discover/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package discover
import (
"crypto/ecdsa"
"net"
"time"

"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/log"
Expand All @@ -35,29 +36,39 @@ type UDPConn interface {
LocalAddr() net.Addr
}

type V5Config struct {
ProtocolID *[6]byte
}

// Config holds settings for the discovery listener.
type Config struct {
// These settings are required and configure the UDP listener:
PrivateKey *ecdsa.PrivateKey

// These settings are optional:
// All remaining settings are optional.

// Packet handling configuration:
NetRestrict *netutil.Netlist // list of allowed IP networks
Bootnodes []*enode.Node // list of bootstrap nodes
Unhandled chan<- ReadPacket // unhandled packets are sent on this channel
Log log.Logger // if set, log messages go here

// V5ProtocolID configures the discv5 protocol identifier.
V5ProtocolID *[6]byte
// Node table configuration:
Bootnodes []*enode.Node // list of bootstrap nodes
PingInterval time.Duration // speed of node liveness check
RefreshInterval time.Duration // used in bucket refresh

// The options below are useful in very specific cases, like in unit tests.
V5ProtocolID *[6]byte
Log log.Logger // if set, log messages go here
ValidSchemes enr.IdentityScheme // allowed identity schemes
Clock mclock.Clock
}

func (cfg Config) withDefaults() Config {
// Node table configuration:
if cfg.PingInterval == 0 {
cfg.PingInterval = 10 * time.Second
}
if cfg.RefreshInterval == 0 {
cfg.RefreshInterval = 30 * time.Minute
}

// Debug/test settings:
if cfg.Log == nil {
cfg.Log = log.Root()
}
Expand Down
85 changes: 48 additions & 37 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,10 @@ const (
bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24
tableIPLimit, tableSubnet = 10, 24

refreshInterval = 30 * time.Minute
revalidateInterval = 10 * time.Second
copyNodesInterval = 30 * time.Second
seedMinTableTime = 5 * time.Minute
seedCount = 30
seedMaxAge = 5 * 24 * time.Hour
copyNodesInterval = 30 * time.Second
seedMinTableTime = 5 * time.Minute
seedCount = 30
seedMaxAge = 5 * 24 * time.Hour
)

// Table is the 'node table', a Kademlia-like index of neighbor nodes. The table keeps
Expand All @@ -71,9 +69,12 @@ type Table struct {
rand *mrand.Rand // source of randomness, periodically reseeded
ips netutil.DistinctNetSet

log log.Logger
db *enode.DB // database of known nodes
net transport
db *enode.DB // database of known nodes
net transport
cfg Config
log log.Logger

// loop channels
refreshReq chan chan struct{}
initDone chan struct{}
closeReq chan struct{}
Expand All @@ -99,19 +100,21 @@ type bucket struct {
ips netutil.DistinctNetSet
}

func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger) (*Table, error) {
func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) {
cfg = cfg.withDefaults()
tab := &Table{
net: t,
db: db,
cfg: cfg,
log: cfg.Log,
refreshReq: make(chan chan struct{}),
initDone: make(chan struct{}),
closeReq: make(chan struct{}),
closed: make(chan struct{}),
rand: mrand.New(mrand.NewSource(0)),
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
log: log,
}
if err := tab.setFallbackNodes(bootnodes); err != nil {
if err := tab.setFallbackNodes(cfg.Bootnodes); err != nil {
return nil, err
}
for i := range tab.buckets {
Expand All @@ -125,25 +128,12 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger
return tab, nil
}

func (tab *Table) self() *enode.Node {
return tab.net.Self()
}

func (tab *Table) seedRand() {
var b [8]byte
crand.Read(b[:])

tab.mutex.Lock()
tab.rand.Seed(int64(binary.BigEndian.Uint64(b[:])))
tab.mutex.Unlock()
}

// ReadRandomNodes fills the given slice with random nodes from the table. The results
// are guaranteed to be unique for a single invocation, no node will appear twice.
func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) {
// Nodes returns all nodes contained in the table.
func (tab *Table) Nodes() []*enode.Node {
if !tab.isInitDone() {
return 0
return nil
}

tab.mutex.Lock()
defer tab.mutex.Unlock()

Expand All @@ -153,12 +143,20 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) {
nodes = append(nodes, unwrapNode(n))
}
}
// Shuffle.
for i := 0; i < len(nodes); i++ {
j := tab.rand.Intn(len(nodes))
nodes[i], nodes[j] = nodes[j], nodes[i]
}
return copy(buf, nodes)
return nodes
}

func (tab *Table) self() *enode.Node {
return tab.net.Self()
}

func (tab *Table) seedRand() {
var b [8]byte
crand.Read(b[:])

tab.mutex.Lock()
tab.rand.Seed(int64(binary.BigEndian.Uint64(b[:])))
tab.mutex.Unlock()
}

// getNode returns the node with the given ID or nil if it isn't in the table.
Expand Down Expand Up @@ -218,7 +216,7 @@ func (tab *Table) refresh() <-chan struct{} {
func (tab *Table) loop() {
var (
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
refresh = time.NewTimer(tab.nextRefreshTime())
copyNodes = time.NewTicker(copyNodesInterval)
refreshDone = make(chan struct{}) // where doRefresh reports completion
revalidateDone chan struct{} // where doRevalidate reports completion
Expand Down Expand Up @@ -251,6 +249,7 @@ loop:
close(ch)
}
waiting, refreshDone = nil, nil
refresh.Reset(tab.nextRefreshTime())
case <-revalidate.C:
revalidateDone = make(chan struct{})
go tab.doRevalidate(revalidateDone)
Expand Down Expand Up @@ -373,7 +372,15 @@ func (tab *Table) nextRevalidateTime() time.Duration {
tab.mutex.Lock()
defer tab.mutex.Unlock()

return time.Duration(tab.rand.Int63n(int64(revalidateInterval)))
return time.Duration(tab.rand.Int63n(int64(tab.cfg.PingInterval)))
}

func (tab *Table) nextRefreshTime() time.Duration {
tab.mutex.Lock()
defer tab.mutex.Unlock()

half := tab.cfg.RefreshInterval / 2
return half + time.Duration(tab.rand.Int63n(int64(half)))
}

// copyLiveNodes adds nodes from the table to the database if they have been in the table
Expand Down Expand Up @@ -481,10 +488,12 @@ func (tab *Table) addSeenNode(n *node) {
// Can't add: IP limit reached.
return
}

// Add to end of bucket:
b.entries = append(b.entries, n)
b.replacements = deleteNode(b.replacements, n)
n.addedAt = time.Now()

if tab.nodeAddedHook != nil {
tab.nodeAddedHook(n)
}
Expand Down Expand Up @@ -523,10 +532,12 @@ func (tab *Table) addVerifiedNode(n *node) {
// Can't add: IP limit reached.
return
}

// Add to front of bucket.
b.entries, _ = pushNode(b.entries, n, bucketSize)
b.replacements = deleteNode(b.replacements, n)
n.addedAt = time.Now()

if tab.nodeAddedHook != nil {
tab.nodeAddedHook(n)
}
Expand Down
35 changes: 0 additions & 35 deletions p2p/discover/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,41 +247,6 @@ func TestTable_findnodeByID(t *testing.T) {
}
}

func TestTable_ReadRandomNodesGetAll(t *testing.T) {
cfg := &quick.Config{
MaxCount: 200,
Rand: rand.New(rand.NewSource(time.Now().Unix())),
Values: func(args []reflect.Value, rand *rand.Rand) {
args[0] = reflect.ValueOf(make([]*enode.Node, rand.Intn(1000)))
},
}
test := func(buf []*enode.Node) bool {
transport := newPingRecorder()
tab, db := newTestTable(transport)
defer db.Close()
defer tab.close()
<-tab.initDone

for i := 0; i < len(buf); i++ {
ld := cfg.Rand.Intn(len(tab.buckets))
fillTable(tab, []*node{nodeAtDistance(tab.self().ID(), ld, intIP(ld))})
}
gotN := tab.ReadRandomNodes(buf)
if gotN != tab.len() {
t.Errorf("wrong number of nodes, got %d, want %d", gotN, tab.len())
return false
}
if hasDuplicates(wrapNodes(buf[:gotN])) {
t.Errorf("result contains duplicates")
return false
}
return true
}
if err := quick.Check(test, cfg); err != nil {
t.Error(err)
}
}

type closeTest struct {
Self enode.ID
Target enode.ID
Expand Down
4 changes: 2 additions & 2 deletions p2p/discover/table_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"sync"

"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
)
Expand All @@ -42,8 +41,9 @@ func init() {
}

func newTestTable(t transport) (*Table, *enode.DB) {
cfg := Config{}
db, _ := enode.OpenDB("")
tab, _ := newTable(t, db, nil, log.Root())
tab, _ := newTable(t, db, cfg)
go tab.loop()
return tab, db
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
log: cfg.Log,
}

tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log)
tab, err := newTable(t, ln.Database(), cfg)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/v5_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
cancelCloseCtx: cancelCloseCtx,
}
t.talk = newTalkSystem(t)
tab, err := newTable(t, t.db, cfg.Bootnodes, cfg.Log)
tab, err := newTable(t, t.db, cfg)
if err != nil {
return nil, err
}
Expand Down