From c98df0ff014ce28cdfa354f9b6bd83b8844526f5 Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 11 May 2023 17:35:35 +0530 Subject: [PATCH] swarm: change maps with multiaddress keys to use strings --- p2p/host/routed/routed.go | 6 +-- p2p/net/swarm/dial_worker.go | 28 ++++++------- p2p/net/swarm/dial_worker_test.go | 68 +++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 17 deletions(-) diff --git a/p2p/host/routed/routed.go b/p2p/host/routed/routed.go index c4601a50dd..eb8e58ee7f 100644 --- a/p2p/host/routed/routed.go +++ b/p2p/host/routed/routed.go @@ -119,16 +119,16 @@ func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error { } // Build lookup map - lookup := make(map[ma.Multiaddr]struct{}, len(addrs)) + lookup := make(map[string]struct{}, len(addrs)) for _, addr := range addrs { - lookup[addr] = struct{}{} + lookup[string(addr.Bytes())] = struct{}{} } // if there's any address that's not in the previous set // of addresses, try to connect again. If all addresses // where known previously we return the original error. for _, newAddr := range newAddrs { - if _, found := lookup[newAddr]; found { + if _, found := lookup[string(newAddr.Bytes())]; found { continue } diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go index f805371cc6..ba7ba87d4b 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -27,9 +27,9 @@ type dialResponse struct { } type pendRequest struct { - req dialRequest // the original request - err *DialError // dial error accumulator - addrs map[ma.Multiaddr]struct{} // pending addr dials + req dialRequest // the original request + err *DialError // dial error accumulator + addrs map[string]struct{} // pending address to dial. The key is a multiaddr } type addrDial struct { @@ -46,7 +46,7 @@ type dialWorker struct { reqch <-chan dialRequest reqno int requests map[int]*pendRequest - pending map[ma.Multiaddr]*addrDial + pending map[string]*addrDial // pending addresses to dial. The key is a multiaddr resch chan dialResult connected bool // true when a connection has been successfully established @@ -66,7 +66,7 @@ func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest) *dialWorker { peer: p, reqch: reqch, requests: make(map[int]*pendRequest), - pending: make(map[ma.Multiaddr]*addrDial), + pending: make(map[string]*addrDial), resch: make(chan dialResult), } } @@ -108,10 +108,10 @@ loop: pr := &pendRequest{ req: req, err: &DialError{Peer: w.peer}, - addrs: make(map[ma.Multiaddr]struct{}), + addrs: make(map[string]struct{}), } for _, a := range addrs { - pr.addrs[a] = struct{}{} + pr.addrs[string(a.Bytes())] = struct{}{} } // check if any of the addrs has been successfully dialed and accumulate @@ -120,7 +120,7 @@ loop: var tojoin []*addrDial for _, a := range addrs { - ad, ok := w.pending[a] + ad, ok := w.pending[string(a.Bytes())] if !ok { todial = append(todial, a) continue @@ -135,7 +135,7 @@ loop: if ad.err != nil { // dial to this addr errored, accumulate the error pr.err.recordErr(a, ad.err) - delete(pr.addrs, a) + delete(pr.addrs, string(a.Bytes())) continue } @@ -164,7 +164,7 @@ loop: if len(todial) > 0 { for _, a := range todial { - w.pending[a] = &addrDial{addr: a, ctx: req.ctx, requests: []int{w.reqno}} + w.pending[string(a.Bytes())] = &addrDial{addr: a, ctx: req.ctx, requests: []int{w.reqno}} } w.nextDial = append(w.nextDial, todial...) @@ -177,7 +177,7 @@ loop: case <-w.triggerDial: for _, addr := range w.nextDial { // spawn the dial - ad := w.pending[addr] + ad := w.pending[string(addr.Bytes())] err := w.s.dialNextAddr(ad.ctx, w.peer, addr, w.resch) if err != nil { w.dispatchError(ad, err) @@ -192,7 +192,7 @@ loop: w.connected = true } - ad := w.pending[res.Addr] + ad := w.pending[string(res.Addr.Bytes())] if res.Conn != nil { // we got a connection, add it to the swarm @@ -247,7 +247,7 @@ func (w *dialWorker) dispatchError(ad *addrDial, err error) { // accumulate the error pr.err.recordErr(ad.addr, err) - delete(pr.addrs, ad.addr) + delete(pr.addrs, string(ad.addr.Bytes())) if len(pr.addrs) == 0 { // all addrs have erred, dispatch dial error // but first do a last one check in case an acceptable connection has landed from @@ -271,7 +271,7 @@ func (w *dialWorker) dispatchError(ad *addrDial, err error) { // it is also necessary to preserve consisent behaviour with the old dialer -- TestDialBackoff // regresses without this. if err == ErrDialBackoff { - delete(w.pending, ad.addr) + delete(w.pending, string(ad.addr.Bytes())) } } diff --git a/p2p/net/swarm/dial_worker_test.go b/p2p/net/swarm/dial_worker_test.go index 2c441106b1..4c4b0233da 100644 --- a/p2p/net/swarm/dial_worker_test.go +++ b/p2p/net/swarm/dial_worker_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -24,6 +25,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/transport/tcp" ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" ) @@ -342,3 +344,69 @@ func TestDialWorkerLoopConcurrentFailureStress(t *testing.T) { close(reqch) worker.wg.Wait() } + +func TestDialWorkerLoopAddrDedup(t *testing.T) { + s1 := makeSwarm(t) + s2 := makeSwarm(t) + defer s1.Close() + defer s2.Close() + + t1 := ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000)) + t2 := ma.StringCast(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000)) + + // connCount counts the number of connection attempts made + var connCount atomic.Int64 + + // acceptAndClose accepts two tcp connections and closes them + // we need to wait for the second connection before failing because otherwise the + // address would be placed on backoff + acceptAndClose := func(a ma.Multiaddr, closech chan struct{}) { + list, err := manet.Listen(a) + if err != nil { + t.Error(err) + return + } + go func() { + conn1, err := list.Accept() + if err != nil { + return + } + connCount.Add(1) + conn2, err := list.Accept() + if err != nil { + conn1.Close() + return + } + connCount.Add(1) + conn1.Close() + conn2.Close() + }() + <-closech + list.Close() + } + closeCh := make(chan struct{}) + go acceptAndClose(t1, closeCh) + defer close(closeCh) + + s1.Peerstore().AddAddrs(s2.LocalPeer(), []ma.Multiaddr{t1}, peerstore.PermanentAddrTTL) + + reqch := make(chan dialRequest) + resch := make(chan dialResponse, 2) + + worker := newDialWorker(s1, s2.LocalPeer(), reqch) + go worker.loop() + defer worker.wg.Wait() + defer close(reqch) + + reqch <- dialRequest{ctx: context.Background(), resch: resch} + + s1.Peerstore().ClearAddrs(s2.LocalPeer()) + s1.Peerstore().AddAddrs(s2.LocalPeer(), []ma.Multiaddr{t2}, peerstore.PermanentAddrTTL) + + reqch <- dialRequest{ctx: context.Background(), resch: resch} + require.Never(t, func() bool { return connCount.Load() > 1 }, 3*time.Second, 100*time.Millisecond) + if connCount.Load() != 1 { + t.Errorf("did expect one connection. got 0") + } + +}