Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: change maps with multiaddress keys to use strings #2284

Merged
merged 3 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions p2p/host/routed/routed.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,16 @@ func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
}

// Build lookup map
lookup := make(map[ma.Multiaddr]struct{}, len(addrs))
lookup := make(map[string]struct{}, len(addrs))
for _, addr := range addrs {
lookup[addr] = struct{}{}
lookup[string(addr.Bytes())] = struct{}{}
}

// if there's any address that's not in the previous set
// of addresses, try to connect again. If all addresses
// where known previously we return the original error.
for _, newAddr := range newAddrs {
if _, found := lookup[newAddr]; found {
if _, found := lookup[string(newAddr.Bytes())]; found {
continue
}

Expand Down
28 changes: 14 additions & 14 deletions p2p/net/swarm/dial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ type dialResponse struct {
}

type pendRequest struct {
req dialRequest // the original request
err *DialError // dial error accumulator
addrs map[ma.Multiaddr]struct{} // pending addr dials
req dialRequest // the original request
err *DialError // dial error accumulator
addrs map[string]struct{} // pending address to dial. The key is a multiaddr
}

type addrDial struct {
Expand All @@ -46,7 +46,7 @@ type dialWorker struct {
reqch <-chan dialRequest
reqno int
requests map[int]*pendRequest
pending map[ma.Multiaddr]*addrDial
pending map[string]*addrDial // pending addresses to dial. The key is a multiaddr
resch chan dialResult

connected bool // true when a connection has been successfully established
Expand All @@ -66,7 +66,7 @@ func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest) *dialWorker {
peer: p,
reqch: reqch,
requests: make(map[int]*pendRequest),
pending: make(map[ma.Multiaddr]*addrDial),
pending: make(map[string]*addrDial),
resch: make(chan dialResult),
}
}
Expand Down Expand Up @@ -108,10 +108,10 @@ loop:
pr := &pendRequest{
req: req,
err: &DialError{Peer: w.peer},
addrs: make(map[ma.Multiaddr]struct{}),
addrs: make(map[string]struct{}),
}
for _, a := range addrs {
pr.addrs[a] = struct{}{}
pr.addrs[string(a.Bytes())] = struct{}{}
}

// check if any of the addrs has been successfully dialed and accumulate
Expand All @@ -120,7 +120,7 @@ loop:
var tojoin []*addrDial

for _, a := range addrs {
ad, ok := w.pending[a]
ad, ok := w.pending[string(a.Bytes())]
if !ok {
todial = append(todial, a)
continue
Expand All @@ -135,7 +135,7 @@ loop:
if ad.err != nil {
// dial to this addr errored, accumulate the error
pr.err.recordErr(a, ad.err)
delete(pr.addrs, a)
delete(pr.addrs, string(a.Bytes()))
continue
}

Expand Down Expand Up @@ -164,7 +164,7 @@ loop:

if len(todial) > 0 {
for _, a := range todial {
w.pending[a] = &addrDial{addr: a, ctx: req.ctx, requests: []int{w.reqno}}
w.pending[string(a.Bytes())] = &addrDial{addr: a, ctx: req.ctx, requests: []int{w.reqno}}
}

w.nextDial = append(w.nextDial, todial...)
Expand All @@ -177,7 +177,7 @@ loop:
case <-w.triggerDial:
for _, addr := range w.nextDial {
// spawn the dial
ad := w.pending[addr]
ad := w.pending[string(addr.Bytes())]
err := w.s.dialNextAddr(ad.ctx, w.peer, addr, w.resch)
if err != nil {
w.dispatchError(ad, err)
Expand All @@ -192,7 +192,7 @@ loop:
w.connected = true
}

ad := w.pending[res.Addr]
ad := w.pending[string(res.Addr.Bytes())]

if res.Conn != nil {
// we got a connection, add it to the swarm
Expand Down Expand Up @@ -247,7 +247,7 @@ func (w *dialWorker) dispatchError(ad *addrDial, err error) {
// accumulate the error
pr.err.recordErr(ad.addr, err)

delete(pr.addrs, ad.addr)
delete(pr.addrs, string(ad.addr.Bytes()))
if len(pr.addrs) == 0 {
// all addrs have erred, dispatch dial error
// but first do a last one check in case an acceptable connection has landed from
Expand All @@ -271,7 +271,7 @@ func (w *dialWorker) dispatchError(ad *addrDial, err error) {
// it is also necessary to preserve consisent behaviour with the old dialer -- TestDialBackoff
// regresses without this.
if err == ErrDialBackoff {
delete(w.pending, ad.addr)
delete(w.pending, string(ad.addr.Bytes()))
}
}

Expand Down
66 changes: 66 additions & 0 deletions p2p/net/swarm/dial_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/transport/tcp"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -342,3 +343,68 @@ func TestDialWorkerLoopConcurrentFailureStress(t *testing.T) {
close(reqch)
worker.wg.Wait()
}

func TestDialWorkerLoopAddrDedup(t *testing.T) {
s1 := makeSwarm(t)
s2 := makeSwarm(t)
defer s1.Close()
defer s2.Close()
t1 := ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000))
t2 := ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000))

// acceptAndClose accepts a connection and closes it
acceptAndClose := func(a ma.Multiaddr, ch chan struct{}, closech chan struct{}) {
list, err := manet.Listen(a)
if err != nil {
t.Error(err)
return
}
go func() {
ch <- struct{}{}
for {
conn, err := list.Accept()
if err != nil {
return
}
ch <- struct{}{}
conn.Close()
}
}()
<-closech
list.Close()
}
ch := make(chan struct{}, 1)
closeCh := make(chan struct{})
go acceptAndClose(t1, ch, closeCh)
defer close(closeCh)
<-ch // the routine has started listening on addr

s1.Peerstore().AddAddrs(s2.LocalPeer(), []ma.Multiaddr{t1}, peerstore.PermanentAddrTTL)

reqch := make(chan dialRequest)
resch := make(chan dialResponse, 2)

worker := newDialWorker(s1, s2.LocalPeer(), reqch)
go worker.loop()
defer worker.wg.Wait()
defer close(reqch)

reqch <- dialRequest{ctx: context.Background(), resch: resch}
<-ch
<-resch
// Need to clear backoff otherwise the dial attempt would not be made
s1.Backoff().Clear(s2.LocalPeer())

s1.Peerstore().ClearAddrs(s2.LocalPeer())
s1.Peerstore().AddAddrs(s2.LocalPeer(), []ma.Multiaddr{t2}, peerstore.PermanentAddrTTL)

reqch <- dialRequest{ctx: context.Background(), resch: resch}
select {
case r := <-resch:
require.Error(t, r.err)
case <-ch:
t.Errorf("didn't expect a connection attempt")
case <-time.After(5 * time.Second):
t.Errorf("expected a fail response")
}
}