diff --git a/p2p/net/swarm/addr/addr.go b/p2p/net/swarm/addr/addr.go
index 1e22bae261..d9ba87216a 100644
--- a/p2p/net/swarm/addr/addr.go
+++ b/p2p/net/swarm/addr/addr.go
@@ -40,12 +40,16 @@ func init() {
 	SupportedTransportProtocols = transports
 }
 
-// FilterAddrs is a filter that removes certain addresses, according to filter.
-// if filter returns true, the address is kept.
-func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiaddr {
+// FilterAddrs is a filter that removes certain addresses, according to the given filters.
+// if all filters return true, the address is kept.
+func FilterAddrs(a []ma.Multiaddr, filters ...func(ma.Multiaddr) bool) []ma.Multiaddr {
 	b := make([]ma.Multiaddr, 0, len(a))
 	for _, addr := range a {
-		if filter(addr) {
+		good := true
+		for _, filter := range filters {
+			good = good && filter(addr)
+		}
+		if good {
 			b = append(b, addr)
 		}
 	}
@@ -56,9 +60,11 @@ func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiadd
 // from a list. the addresses removed are those known NOT
 // to work with our network. Namely, addresses with UTP.
 func FilterUsableAddrs(a []ma.Multiaddr) []ma.Multiaddr {
-	return FilterAddrs(a, func(m ma.Multiaddr) bool {
-		return AddrUsable(m, false)
-	})
+	return FilterAddrs(a, AddrUsableFunc)
+}
+
+func AddrUsableFunc(m ma.Multiaddr) bool {
+	return AddrUsable(m, false)
 }
 
 // AddrOverNonLocalIP returns whether the addr uses a non-local ip link
diff --git a/p2p/net/swarm/addr/filter.go b/p2p/net/swarm/addr/filter.go
new file mode 100644
index 0000000000..d87ba816a7
--- /dev/null
+++ b/p2p/net/swarm/addr/filter.go
@@ -0,0 +1,31 @@
+package addrutil
+
+import (
+	ma "github.com/jbenet/go-multiaddr"
+	mafmt "github.com/whyrusleeping/mafmt"
+)
+
+// SubtractFilter returns a filter func that filters out all of the given addresses
+func SubtractFilter(addrs ...ma.Multiaddr) func(ma.Multiaddr) bool {
+	addrmap := make(map[string]bool)
+	for _, a := range addrs {
+		addrmap[string(a.Bytes())] = true
+	}
+
+	return func(a ma.Multiaddr) bool {
+		return !addrmap[string(a.Bytes())]
+	}
+}
+
+// IsFDCostlyTransport returns true for transports that require a new file
+// descriptor per connection created
+func IsFDCostlyTransport(a ma.Multiaddr) bool {
+	return mafmt.TCP.Matches(a)
+}
+
+// FilterNeg returns a negated version of the passed in filter
+func FilterNeg(f func(ma.Multiaddr) bool) func(ma.Multiaddr) bool {
+	return func(a ma.Multiaddr) bool {
+		return !f(a)
+	}
+}
diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go
index 988f7f7f01..aea0f728cc 100644
--- a/p2p/net/swarm/dial_test.go
+++ b/p2p/net/swarm/dial_test.go
@@ -2,7 +2,6 @@ package swarm
 
 import (
 	"net"
-	"sort"
 	"sync"
 	"testing"
 	"time"
@@ -493,38 +492,3 @@ func TestDialBackoffClears(t *testing.T) {
 		t.Log("correctly cleared backoff")
 	}
 }
-
-func mkAddr(t *testing.T, s string) ma.Multiaddr {
-	a, err := ma.NewMultiaddr(s)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	return a
-}
-
-func TestAddressSorting(t *testing.T) {
-	u1 := mkAddr(t, "/ip4/152.12.23.53/udp/1234/utp")
-	u2l := mkAddr(t, "/ip4/127.0.0.1/udp/1234/utp")
-	local := mkAddr(t, "/ip4/127.0.0.1/tcp/1234")
-	norm := mkAddr(t, "/ip4/6.5.4.3/tcp/1234")
-
-	l := AddrList{local, u1, u2l, norm}
-	sort.Sort(l)
-
-	if !l[0].Equal(u2l) {
-		t.Fatal("expected utp local addr to be sorted first: ", l[0])
-	}
-
-	if !l[1].Equal(u1) {
-		t.Fatal("expected utp addr to be sorted second")
-	}
-
-	if !l[2].Equal(local) {
-		t.Fatal("expected tcp localhost addr thid")
-	}
-
-	if !l[3].Equal(norm) {
-		t.Fatal("expected normal addr last")
-	}
-}
diff --git a/p2p/net/swarm/limiter.go b/p2p/net/swarm/limiter.go
new file mode 100644
index 0000000000..94ce05bb86
--- /dev/null
+++ b/p2p/net/swarm/limiter.go
@@ -0,0 +1,160 @@
+package swarm
+
+import (
+	"sync"
+
+	peer "github.com/ipfs/go-libp2p-peer"
+	ma "github.com/jbenet/go-multiaddr"
+	context "golang.org/x/net/context"
+
+	conn "github.com/ipfs/go-libp2p/p2p/net/conn"
+	addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
+)
+
+type dialResult struct {
+	Conn conn.Conn
+	Err  error
+}
+
+type dialJob struct {
+	addr    ma.Multiaddr
+	peer    peer.ID
+	ctx     context.Context
+	resp    chan dialResult
+	success bool
+}
+
+func (dj *dialJob) cancelled() bool {
+	select {
+	case <-dj.ctx.Done():
+		return true
+	default:
+		return false
+	}
+}
+
+type dialLimiter struct {
+	rllock      sync.Mutex
+	fdConsuming int
+	fdLimit     int
+	waitingOnFd []*dialJob
+
+	dialFunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error)
+
+	activePerPeer      map[peer.ID]int
+	perPeerLimit       int
+	waitingOnPeerLimit map[peer.ID][]*dialJob
+}
+
+type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error)
+
+func newDialLimiter(df dialfunc) *dialLimiter {
+	return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit)
+}
+
+func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter {
+	return &dialLimiter{
+		fdLimit:            fdl,
+		perPeerLimit:       ppl,
+		waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
+		activePerPeer:      make(map[peer.ID]int),
+		dialFunc:           df,
+	}
+}
+
+func (dl *dialLimiter) finishedDial(dj *dialJob) {
+	dl.rllock.Lock()
+	defer dl.rllock.Unlock()
+
+	if addrutil.IsFDCostlyTransport(dj.addr) {
+		dl.fdConsuming--
+		if len(dl.waitingOnFd) > 0 {
+			next := dl.waitingOnFd[0]
+			dl.waitingOnFd = dl.waitingOnFd[1:]
+			if len(dl.waitingOnFd) == 0 {
+				dl.waitingOnFd = nil // clear out memory
+			}
+			dl.fdConsuming++
+
+			go dl.executeDial(next)
+		}
+	}
+
+	// release tokens in reverse order than we take them
+	dl.activePerPeer[dj.peer]--
+	if dl.activePerPeer[dj.peer] == 0 {
+		delete(dl.activePerPeer, dj.peer)
+	}
+
+	waitlist := dl.waitingOnPeerLimit[dj.peer]
+	if !dj.success && len(waitlist) > 0 {
+		next := waitlist[0]
+		if len(waitlist) == 1 {
+			delete(dl.waitingOnPeerLimit, dj.peer)
+		} else {
+			dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
+		}
+		dl.activePerPeer[dj.peer]++ // just kidding, we still want this token
+
+		// can kick this off right here, dials in this list already
+		// have the other tokens needed
+		go dl.executeDial(next)
+	}
+
+}
+
+// AddDialJob tries to take the needed tokens for starting the given dial job.
+// If it acquires all needed tokens, it immediately starts the dial, otherwise
+// it will put it on the waitlist for the requested token.
+func (dl *dialLimiter) AddDialJob(dj *dialJob) {
+	dl.rllock.Lock()
+	defer dl.rllock.Unlock()
+
+	if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
+		wlist := dl.waitingOnPeerLimit[dj.peer]
+		dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
+		return
+	}
+	dl.activePerPeer[dj.peer]++
+
+	if addrutil.IsFDCostlyTransport(dj.addr) {
+		if dl.fdConsuming >= dl.fdLimit {
+			dl.waitingOnFd = append(dl.waitingOnFd, dj)
+			return
+		}
+
+		// take token
+		dl.fdConsuming++
+	}
+
+	// take second needed token and start dial!
+	go dl.executeDial(dj)
+}
+
+func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) {
+	if dl.activePerPeer[j.peer] >= dl.perPeerLimit {
+		wlist := dl.waitingOnPeerLimit[j.peer]
+		dl.waitingOnPeerLimit[j.peer] = append(wlist, j)
+		return
+	}
+
+	// take second needed token and start dial!
+	dl.activePerPeer[j.peer]++
+	go dl.executeDial(j)
+}
+
+// executeDial calls the dialFunc, and reports the result through the response
+// channel when finished. Once the response is sent it also releases all tokens
+// it held during the dial.
+func (dl *dialLimiter) executeDial(j *dialJob) {
+	defer dl.finishedDial(j)
+	if j.cancelled() {
+		return
+	}
+
+	con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
+	select {
+	case j.resp <- dialResult{Conn: con, Err: err}:
+	case <-j.ctx.Done():
+	}
+}
diff --git a/p2p/net/swarm/limiter_test.go b/p2p/net/swarm/limiter_test.go
new file mode 100644
index 0000000000..28733c5ab5
--- /dev/null
+++ b/p2p/net/swarm/limiter_test.go
@@ -0,0 +1,315 @@
+package swarm
+
+import (
+	"fmt"
+	"math/rand"
+	"strconv"
+	"testing"
+	"time"
+
+	peer "github.com/ipfs/go-libp2p-peer"
+	ma "github.com/jbenet/go-multiaddr"
+	mafmt "github.com/whyrusleeping/mafmt"
+	context "golang.org/x/net/context"
+
+	conn "github.com/ipfs/go-libp2p/p2p/net/conn"
+)
+
+func mustAddr(t *testing.T, s string) ma.Multiaddr {
+	a, err := ma.NewMultiaddr(s)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return a
+}
+
+func addrWithPort(t *testing.T, p int) ma.Multiaddr {
+	return mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p))
+}
+
+// in these tests I use addresses with tcp ports over a certain number to
+// signify 'good' addresses that will succeed, and addresses below that number
+// will fail. This lets us more easily test these different scenarios.
+func tcpPortOver(a ma.Multiaddr, n int) bool {
+	port, err := a.ValueForProtocol(ma.P_TCP)
+	if err != nil {
+		panic(err)
+	}
+
+	pnum, err := strconv.Atoi(port)
+	if err != nil {
+		panic(err)
+	}
+
+	return pnum > n
+}
+
+func tryDialAddrs(ctx context.Context, l *dialLimiter, p peer.ID, addrs []ma.Multiaddr, res chan dialResult) {
+	for _, a := range addrs {
+		l.AddDialJob(&dialJob{
+			ctx:  ctx,
+			peer: p,
+			addr: a,
+			resp: res,
+		})
+	}
+}
+
+func hangDialFunc(hang chan struct{}) dialfunc {
+	return func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
+		if mafmt.UTP.Matches(a) {
+			return conn.Conn(nil), nil
+		}
+
+		if tcpPortOver(a, 10) {
+			return conn.Conn(nil), nil
+		} else {
+			<-hang
+			return nil, fmt.Errorf("test bad dial")
+		}
+	}
+}
+
+func TestLimiterBasicDials(t *testing.T) {
+	hang := make(chan struct{})
+	defer close(hang)
+
+	l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4)
+
+	bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
+	good := addrWithPort(t, 20)
+
+	resch := make(chan dialResult)
+	pid := peer.ID("testpeer")
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	tryDialAddrs(ctx, l, pid, bads, resch)
+
+	l.AddDialJob(&dialJob{
+		ctx:  ctx,
+		peer: pid,
+		addr: good,
+		resp: resch,
+	})
+
+	select {
+	case <-resch:
+		t.Fatal("no dials should have completed!")
+	case <-time.After(time.Millisecond * 100):
+	}
+
+	// complete a single hung dial
+	hang <- struct{}{}
+
+	select {
+	case r := <-resch:
+		if r.Err == nil {
+			t.Fatal("should have gotten failed dial result")
+		}
+	case <-time.After(time.Second):
+		t.Fatal("timed out waiting for dial completion")
+	}
+
+	select {
+	case r := <-resch:
+		if r.Err != nil {
+			t.Fatal("expected second result to be success!")
+		}
+	case <-time.After(time.Second):
+	}
+}
+
+func TestFDLimiting(t *testing.T) {
+	hang := make(chan struct{})
+	defer close(hang)
+	l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5)
+
+	bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
+	pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"}
+	good_tcp := addrWithPort(t, 20)
+
+	ctx := context.Background()
+	resch := make(chan dialResult)
+
+	// take all fd limit tokens with hang dials
+	for _, pid := range pids {
+		tryDialAddrs(ctx, l, pid, bads, resch)
+	}
+
+	// these dials should work normally, but will hang because we have taken
+	// up all the fd limiting
+	for _, pid := range pids {
+		l.AddDialJob(&dialJob{
+			ctx:  ctx,
+			peer: pid,
+			addr: good_tcp,
+			resp: resch,
+		})
+	}
+
+	select {
+	case <-resch:
+		t.Fatal("no dials should have completed!")
+	case <-time.After(time.Millisecond * 100):
+	}
+
+	pid5 := peer.ID("testpeer5")
+	utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp")
+
+	// This should complete immediately since utp addresses aren't blocked by fd rate limiting
+	l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch})
+
+	select {
+	case res := <-resch:
+		if res.Err != nil {
+			t.Fatal("should have gotten successful response")
+		}
+	case <-time.After(time.Second * 5):
+		t.Fatal("timeout waiting for utp addr success")
+	}
+}
+
+func TestTokenRedistribution(t *testing.T) {
+	hangchs := make(map[peer.ID]chan struct{})
+	df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
+		if tcpPortOver(a, 10) {
+			return (conn.Conn)(nil), nil
+		} else {
+			<-hangchs[p]
+			return nil, fmt.Errorf("test bad dial")
+		}
+	}
+	l := newDialLimiterWithParams(df, 8, 4)
+
+	bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)}
+	pids := []peer.ID{"testpeer1", "testpeer2"}
+
+	ctx := context.Background()
+	resch := make(chan dialResult)
+
+	// take all fd limit tokens with hang dials
+	for _, pid := range pids {
+		hangchs[pid] = make(chan struct{})
+		tryDialAddrs(ctx, l, pid, bads, resch)
+	}
+
+	good := mustAddr(t, "/ip4/127.0.0.1/tcp/1001")
+
+	// add a good dial job for peer 1
+	l.AddDialJob(&dialJob{
+		ctx:  ctx,
+		peer: pids[1],
+		addr: good,
+		resp: resch,
+	})
+
+	select {
+	case <-resch:
+		t.Fatal("no dials should have completed!")
+	case <-time.After(time.Millisecond * 100):
+	}
+
+	// unblock one dial for peer 0
+	hangchs[pids[0]] <- struct{}{}
+
+	select {
+	case res := <-resch:
+		if res.Err == nil {
+			t.Fatal("should have only been a failure here")
+		}
+	case <-time.After(time.Millisecond * 100):
+		t.Fatal("expected a dial failure here")
+	}
+
+	select {
+	case <-resch:
+		t.Fatal("no more dials should have completed!")
+	case <-time.After(time.Millisecond * 100):
+	}
+
+	// add a bad dial job to peer 0 to fill their rate limiter
+	// and test that more dials for this peer won't interfere with peer 1's incoming successful dial
+	l.AddDialJob(&dialJob{
+		ctx:  ctx,
+		peer: pids[0],
+		addr: addrWithPort(t, 7),
+		resp: resch,
+	})
+
+	hangchs[pids[1]] <- struct{}{}
+
+	// now one failed dial from peer 1 should get through and fail
+	// which will in turn unblock the successful dial on peer 1
+	select {
+	case res := <-resch:
+		if res.Err == nil {
+			t.Fatal("should have only been a failure here")
+		}
+	case <-time.After(time.Millisecond * 100):
+		t.Fatal("expected a dial failure here")
+	}
+
+	select {
+	case res := <-resch:
+		if res.Err != nil {
+			t.Fatal("should have succeeded!")
+		}
+	case <-time.After(time.Millisecond * 100):
+		t.Fatal("should have gotten successful dial")
+	}
+}
+
+func TestStressLimiter(t *testing.T) {
+	df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) {
+		if tcpPortOver(a, 1000) {
+			return conn.Conn(nil), nil
+		} else {
+			time.Sleep(time.Millisecond * time.Duration(5+rand.Intn(100)))
+			return nil, fmt.Errorf("test bad dial")
+		}
+	}
+
+	l := newDialLimiterWithParams(df, 20, 5)
+
+	var bads []ma.Multiaddr
+	for i := 0; i < 100; i++ {
+		bads = append(bads, addrWithPort(t, i))
+	}
+
+	addresses := append(bads, addrWithPort(t, 2000))
+	success := make(chan struct{})
+
+	for i := 0; i < 20; i++ {
+		go func(id peer.ID) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			resp := make(chan dialResult)
+			time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
+			for _, i := range rand.Perm(len(addresses)) {
+				l.AddDialJob(&dialJob{
+					addr: addresses[i],
+					ctx:  ctx,
+					peer: id,
+					resp: resp,
+				})
+			}
+
+			for res := range resp {
+				if res.Err == nil {
+					success <- struct{}{}
+					return
+				}
+			}
+		}(peer.ID(fmt.Sprintf("testpeer%d", i)))
+	}
+
+	for i := 0; i < 20; i++ {
+		select {
+		case <-success:
+		case <-time.After(time.Second * 5):
+			t.Fatal("expected a success within five seconds")
+		}
+	}
+}
diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go
index 74e43dde44..54dd903056 100644
--- a/p2p/net/swarm/swarm.go
+++ b/p2p/net/swarm/swarm.go
@@ -90,6 +90,8 @@ type Swarm struct {
 	proc goprocess.Process
 	ctx  context.Context
 	bwc  metrics.Reporter
+
+	limiter *dialLimiter
 }
 
 // NewSwarm constructs a Swarm, with a Chan.
@@ -122,6 +124,8 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr,
 		dialer:  conn.NewDialer(local, peers.PrivKey(local), wrap),
 	}
 
+	s.limiter = newDialLimiter(s.dialAddr)
+
 	// configure Swarm
 	s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown)
 	s.SetConnHandler(nil) // make sure to setup our own conn handler.
@@ -155,6 +159,7 @@ func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) {
 		}
 		listenAddrs = filtered
 	}
+
 	return listenAddrs, nil
 }
 
diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go
index aa4a3514b4..1100ca544d 100644
--- a/p2p/net/swarm/swarm_dial.go
+++ b/p2p/net/swarm/swarm_dial.go
@@ -1,10 +1,8 @@
 package swarm
 
 import (
-	"bytes"
 	"errors"
 	"fmt"
-	"sort"
 	"sync"
 	"time"
 
@@ -13,7 +11,6 @@ import (
 	conn "github.com/ipfs/go-libp2p/p2p/net/conn"
 	addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
 	ma "github.com/jbenet/go-multiaddr"
-	"github.com/jbenet/go-multiaddr-net"
 	context "golang.org/x/net/context"
 )
 
@@ -42,6 +39,9 @@ const dialAttempts = 1
 // number of concurrent outbound dials over transports that consume file descriptors
 const concurrentFdDials = 160
 
+// number of concurrent outbound dials to make per peer
+const defaultPerPeerRateLimit = 8
+
 // DialTimeout is the amount of time each dial attempt has. We can think about making
 // this larger down the road, or putting more granular timeouts (i.e. within each
 // subcomponent of Dial)
@@ -319,32 +319,40 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
 		log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
 	}
 
-	// get remote peer addrs
-	remoteAddrs := s.peers.Addrs(p)
-	// make sure we can use the addresses.
-	remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs)
-	// drop out any addrs that would just dial ourselves. use ListenAddresses
-	// as that is a more authoritative view than localAddrs.
 	ila, _ := s.InterfaceListenAddresses()
-	remoteAddrs = addrutil.Subtract(remoteAddrs, ila)
-	remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local))
-
-	log.Debugf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs)
-	if len(remoteAddrs) == 0 {
-		err := errors.New("peer has no addresses")
-		logdial["error"] = err
-		return nil, err
-	}
-
-	remoteAddrs = s.filterAddrs(remoteAddrs)
-	if len(remoteAddrs) == 0 {
-		err := errors.New("all adresses for peer have been filtered out")
-		logdial["error"] = err
-		return nil, err
+	subtract_filter := addrutil.SubtractFilter(append(ila, s.peers.Addrs(s.local)...)...)
+
+	// get live channel of addresses for peer, filtered by the given filters
+	/*
+		remoteAddrChan := s.peers.AddrsChan(ctx, p,
+			addrutil.AddrUsableFilter,
+			subtract_filter,
+			s.Filters.AddrBlocked)
+	*/
+
+	//////
+	/*
+		This code is temporary: the peerstore can currently provide
+		a channel as an interface for receiving addresses, but more thought
+		needs to be put into the execution. For now, this allows us to use
+		the improved rate limiter, while maintaining the outward behaviour
+		that we previously had (halting a dial when we run out of addrs)
+	*/
+	paddrs := s.peers.Addrs(p)
+	good_addrs := addrutil.FilterAddrs(paddrs,
+		addrutil.AddrUsableFunc,
+		subtract_filter,
+		addrutil.FilterNeg(s.Filters.AddrBlocked),
+	)
+	remoteAddrChan := make(chan ma.Multiaddr, len(good_addrs))
+	for _, a := range good_addrs {
+		remoteAddrChan <- a
 	}
+	close(remoteAddrChan)
+	/////////
 
 	// try to get a connection to any addr
-	connC, err := s.dialAddrs(ctx, p, remoteAddrs)
+	connC, err := s.dialAddrs(ctx, p, remoteAddrChan)
 	if err != nil {
 		logdial["error"] = err
 		return nil, err
@@ -364,98 +372,64 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
 	return swarmC, nil
 }
 
-func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) {
-
-	// sort addresses so preferred addresses are dialed sooner
-	sort.Sort(AddrList(remoteAddrs))
-
-	// try to connect to one of the peer's known addresses.
-	// we dial concurrently to each of the addresses, which:
-	// * makes the process faster overall
-	// * attempts to get the fastest connection available.
-	// * mitigates the waste of trying bad addresses
+func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (conn.Conn, error) {
 	log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs)
 
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel() // cancel work when we exit func
 
-	conns := make(chan conn.Conn)
-	errs := make(chan error, len(remoteAddrs))
+	// use a single response type instead of errs and conns, reduces complexity *a ton*
+	respch := make(chan dialResult)
 
-	// dialSingleAddr is used in the rate-limited async thing below.
-	dialSingleAddr := func(addr ma.Multiaddr) {
-		// rebind chans in scope so we can nil them out easily
-		connsout := conns
-		errsout := errs
+	defaultDialFail := fmt.Errorf("failed to dial %s (default failure)", p)
+	exitErr := defaultDialFail
 
-		connC, err := s.dialAddr(ctx, p, addr)
-		if err != nil {
-			connsout = nil
-		} else if connC == nil {
-			// NOTE: this really should never happen
-			log.Errorf("failed to dial %s %s and got no error!", p, addr)
-			err = fmt.Errorf("failed to dial %s %s", p, addr)
-			connsout = nil
-		} else {
-			errsout = nil
-		}
-
-		// check parent still wants our results
+	var active int
+	for {
 		select {
-		case <-ctx.Done():
-			if connC != nil {
-				connC.Close()
+		case addr, ok := <-remoteAddrs:
+			if !ok {
+				remoteAddrs = nil
+				if active == 0 {
+					return nil, exitErr
+				}
+				continue
 			}
-		case errsout <- err:
-		case connsout <- connC:
-		}
-	}
 
-	// this whole thing is in a goroutine so we can use foundConn
-	// to end early.
-	go func() {
-		limiter := make(chan struct{}, 8)
-		for _, addr := range remoteAddrs {
-			// returns whatever ratelimiting is acceptable for workerAddr.
-			// may not rate limit at all.
-			rl := s.addrDialRateLimit(addr)
-			select {
-			case <-ctx.Done(): // our context was cancelled
-				return
-			case rl <- struct{}{}:
-				// take the token, move on
+			s.limitedDial(ctx, p, addr, respch)
+			active++
+		case <-ctx.Done():
+			if exitErr == defaultDialFail {
+				exitErr = ctx.Err()
 			}
-
-			select {
-			case <-ctx.Done(): // our context was cancelled
-				return
-			case limiter <- struct{}{}:
-				// take the token, move on
+			return nil, exitErr
+		case resp := <-respch:
+			active--
+			if resp.Err != nil {
+				log.Info("got error on dial: ", resp.Err)
+				// Errors are normal, lots of dials will fail
+				exitErr = resp.Err
+
+				if remoteAddrs == nil && active == 0 {
+					return nil, exitErr
+				}
+			} else if resp.Conn != nil {
+				return resp.Conn, nil
 			}
-
-			go func(rlc <-chan struct{}, a ma.Multiaddr) {
-				dialSingleAddr(a)
-				<-limiter
-				<-rlc
-			}(rl, addr)
-		}
-	}()
-
-	// wair for the results.
-	exitErr := fmt.Errorf("failed to dial %s", p)
-	for range remoteAddrs {
-		select {
-		case exitErr = <-errs: //
-			log.Debug("dial error: ", exitErr)
-		case connC := <-conns:
-			// take the first + return asap
-			return connC, nil
-		case <-ctx.Done():
-			// break out and return error
-			break
 		}
 	}
-	return nil, exitErr
+}
+
+// limitedDial will start a dial to the given peer when
+// it is able, respecting the various different types of rate
+// limiting that occur, without using extra goroutines per addr
+func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) {
+	s.limiter.AddDialJob(&dialJob{
+		addr: a,
+		peer: p,
+		resp: resp,
+		ctx:  ctx,
+	})
 }
 
 func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) {
@@ -485,16 +459,6 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (con
 	return connC, nil
 }
 
-func (s *Swarm) filterAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
-	var out []ma.Multiaddr
-	for _, a := range addrs {
-		if !s.Filters.AddrBlocked(a) {
-			out = append(out, a)
-		}
-	}
-	return out
-}
-
 // dialConnSetup is the setup logic for a connection from the dial side. it
 // needs to add the Conn to the StreamSwarm, then run newConnSetup
 func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) {
@@ -514,72 +478,3 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error
 
 	return swarmC, err
 }
-
-// addrDialRateLimit returns a ratelimiting channel for dialing transport
-// addrs like a. for example, tcp is fd-ratelimited. utp is not ratelimited.
-func (s *Swarm) addrDialRateLimit(a ma.Multiaddr) chan struct{} {
-	if isFDCostlyTransport(a) {
-		return s.fdRateLimit
-	}
-
-	// do not rate limit it at all
-	return make(chan struct{}, 1)
-}
-
-func isFDCostlyTransport(a ma.Multiaddr) bool {
-	return isTCPMultiaddr(a)
-}
-
-func isTCPMultiaddr(a ma.Multiaddr) bool {
-	p := a.Protocols()
-	return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp"
-}
-
-type AddrList []ma.Multiaddr
-
-func (al AddrList) Len() int {
-	return len(al)
-}
-
-func (al AddrList) Swap(i, j int) {
-	al[i], al[j] = al[j], al[i]
-}
-
-func (al AddrList) Less(i, j int) bool {
-	a := al[i]
-	b := al[j]
-
-	// dial localhost addresses next, they should fail immediately
-	lba := manet.IsIPLoopback(a)
-	lbb := manet.IsIPLoopback(b)
-	if lba {
-		if !lbb {
-			return true
-		}
-	}
-
-	// dial utp and similar 'non-fd-consuming' addresses first
-	fda := isFDCostlyTransport(a)
-	fdb := isFDCostlyTransport(b)
-	if !fda {
-		if fdb {
-			return true
-		}
-
-		// if neither consume fd's, assume equal ordering
-		return false
-	}
-
-	// if 'b' doesnt take a file descriptor
-	if !fdb {
-		return false
-	}
-
-	// if 'b' is loopback and both take file descriptors
-	if lbb {
-		return false
-	}
-
-	// for the rest, just sort by bytes
-	return bytes.Compare(a.Bytes(), b.Bytes()) > 0
-}
diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go
index 6e9f121fcd..f5e454956c 100644
--- a/p2p/net/swarm/swarm_test.go
+++ b/p2p/net/swarm/swarm_test.go
@@ -303,7 +303,7 @@ func TestAddrBlocking(t *testing.T) {
 	swarms := makeSwarms(ctx, t, 2)
 
 	swarms[0].SetConnHandler(func(conn *Conn) {
-		t.Fatalf("no connections should happen! -- %s", conn)
+		t.Errorf("no connections should happen! -- %s", conn)
 	})
 
 	_, block, err := net.ParseCIDR("127.0.0.1/8")
diff --git a/package.json b/package.json
index 5af729700c..70d1bf2952 100644
--- a/package.json
+++ b/package.json
@@ -1,4 +1,5 @@
 {
+  "name":"go-libp2p",
   "author": "whyrusleeping",
   "bugs": {
    "url": "https://github.com/ipfs/go-libp2p"
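
Reviewer note (not part of the patch): below is a minimal sketch of how the new variadic FilterAddrs composes with SubtractFilter and FilterNeg from p2p/net/swarm/addr/filter.go, mirroring how swarm_dial.go now builds its filtered address list before handing jobs to the dial limiter. The multiaddr strings and the `blocked` predicate are made-up placeholders, and the import paths assume the go-libp2p layout used in this change.

```go
package main

import (
	"fmt"

	addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
	ma "github.com/jbenet/go-multiaddr"
)

// mustAddr parses a multiaddr or panics; purely for this illustration.
func mustAddr(s string) ma.Multiaddr {
	a, err := ma.NewMultiaddr(s)
	if err != nil {
		panic(err)
	}
	return a
}

func main() {
	// Hypothetical addresses known for a remote peer (placeholders only).
	peerAddrs := []ma.Multiaddr{
		mustAddr("/ip4/10.0.0.1/tcp/4001"),
		mustAddr("/ip4/127.0.0.1/tcp/4001"),
		mustAddr("/ip4/10.0.0.1/udp/4001/utp"),
	}

	// Addresses we listen on ourselves; dialing them would be pointless.
	own := []ma.Multiaddr{mustAddr("/ip4/127.0.0.1/tcp/4001")}

	// Stand-in for s.Filters.AddrBlocked: true means the address is blocked.
	blocked := func(a ma.Multiaddr) bool { return false }

	// An address is kept only if every filter returns true.
	good := addrutil.FilterAddrs(peerAddrs,
		addrutil.AddrUsableFunc,         // drop unusable transports
		addrutil.SubtractFilter(own...), // drop our own addresses
		addrutil.FilterNeg(blocked),     // drop blocked addresses
	)

	for _, a := range good {
		fmt.Println(a, "fd-costly:", addrutil.IsFDCostlyTransport(a))
	}
}
```

In the committed code the same composition happens inline in Swarm.dial, and IsFDCostlyTransport is what the limiter consults to decide whether a dial job needs a file-descriptor token in addition to its per-peer token.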