Skip to content

Commit

Permalink
[stable] Fixes to discovery nodedb (#3691)
Browse files Browse the repository at this point in the history
* Update to erigon-lib stable

* Discovery: throttle node DB commits (#3581) (#3656)

UpdateFindFails/UpdateLastPingReceived/UpdateLastPongReceived events
are causing bursty DB commits (100 per minute).

This optimization throttles the disk writes to happen at most once in a few seconds,
because this info doesn't need to be persisted immediately.

This helps on HDD drives.

* Update erigon-lib

* Discovery: split node records to a sepatate DB table (#3581) (#3667)

Problem:
QuerySeeds will poke 150 random entries in the whole node DB and ignore hitting "field" entries.
In a bootstrap scenario it might hit hundreds of :lastping :lastpong entries,
and very few true "node record" entries.
After running for 15 minutes I've got totalEntryCount=1508 nodeRecordCount=114 entries.
There's a 1/16 chance of hitting a "node record" entry.
It means finding just about 10 nodes of 114 total on average from 150 attempts.

Solution:
Split "node record" entries to a separate table such that QuerySeeds doesn't do idle cycle hits.

* Discovery: add Context to Listen. (#3577)

Add explicit Context to ListenV4 and ListenV5.
This makes it possible to stop listening by an external signal.

* Discovery: refactor public key to node ID conversions. (#3634)

Encode and hash logic was duplicated in multiple places.
* Move encoding to p2p/discover/v4wire
* Move hashing to p2p/enode/idscheme

* Change newRandomLookup to create a proper random key on a curve.

* Discovery: speed up lookup tests (#3677)

* Update erigon-lib

Co-authored-by: Alexey Sharp <[email protected]>
Co-authored-by: battlmonstr <[email protected]>
  • Loading branch information
3 people authored Mar 14, 2022
1 parent a52e53a commit 461ac47
Show file tree
Hide file tree
Showing 20 changed files with 188 additions and 187 deletions.
9 changes: 7 additions & 2 deletions cmd/bootnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/ecdsa"
"flag"
"fmt"
"github.com/ledgerwatch/erigon-lib/common"
"net"
"os"

Expand Down Expand Up @@ -129,12 +130,16 @@ func main() {
PrivateKey: nodeKey,
NetRestrict: restrictList,
}

ctx, cancel := common.RootContext()
defer cancel()

if *runv5 {
if _, err := discover.ListenV5(conn, ln, cfg); err != nil {
if _, err := discover.ListenV5(ctx, conn, ln, cfg); err != nil {
utils.Fatalf("%v", err)
}
} else {
if _, err := discover.ListenUDP(conn, ln, cfg); err != nil {
if _, err := discover.ListenUDP(ctx, conn, ln, cfg); err != nil {
utils.Fatalf("%v", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/sentry/download/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ func (ss *SentryServerImpl) HandShake(context.Context, *emptypb.Empty) (*proto_s
return reply, nil
}

func (ss *SentryServerImpl) SetStatus(_ context.Context, statusData *proto_sentry.StatusData) (*proto_sentry.SetStatusReply, error) {
func (ss *SentryServerImpl) SetStatus(ctx context.Context, statusData *proto_sentry.StatusData) (*proto_sentry.SetStatusReply, error) {
genesisHash := gointerfaces.ConvertH256ToHash(statusData.ForkData.Genesis)

ss.lock.Lock()
Expand All @@ -879,7 +879,7 @@ func (ss *SentryServerImpl) SetStatus(_ context.Context, statusData *proto_sentr
}

// Add protocol
if err = srv.Start(); err != nil {
if err = srv.Start(ctx); err != nil {
srv.Stop()
return reply, fmt.Errorf("could not start server: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ require (
github.com/json-iterator/go v1.1.12
github.com/julienschmidt/httprouter v1.3.0
github.com/kevinburke/go-bindata v3.21.0+incompatible
github.com/ledgerwatch/erigon-lib v0.0.0-20220224065624-2b634b692f2b
github.com/ledgerwatch/log/v3 v3.4.0
github.com/ledgerwatch/erigon-lib v0.0.0-20220314143349-5c8ca0878ba4
github.com/ledgerwatch/log/v3 v3.4.1
github.com/ledgerwatch/secp256k1 v1.0.0
github.com/logrusorgru/aurora/v3 v3.0.0
github.com/pelletier/go-toml v1.9.4
Expand All @@ -46,7 +46,7 @@ require (
github.com/spf13/cobra v1.2.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.1-0.20210427113832-6241f9ab9942
github.com/torquem-ch/mdbx-go v0.22.10
github.com/torquem-ch/mdbx-go v0.22.16
github.com/ugorji/go/codec v1.1.13
github.com/ugorji/go/codec/codecgen v1.1.13
github.com/urfave/cli v1.22.5
Expand Down
11 changes: 6 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -493,10 +493,11 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20220224065624-2b634b692f2b h1:vlh0dSuZRVUF3KXBMWI6acvusKxWCSjZldggtGcSleA=
github.com/ledgerwatch/erigon-lib v0.0.0-20220224065624-2b634b692f2b/go.mod h1:BQNYmN6i8RdHqedsztAj3XDnswYebacmFAnsznlisTc=
github.com/ledgerwatch/log/v3 v3.4.0 h1:SEIOcv5a2zkG3PmoT5jeTU9m/0nEUv0BJS5bzsjwKCI=
github.com/ledgerwatch/erigon-lib v0.0.0-20220314143349-5c8ca0878ba4 h1:wivpdSUiMW/EZw8FoEC2lZTip7bg6MWOd3NCAqQPy6w=
github.com/ledgerwatch/erigon-lib v0.0.0-20220314143349-5c8ca0878ba4/go.mod h1:T1qFKmOyjrE1Uu2iImEsnQQDs7xBbQX/zwtrdfVOkWA=
github.com/ledgerwatch/log/v3 v3.4.0/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=
github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
github.com/ledgerwatch/secp256k1 v1.0.0/go.mod h1:SPmqJFciiF/Q0mPt2jVs2dTr/1TZBTIA+kPMmKgBAak=
github.com/logrusorgru/aurora/v3 v3.0.0 h1:R6zcoZZbvVcGMvDCKo45A9U/lzYyzl5NfYIvznmDfE4=
Expand Down Expand Up @@ -749,8 +750,8 @@ github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ=
github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8=
github.com/torquem-ch/mdbx-go v0.22.10 h1:reevtNP74E9SN7ESnogJr8Q8CI/0JcSMJ9tghfaLAEQ=
github.com/torquem-ch/mdbx-go v0.22.10/go.mod h1:T2fsoJDVppxfAPTLd1svUgH1kpPmeXdPESmroSHcL1E=
github.com/torquem-ch/mdbx-go v0.22.16 h1:uSuQOAKSZC7TvV4N4km+6kyER2YxaOuL/0qybsQtlUY=
github.com/torquem-ch/mdbx-go v0.22.16/go.mod h1:T2fsoJDVppxfAPTLd1svUgH1kpPmeXdPESmroSHcL1E=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM=
github.com/ugorji/go v1.1.13 h1:nB3O5kBSQGjEQAcfe1aLUYuxmXdFKmYgBZhY32rQb6Q=
Expand Down
5 changes: 3 additions & 2 deletions p2p/discover/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package discover

import (
"context"
"crypto/ecdsa"
"net"

Expand Down Expand Up @@ -63,8 +64,8 @@ func (cfg Config) withDefaults() Config {
}

// ListenUDP starts listening for discovery packets on the given UDP socket.
func ListenUDP(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return ListenV4(c, ln, cfg)
func ListenUDP(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return ListenV4(ctx, c, ln, cfg)
}

// ReadPacket is a packet that couldn't be handled. Those packets are sent to the unhandled
Expand Down
14 changes: 14 additions & 0 deletions p2p/discover/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type lookup struct {
result nodesByDistance
replyBuffer []*node
queries int
noSlowdown bool
}

type queryFunc func(*node) ([]*node, error)
Expand All @@ -50,6 +51,8 @@ func newLookup(ctx context.Context, tab *Table, target enode.ID, q queryFunc) *l
cancelCh: ctx.Done(),
queries: -1,
}
it.noSlowdown = isDisabledLookupSlowdown(ctx)

// Don't query further if we hit ourself.
// Unlikely to happen often in practice.
it.asked[tab.self().ID()] = true
Expand Down Expand Up @@ -129,7 +132,18 @@ func (it *lookup) startQueries() bool {
return it.queries > 0
}

func disableLookupSlowdown(ctx context.Context) context.Context {
return context.WithValue(ctx, "p2p.discover.lookup.noSlowdown", true)
}

func isDisabledLookupSlowdown(ctx context.Context) bool {
return ctx.Value("p2p.discover.lookup.noSlowdown") != nil
}

func (it *lookup) slowdown() {
if it.noSlowdown {
return
}
sleep := time.NewTimer(1 * time.Second)
defer sleep.Stop()
select {
Expand Down
33 changes: 0 additions & 33 deletions p2p/discover/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,9 @@
package discover

import (
"crypto/ecdsa"
"crypto/elliptic"
"errors"
"math/big"
"net"
"time"

"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/p2p/enode"
)

Expand All @@ -37,33 +31,6 @@ type node struct {
livenessChecks uint // how often liveness was checked
}

type encPubkey [64]byte

func encodePubkey(key *ecdsa.PublicKey) encPubkey {
var e encPubkey
math.ReadBits(key.X, e[:len(e)/2])
math.ReadBits(key.Y, e[len(e)/2:])
return e
}

func decodePubkey(curve elliptic.Curve, e []byte) (*ecdsa.PublicKey, error) {
if len(e) != len(encPubkey{}) {
return nil, errors.New("wrong size public key data")
}
p := &ecdsa.PublicKey{Curve: curve, X: new(big.Int), Y: new(big.Int)}
half := len(e) / 2
p.X.SetBytes(e[:half])
p.Y.SetBytes(e[half:])
if !p.Curve.IsOnCurve(p.X, p.Y) {
return nil, errors.New("invalid curve point")
}
return p, nil
}

func (e encPubkey) id() enode.ID {
return enode.ID(crypto.Keccak256Hash(e[:]))
}

func wrapNode(n *enode.Node) *node {
return &node{Node: *n}
}
Expand Down
1 change: 1 addition & 0 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func (tab *Table) doRefresh(done chan struct{}) {

func (tab *Table) loadSeedNodes() {
seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
tab.log.Debug("QuerySeeds read nodes from the node DB", "count", len(seeds))
seeds = append(seeds, tab.nursery...)
for i := range seeds {
seed := seeds[i]
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/table_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func hexEncPrivkey(h string) *ecdsa.PrivateKey {
}

// hexEncPubkey decodes h as a public key.
func hexEncPubkey(h string) (ret encPubkey) {
func hexEncPubkey(h string) (ret enode.PubkeyEncoded) {
b, err := hex.DecodeString(h)
if err != nil {
panic(err)
Expand Down
18 changes: 9 additions & 9 deletions p2p/discover/v4_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestUDPv4_Lookup(t *testing.T) {
test := newUDPTest(t)

// Lookup on empty table returns no nodes.
targetKey, _ := decodePubkey(crypto.S256(), lookupTestnet.target[:])
targetKey, _ := v4wire.DecodePubkey(crypto.S256(), v4wire.Pubkey(lookupTestnet.target))
if results := test.udp.LookupPubkey(targetKey); len(results) > 0 {
t.Fatalf("lookup on empty table returned %d results: %#v", len(results), results)
}
Expand All @@ -61,7 +61,7 @@ func TestUDPv4_Lookup(t *testing.T) {
results := <-resultC
t.Logf("results:")
for _, e := range results {
t.Logf(" ld=%d, %x", enode.LogDist(lookupTestnet.target.id(), e.ID()), e.ID().Bytes())
t.Logf(" ld=%d, %x", enode.LogDist(lookupTestnet.target.ID(), e.ID()), e.ID().Bytes())
}
if len(results) != bucketSize {
t.Errorf("wrong number of results: got %d, want %d", len(results), bucketSize)
Expand Down Expand Up @@ -150,7 +150,7 @@ func serveTestnet(test *udpTest, testnet *preminedTestnet) {
case *v4wire.Ping:
test.packetInFrom(nil, key, to, &v4wire.Pong{Expiration: futureExp, ReplyTok: hash})
case *v4wire.Findnode:
dist := enode.LogDist(n.ID(), testnet.target.id())
dist := enode.LogDist(n.ID(), testnet.target.ID())
nodes := testnet.nodesAtDistance(dist - 1)
test.packetInFrom(nil, key, to, &v4wire.Neighbors{Expiration: futureExp, Nodes: nodes})
}
Expand All @@ -164,12 +164,12 @@ func checkLookupResults(t *testing.T, tn *preminedTestnet, results []*enode.Node
t.Helper()
t.Logf("results:")
for _, e := range results {
t.Logf(" ld=%d, %x", enode.LogDist(tn.target.id(), e.ID()), e.ID().Bytes())
t.Logf(" ld=%d, %x", enode.LogDist(tn.target.ID(), e.ID()), e.ID().Bytes())
}
if hasDuplicates(wrapNodes(results)) {
t.Errorf("result set contains duplicate entries")
}
if !sortedByDistanceTo(tn.target.id(), wrapNodes(results)) {
if !sortedByDistanceTo(tn.target.ID(), wrapNodes(results)) {
t.Errorf("result set not sorted by distance to target")
}
wantNodes := tn.closest(len(results))
Expand Down Expand Up @@ -239,7 +239,7 @@ var lookupTestnet = &preminedTestnet{
}

type preminedTestnet struct {
target encPubkey
target enode.PubkeyEncoded
dists [hashBits + 1][]*ecdsa.PrivateKey
}

Expand Down Expand Up @@ -311,7 +311,7 @@ func (tn *preminedTestnet) closest(n int) (nodes []*enode.Node) {
}
}
sort.Slice(nodes, func(i, j int) bool {
return enode.DistCmp(tn.target.id(), nodes[i].ID(), nodes[j].ID()) < 0
return enode.DistCmp(tn.target.ID(), nodes[i].ID(), nodes[j].ID()) < 0
})
return nodes[:n]
}
Expand All @@ -326,11 +326,11 @@ func (tn *preminedTestnet) mine() {
tn.dists[i] = nil
}

targetSha := tn.target.id()
targetSha := tn.target.ID()
found, need := 0, 40
for found < need {
k := newkey()
ld := enode.LogDist(targetSha, encodePubkey(&k.PublicKey).id())
ld := enode.LogDist(targetSha, enode.PubkeyToIDV4(&k.PublicKey))
if len(tn.dists[ld]) < 8 {
tn.dists[ld] = append(tn.dists[ld], k)
found++
Expand Down
30 changes: 16 additions & 14 deletions p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"container/list"
"context"
"crypto/ecdsa"
crand "crypto/rand"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -128,9 +127,9 @@ type reply struct {
matched chan<- bool
}

func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
func ListenV4(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
cfg = cfg.withDefaults()
closeCtx, cancel := context.WithCancel(context.Background())
closeCtx, cancel := context.WithCancel(ctx)
t := &UDPv4{
conn: c,
priv: cfg.PrivateKey,
Expand Down Expand Up @@ -266,7 +265,7 @@ func (t *UDPv4) LookupPubkey(key *ecdsa.PublicKey) []*enode.Node {
// case and run the bootstrapping logic.
<-t.tab.refresh()
}
return t.newLookup(t.closeCtx, encodePubkey(key)).run()
return t.newLookup(t.closeCtx, key).run()
}

// RandomNodes is an iterator yielding nodes from a random walk of the DHT.
Expand All @@ -281,20 +280,23 @@ func (t *UDPv4) lookupRandom() []*enode.Node {

// lookupSelf implements transport.
func (t *UDPv4) lookupSelf() []*enode.Node {
return t.newLookup(t.closeCtx, encodePubkey(&t.priv.PublicKey)).run()
return t.newLookup(t.closeCtx, &t.priv.PublicKey).run()
}

func (t *UDPv4) newRandomLookup(ctx context.Context) *lookup {
var target encPubkey
crand.Read(target[:])
return t.newLookup(ctx, target)
key, err := crypto.GenerateKey()
if err != nil {
t.log.Warn("Failed to generate a random node key for newRandomLookup", "err", err)
key = t.priv
}
return t.newLookup(ctx, &key.PublicKey)
}

func (t *UDPv4) newLookup(ctx context.Context, targetKey encPubkey) *lookup {
target := enode.ID(crypto.Keccak256Hash(targetKey[:]))
ekey := v4wire.Pubkey(targetKey)
func (t *UDPv4) newLookup(ctx context.Context, targetKey *ecdsa.PublicKey) *lookup {
targetKeyEnc := v4wire.EncodePubkey(targetKey)
target := enode.PubkeyEncoded(targetKeyEnc).ID()
it := newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) {
return t.findnode(n.ID(), n.addr(), ekey)
return t.findnode(n.ID(), n.addr(), targetKeyEnc)
})
return it
}
Expand Down Expand Up @@ -565,7 +567,7 @@ func (t *UDPv4) handlePacket(from *net.UDPAddr, buf []byte) error {
return err
}
packet := t.wrapPacket(rawpacket)
fromID := fromKey.ID()
fromID := enode.PubkeyEncoded(fromKey).ID()
if packet.preverify != nil {
err = packet.preverify(packet, from, fromID, fromKey)
}
Expand Down Expand Up @@ -741,7 +743,7 @@ func (t *UDPv4) handleFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno
req := h.Packet.(*v4wire.Findnode)

// Determine closest nodes.
target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
target := enode.PubkeyEncoded(req.Target).ID()
closest := t.tab.findnodeByID(target, bucketSize, true).entries

// Send neighbors in chunks with at most maxNeighbors per packet
Expand Down
Loading

0 comments on commit 461ac47

Please sign in to comment.