Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stable] Fixes to discovery nodedb #3691

Merged
merged 8 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cmd/bootnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/ecdsa"
"flag"
"fmt"
"github.com/ledgerwatch/erigon-lib/common"
"net"
"os"

Expand Down Expand Up @@ -129,12 +130,16 @@ func main() {
PrivateKey: nodeKey,
NetRestrict: restrictList,
}

ctx, cancel := common.RootContext()
defer cancel()

if *runv5 {
if _, err := discover.ListenV5(conn, ln, cfg); err != nil {
if _, err := discover.ListenV5(ctx, conn, ln, cfg); err != nil {
utils.Fatalf("%v", err)
}
} else {
if _, err := discover.ListenUDP(conn, ln, cfg); err != nil {
if _, err := discover.ListenUDP(ctx, conn, ln, cfg); err != nil {
utils.Fatalf("%v", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/sentry/download/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ func (ss *SentryServerImpl) HandShake(context.Context, *emptypb.Empty) (*proto_s
return reply, nil
}

func (ss *SentryServerImpl) SetStatus(_ context.Context, statusData *proto_sentry.StatusData) (*proto_sentry.SetStatusReply, error) {
func (ss *SentryServerImpl) SetStatus(ctx context.Context, statusData *proto_sentry.StatusData) (*proto_sentry.SetStatusReply, error) {
genesisHash := gointerfaces.ConvertH256ToHash(statusData.ForkData.Genesis)

ss.lock.Lock()
Expand All @@ -879,7 +879,7 @@ func (ss *SentryServerImpl) SetStatus(_ context.Context, statusData *proto_sentr
}

// Add protocol
if err = srv.Start(); err != nil {
if err = srv.Start(ctx); err != nil {
srv.Stop()
return reply, fmt.Errorf("could not start server: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ require (
github.com/json-iterator/go v1.1.12
github.com/julienschmidt/httprouter v1.3.0
github.com/kevinburke/go-bindata v3.21.0+incompatible
github.com/ledgerwatch/erigon-lib v0.0.0-20220224065624-2b634b692f2b
github.com/ledgerwatch/log/v3 v3.4.0
github.com/ledgerwatch/erigon-lib v0.0.0-20220314143349-5c8ca0878ba4
github.com/ledgerwatch/log/v3 v3.4.1
github.com/ledgerwatch/secp256k1 v1.0.0
github.com/logrusorgru/aurora/v3 v3.0.0
github.com/pelletier/go-toml v1.9.4
Expand All @@ -46,7 +46,7 @@ require (
github.com/spf13/cobra v1.2.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.1-0.20210427113832-6241f9ab9942
github.com/torquem-ch/mdbx-go v0.22.10
github.com/torquem-ch/mdbx-go v0.22.16
github.com/ugorji/go/codec v1.1.13
github.com/ugorji/go/codec/codecgen v1.1.13
github.com/urfave/cli v1.22.5
Expand Down
11 changes: 6 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -493,10 +493,11 @@ github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758 h1:0D5M2HQSGD3P
github.com/kylelemons/godebug v0.0.0-20170224010052-a616ab194758/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledgerwatch/erigon-lib v0.0.0-20220224065624-2b634b692f2b h1:vlh0dSuZRVUF3KXBMWI6acvusKxWCSjZldggtGcSleA=
github.com/ledgerwatch/erigon-lib v0.0.0-20220224065624-2b634b692f2b/go.mod h1:BQNYmN6i8RdHqedsztAj3XDnswYebacmFAnsznlisTc=
github.com/ledgerwatch/log/v3 v3.4.0 h1:SEIOcv5a2zkG3PmoT5jeTU9m/0nEUv0BJS5bzsjwKCI=
github.com/ledgerwatch/erigon-lib v0.0.0-20220314143349-5c8ca0878ba4 h1:wivpdSUiMW/EZw8FoEC2lZTip7bg6MWOd3NCAqQPy6w=
github.com/ledgerwatch/erigon-lib v0.0.0-20220314143349-5c8ca0878ba4/go.mod h1:T1qFKmOyjrE1Uu2iImEsnQQDs7xBbQX/zwtrdfVOkWA=
github.com/ledgerwatch/log/v3 v3.4.0/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
github.com/ledgerwatch/log/v3 v3.4.1 h1:/xGwlVulXnsO9Uq+tzaExc8OWmXXHU0dnLalpbnY5Bc=
github.com/ledgerwatch/log/v3 v3.4.1/go.mod h1:VXcz6Ssn6XEeU92dCMc39/g1F0OYAjw1Mt+dGP5DjXY=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
github.com/ledgerwatch/secp256k1 v1.0.0/go.mod h1:SPmqJFciiF/Q0mPt2jVs2dTr/1TZBTIA+kPMmKgBAak=
github.com/logrusorgru/aurora/v3 v3.0.0 h1:R6zcoZZbvVcGMvDCKo45A9U/lzYyzl5NfYIvznmDfE4=
Expand Down Expand Up @@ -749,8 +750,8 @@ github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ=
github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8=
github.com/torquem-ch/mdbx-go v0.22.10 h1:reevtNP74E9SN7ESnogJr8Q8CI/0JcSMJ9tghfaLAEQ=
github.com/torquem-ch/mdbx-go v0.22.10/go.mod h1:T2fsoJDVppxfAPTLd1svUgH1kpPmeXdPESmroSHcL1E=
github.com/torquem-ch/mdbx-go v0.22.16 h1:uSuQOAKSZC7TvV4N4km+6kyER2YxaOuL/0qybsQtlUY=
github.com/torquem-ch/mdbx-go v0.22.16/go.mod h1:T2fsoJDVppxfAPTLd1svUgH1kpPmeXdPESmroSHcL1E=
github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q=
github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM=
github.com/ugorji/go v1.1.13 h1:nB3O5kBSQGjEQAcfe1aLUYuxmXdFKmYgBZhY32rQb6Q=
Expand Down
5 changes: 3 additions & 2 deletions p2p/discover/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package discover

import (
"context"
"crypto/ecdsa"
"net"

Expand Down Expand Up @@ -63,8 +64,8 @@ func (cfg Config) withDefaults() Config {
}

// ListenUDP starts listening for discovery packets on the given UDP socket.
func ListenUDP(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return ListenV4(c, ln, cfg)
func ListenUDP(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
return ListenV4(ctx, c, ln, cfg)
}

// ReadPacket is a packet that couldn't be handled. Those packets are sent to the unhandled
Expand Down
14 changes: 14 additions & 0 deletions p2p/discover/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type lookup struct {
result nodesByDistance
replyBuffer []*node
queries int
noSlowdown bool
}

type queryFunc func(*node) ([]*node, error)
Expand All @@ -50,6 +51,8 @@ func newLookup(ctx context.Context, tab *Table, target enode.ID, q queryFunc) *l
cancelCh: ctx.Done(),
queries: -1,
}
it.noSlowdown = isDisabledLookupSlowdown(ctx)

// Don't query further if we hit ourself.
// Unlikely to happen often in practice.
it.asked[tab.self().ID()] = true
Expand Down Expand Up @@ -129,7 +132,18 @@ func (it *lookup) startQueries() bool {
return it.queries > 0
}

func disableLookupSlowdown(ctx context.Context) context.Context {
return context.WithValue(ctx, "p2p.discover.lookup.noSlowdown", true)
}

func isDisabledLookupSlowdown(ctx context.Context) bool {
return ctx.Value("p2p.discover.lookup.noSlowdown") != nil
}

func (it *lookup) slowdown() {
if it.noSlowdown {
return
}
sleep := time.NewTimer(1 * time.Second)
defer sleep.Stop()
select {
Expand Down
33 changes: 0 additions & 33 deletions p2p/discover/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,9 @@
package discover

import (
"crypto/ecdsa"
"crypto/elliptic"
"errors"
"math/big"
"net"
"time"

"github.com/ledgerwatch/erigon/common/math"
"github.com/ledgerwatch/erigon/crypto"
"github.com/ledgerwatch/erigon/p2p/enode"
)

Expand All @@ -37,33 +31,6 @@ type node struct {
livenessChecks uint // how often liveness was checked
}

type encPubkey [64]byte

func encodePubkey(key *ecdsa.PublicKey) encPubkey {
var e encPubkey
math.ReadBits(key.X, e[:len(e)/2])
math.ReadBits(key.Y, e[len(e)/2:])
return e
}

func decodePubkey(curve elliptic.Curve, e []byte) (*ecdsa.PublicKey, error) {
if len(e) != len(encPubkey{}) {
return nil, errors.New("wrong size public key data")
}
p := &ecdsa.PublicKey{Curve: curve, X: new(big.Int), Y: new(big.Int)}
half := len(e) / 2
p.X.SetBytes(e[:half])
p.Y.SetBytes(e[half:])
if !p.Curve.IsOnCurve(p.X, p.Y) {
return nil, errors.New("invalid curve point")
}
return p, nil
}

func (e encPubkey) id() enode.ID {
return enode.ID(crypto.Keccak256Hash(e[:]))
}

func wrapNode(n *enode.Node) *node {
return &node{Node: *n}
}
Expand Down
1 change: 1 addition & 0 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func (tab *Table) doRefresh(done chan struct{}) {

func (tab *Table) loadSeedNodes() {
seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
tab.log.Debug("QuerySeeds read nodes from the node DB", "count", len(seeds))
seeds = append(seeds, tab.nursery...)
for i := range seeds {
seed := seeds[i]
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/table_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func hexEncPrivkey(h string) *ecdsa.PrivateKey {
}

// hexEncPubkey decodes h as a public key.
func hexEncPubkey(h string) (ret encPubkey) {
func hexEncPubkey(h string) (ret enode.PubkeyEncoded) {
b, err := hex.DecodeString(h)
if err != nil {
panic(err)
Expand Down
18 changes: 9 additions & 9 deletions p2p/discover/v4_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestUDPv4_Lookup(t *testing.T) {
test := newUDPTest(t)

// Lookup on empty table returns no nodes.
targetKey, _ := decodePubkey(crypto.S256(), lookupTestnet.target[:])
targetKey, _ := v4wire.DecodePubkey(crypto.S256(), v4wire.Pubkey(lookupTestnet.target))
if results := test.udp.LookupPubkey(targetKey); len(results) > 0 {
t.Fatalf("lookup on empty table returned %d results: %#v", len(results), results)
}
Expand All @@ -61,7 +61,7 @@ func TestUDPv4_Lookup(t *testing.T) {
results := <-resultC
t.Logf("results:")
for _, e := range results {
t.Logf(" ld=%d, %x", enode.LogDist(lookupTestnet.target.id(), e.ID()), e.ID().Bytes())
t.Logf(" ld=%d, %x", enode.LogDist(lookupTestnet.target.ID(), e.ID()), e.ID().Bytes())
}
if len(results) != bucketSize {
t.Errorf("wrong number of results: got %d, want %d", len(results), bucketSize)
Expand Down Expand Up @@ -150,7 +150,7 @@ func serveTestnet(test *udpTest, testnet *preminedTestnet) {
case *v4wire.Ping:
test.packetInFrom(nil, key, to, &v4wire.Pong{Expiration: futureExp, ReplyTok: hash})
case *v4wire.Findnode:
dist := enode.LogDist(n.ID(), testnet.target.id())
dist := enode.LogDist(n.ID(), testnet.target.ID())
nodes := testnet.nodesAtDistance(dist - 1)
test.packetInFrom(nil, key, to, &v4wire.Neighbors{Expiration: futureExp, Nodes: nodes})
}
Expand All @@ -164,12 +164,12 @@ func checkLookupResults(t *testing.T, tn *preminedTestnet, results []*enode.Node
t.Helper()
t.Logf("results:")
for _, e := range results {
t.Logf(" ld=%d, %x", enode.LogDist(tn.target.id(), e.ID()), e.ID().Bytes())
t.Logf(" ld=%d, %x", enode.LogDist(tn.target.ID(), e.ID()), e.ID().Bytes())
}
if hasDuplicates(wrapNodes(results)) {
t.Errorf("result set contains duplicate entries")
}
if !sortedByDistanceTo(tn.target.id(), wrapNodes(results)) {
if !sortedByDistanceTo(tn.target.ID(), wrapNodes(results)) {
t.Errorf("result set not sorted by distance to target")
}
wantNodes := tn.closest(len(results))
Expand Down Expand Up @@ -239,7 +239,7 @@ var lookupTestnet = &preminedTestnet{
}

type preminedTestnet struct {
target encPubkey
target enode.PubkeyEncoded
dists [hashBits + 1][]*ecdsa.PrivateKey
}

Expand Down Expand Up @@ -311,7 +311,7 @@ func (tn *preminedTestnet) closest(n int) (nodes []*enode.Node) {
}
}
sort.Slice(nodes, func(i, j int) bool {
return enode.DistCmp(tn.target.id(), nodes[i].ID(), nodes[j].ID()) < 0
return enode.DistCmp(tn.target.ID(), nodes[i].ID(), nodes[j].ID()) < 0
})
return nodes[:n]
}
Expand All @@ -326,11 +326,11 @@ func (tn *preminedTestnet) mine() {
tn.dists[i] = nil
}

targetSha := tn.target.id()
targetSha := tn.target.ID()
found, need := 0, 40
for found < need {
k := newkey()
ld := enode.LogDist(targetSha, encodePubkey(&k.PublicKey).id())
ld := enode.LogDist(targetSha, enode.PubkeyToIDV4(&k.PublicKey))
if len(tn.dists[ld]) < 8 {
tn.dists[ld] = append(tn.dists[ld], k)
found++
Expand Down
30 changes: 16 additions & 14 deletions p2p/discover/v4_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"container/list"
"context"
"crypto/ecdsa"
crand "crypto/rand"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -128,9 +127,9 @@ type reply struct {
matched chan<- bool
}

func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
func ListenV4(ctx context.Context, c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
cfg = cfg.withDefaults()
closeCtx, cancel := context.WithCancel(context.Background())
closeCtx, cancel := context.WithCancel(ctx)
t := &UDPv4{
conn: c,
priv: cfg.PrivateKey,
Expand Down Expand Up @@ -266,7 +265,7 @@ func (t *UDPv4) LookupPubkey(key *ecdsa.PublicKey) []*enode.Node {
// case and run the bootstrapping logic.
<-t.tab.refresh()
}
return t.newLookup(t.closeCtx, encodePubkey(key)).run()
return t.newLookup(t.closeCtx, key).run()
}

// RandomNodes is an iterator yielding nodes from a random walk of the DHT.
Expand All @@ -281,20 +280,23 @@ func (t *UDPv4) lookupRandom() []*enode.Node {

// lookupSelf implements transport.
func (t *UDPv4) lookupSelf() []*enode.Node {
return t.newLookup(t.closeCtx, encodePubkey(&t.priv.PublicKey)).run()
return t.newLookup(t.closeCtx, &t.priv.PublicKey).run()
}

func (t *UDPv4) newRandomLookup(ctx context.Context) *lookup {
var target encPubkey
crand.Read(target[:])
return t.newLookup(ctx, target)
key, err := crypto.GenerateKey()
if err != nil {
t.log.Warn("Failed to generate a random node key for newRandomLookup", "err", err)
key = t.priv
}
return t.newLookup(ctx, &key.PublicKey)
}

func (t *UDPv4) newLookup(ctx context.Context, targetKey encPubkey) *lookup {
target := enode.ID(crypto.Keccak256Hash(targetKey[:]))
ekey := v4wire.Pubkey(targetKey)
func (t *UDPv4) newLookup(ctx context.Context, targetKey *ecdsa.PublicKey) *lookup {
targetKeyEnc := v4wire.EncodePubkey(targetKey)
target := enode.PubkeyEncoded(targetKeyEnc).ID()
it := newLookup(ctx, t.tab, target, func(n *node) ([]*node, error) {
return t.findnode(n.ID(), n.addr(), ekey)
return t.findnode(n.ID(), n.addr(), targetKeyEnc)
})
return it
}
Expand Down Expand Up @@ -565,7 +567,7 @@ func (t *UDPv4) handlePacket(from *net.UDPAddr, buf []byte) error {
return err
}
packet := t.wrapPacket(rawpacket)
fromID := fromKey.ID()
fromID := enode.PubkeyEncoded(fromKey).ID()
if packet.preverify != nil {
err = packet.preverify(packet, from, fromID, fromKey)
}
Expand Down Expand Up @@ -741,7 +743,7 @@ func (t *UDPv4) handleFindnode(h *packetHandlerV4, from *net.UDPAddr, fromID eno
req := h.Packet.(*v4wire.Findnode)

// Determine closest nodes.
target := enode.ID(crypto.Keccak256Hash(req.Target[:]))
target := enode.PubkeyEncoded(req.Target).ID()
closest := t.tab.findnodeByID(target, bucketSize, true).entries

// Send neighbors in chunks with at most maxNeighbors per packet
Expand Down
Loading