From d899b07f178191693850ddb7d3e824da535036bf Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 14 Apr 2016 16:43:42 -0700 Subject: [PATCH 1/6] Refactor the swarm dialer --- p2p/net/swarm/addr/addr.go | 16 +- p2p/net/swarm/addr/filter.go | 27 ++++ p2p/net/swarm/dial_test.go | 36 ----- p2p/net/swarm/limiter.go | 136 +++++++++++++++++ p2p/net/swarm/limiter_test.go | 265 ++++++++++++++++++++++++++++++++++ p2p/net/swarm/swarm.go | 5 + p2p/net/swarm/swarm_dial.go | 253 +++++++++----------------------- p2p/net/swarm/swarm_test.go | 2 +- 8 files changed, 516 insertions(+), 224 deletions(-) create mode 100644 p2p/net/swarm/addr/filter.go create mode 100644 p2p/net/swarm/limiter.go create mode 100644 p2p/net/swarm/limiter_test.go diff --git a/p2p/net/swarm/addr/addr.go b/p2p/net/swarm/addr/addr.go index 1e22bae261..8c0c94b2d6 100644 --- a/p2p/net/swarm/addr/addr.go +++ b/p2p/net/swarm/addr/addr.go @@ -42,10 +42,14 @@ func init() { // FilterAddrs is a filter that removes certain addresses, according to filter. // if filter returns true, the address is kept. -func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiaddr { +func FilterAddrs(a []ma.Multiaddr, filters ...func(ma.Multiaddr) bool) []ma.Multiaddr { b := make([]ma.Multiaddr, 0, len(a)) for _, addr := range a { - if filter(addr) { + good := true + for _, filter := range filters { + good = good && filter(addr) + } + if good { b = append(b, addr) } } @@ -56,9 +60,11 @@ func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiadd // from a list. the addresses removed are those known NOT // to work with our network. Namely, addresses with UTP. func FilterUsableAddrs(a []ma.Multiaddr) []ma.Multiaddr { - return FilterAddrs(a, func(m ma.Multiaddr) bool { - return AddrUsable(m, false) - }) + return FilterAddrs(a, AddrUsableFunc) +} + +func AddrUsableFunc(m ma.Multiaddr) bool { + return AddrUsable(m, false) } // AddrOverNonLocalIP returns whether the addr uses a non-local ip link diff --git a/p2p/net/swarm/addr/filter.go b/p2p/net/swarm/addr/filter.go new file mode 100644 index 0000000000..c67e1c624e --- /dev/null +++ b/p2p/net/swarm/addr/filter.go @@ -0,0 +1,27 @@ +package addrutil + +import ( + ma "github.com/jbenet/go-multiaddr" + mafmt "github.com/whyrusleeping/mafmt" +) + +func SubtractFilter(addrs ...ma.Multiaddr) func(ma.Multiaddr) bool { + addrmap := make(map[string]bool) + for _, a := range addrs { + addrmap[string(a.Bytes())] = true + } + + return func(a ma.Multiaddr) bool { + return !addrmap[string(a.Bytes())] + } +} + +func IsFDCostlyTransport(a ma.Multiaddr) bool { + return mafmt.TCP.Matches(a) +} + +func FilterNeg(f func(ma.Multiaddr) bool) func(ma.Multiaddr) bool { + return func(a ma.Multiaddr) bool { + return !f(a) + } +} diff --git a/p2p/net/swarm/dial_test.go b/p2p/net/swarm/dial_test.go index 988f7f7f01..aea0f728cc 100644 --- a/p2p/net/swarm/dial_test.go +++ b/p2p/net/swarm/dial_test.go @@ -2,7 +2,6 @@ package swarm import ( "net" - "sort" "sync" "testing" "time" @@ -493,38 +492,3 @@ func TestDialBackoffClears(t *testing.T) { t.Log("correctly cleared backoff") } } - -func mkAddr(t *testing.T, s string) ma.Multiaddr { - a, err := ma.NewMultiaddr(s) - if err != nil { - t.Fatal(err) - } - - return a -} - -func TestAddressSorting(t *testing.T) { - u1 := mkAddr(t, "/ip4/152.12.23.53/udp/1234/utp") - u2l := mkAddr(t, "/ip4/127.0.0.1/udp/1234/utp") - local := mkAddr(t, "/ip4/127.0.0.1/tcp/1234") - norm := mkAddr(t, "/ip4/6.5.4.3/tcp/1234") - - l := AddrList{local, u1, u2l, norm} - sort.Sort(l) - - if !l[0].Equal(u2l) { - t.Fatal("expected utp local addr to be sorted first: ", l[0]) - } - - if !l[1].Equal(u1) { - t.Fatal("expected utp addr to be sorted second") - } - - if !l[2].Equal(local) { - t.Fatal("expected tcp localhost addr thid") - } - - if !l[3].Equal(norm) { - t.Fatal("expected normal addr last") - } -} diff --git a/p2p/net/swarm/limiter.go b/p2p/net/swarm/limiter.go new file mode 100644 index 0000000000..fe8162d7f1 --- /dev/null +++ b/p2p/net/swarm/limiter.go @@ -0,0 +1,136 @@ +package swarm + +import ( + "sync" + + peer "github.com/ipfs/go-libp2p-peer" + ma "github.com/jbenet/go-multiaddr" + context "golang.org/x/net/context" + + conn "github.com/ipfs/go-libp2p/p2p/net/conn" + addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr" +) + +type dialResult struct { + Conn conn.Conn + Err error +} + +type dialJob struct { + addr ma.Multiaddr + peer peer.ID + ctx context.Context + resp chan dialResult + success bool +} + +type dialLimiter struct { + rllock sync.Mutex + fdConsuming int + fdLimit int + waitingOnFd []*dialJob + + dialFunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error) + + activePerPeer map[peer.ID]int + perPeerLimit int + waitingOnPeerLimit map[peer.ID][]*dialJob +} + +type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error) + +func newDialLimiter(df dialfunc) *dialLimiter { + return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit) +} + +func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter { + return &dialLimiter{ + fdLimit: fdl, + perPeerLimit: ppl, + waitingOnPeerLimit: make(map[peer.ID][]*dialJob), + activePerPeer: make(map[peer.ID]int), + dialFunc: df, + } +} + +func (dl *dialLimiter) finishedDial(dj *dialJob) { + dl.rllock.Lock() + defer dl.rllock.Unlock() + + // release tokens in reverse order than we take them + dl.activePerPeer[dj.peer]-- + if dl.activePerPeer[dj.peer] == 0 { + delete(dl.activePerPeer, dj.peer) + } + + waitlist := dl.waitingOnPeerLimit[dj.peer] + if !dj.success && len(waitlist) > 0 { + next := waitlist[0] + if len(waitlist) == 1 { + delete(dl.waitingOnPeerLimit, dj.peer) + } else { + dl.waitingOnPeerLimit[dj.peer] = waitlist[1:] + } + dl.activePerPeer[dj.peer]++ // just kidding, we still want this token + + // can kick this off right here, dials in this list already + // have the other tokens needed + go dl.executeDial(next) + } + + if addrutil.IsFDCostlyTransport(dj.addr) { + dl.fdConsuming-- + if len(dl.waitingOnFd) > 0 { + next := dl.waitingOnFd[0] + dl.waitingOnFd = dl.waitingOnFd[1:] + dl.fdConsuming++ + + // now, attempt to take the 'per peer limit' token + dl.schedulePerPeerDial(next) + } + } +} + +// AddDialJob tries to take the needed tokens for starting the given dial job. +// If it acquires all needed tokens, it immediately starts the dial, otherwise +// it will put it on the waitlist for the requested token. +func (dl *dialLimiter) AddDialJob(dj *dialJob) { + dl.rllock.Lock() + defer dl.rllock.Unlock() + + if addrutil.IsFDCostlyTransport(dj.addr) { + if dl.fdConsuming >= dl.fdLimit { + dl.waitingOnFd = append(dl.waitingOnFd, dj) + return + } + + // take token + dl.fdConsuming++ + } + + dl.schedulePerPeerDial(dj) +} + +// executeDial calls the dialFunc, and reports the result through the response +// channel when finished. Once the response is sent it also releases all tokens +// it held during the dial. +func (dl *dialLimiter) executeDial(j *dialJob) { + defer dl.finishedDial(j) + con, err := dl.dialFunc(j.ctx, j.peer, j.addr) + select { + case j.resp <- dialResult{Conn: con, Err: err}: + case <-j.ctx.Done(): + } +} + +func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) { + if dl.activePerPeer[j.peer] >= dl.perPeerLimit { + wlist := dl.waitingOnPeerLimit[j.peer] + dl.waitingOnPeerLimit[j.peer] = append(wlist, j) + return + } + + // take second needed token and start dial! + dl.activePerPeer[j.peer]++ + go dl.executeDial(j) +} diff --git a/p2p/net/swarm/limiter_test.go b/p2p/net/swarm/limiter_test.go new file mode 100644 index 0000000000..b5bbde847e --- /dev/null +++ b/p2p/net/swarm/limiter_test.go @@ -0,0 +1,265 @@ +package swarm + +import ( + "fmt" + "strconv" + "testing" + "time" + + peer "github.com/ipfs/go-libp2p-peer" + ma "github.com/jbenet/go-multiaddr" + mafmt "github.com/whyrusleeping/mafmt" + context "golang.org/x/net/context" + + conn "github.com/ipfs/go-libp2p/p2p/net/conn" +) + +func mustAddr(t *testing.T, s string) ma.Multiaddr { + a, err := ma.NewMultiaddr(s) + if err != nil { + t.Fatal(err) + } + return a +} + +func addrWithPort(t *testing.T, p int) ma.Multiaddr { + return mustAddr(t, fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", p)) +} + +// in these tests I use addresses with tcp ports over a certain number to +// signify 'good' addresses that will succeed, and addresses below that number +// will fail. This lets us more easily test these different scenarios. +func tcpPortOver(a ma.Multiaddr, n int) bool { + port, err := a.ValueForProtocol(ma.P_TCP) + if err != nil { + panic(err) + } + + pnum, err := strconv.Atoi(port) + if err != nil { + panic(err) + } + + return pnum > n +} + +func tryDialAddrs(ctx context.Context, l *dialLimiter, p peer.ID, addrs []ma.Multiaddr, res chan dialResult) { + for _, a := range addrs { + l.AddDialJob(&dialJob{ + ctx: ctx, + peer: p, + addr: a, + resp: res, + }) + } +} + +func hangDialFunc(hang chan struct{}) dialfunc { + return func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) { + if mafmt.UTP.Matches(a) { + return conn.Conn(nil), nil + } + + if tcpPortOver(a, 10) { + return conn.Conn(nil), nil + } else { + <-hang + return nil, fmt.Errorf("test bad dial") + } + } +} + +func TestLimiterBasicDials(t *testing.T) { + hang := make(chan struct{}) + defer close(hang) + + l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4) + + bads := []ma.Multiaddr{ + addrWithPort(t, 1), + addrWithPort(t, 2), + addrWithPort(t, 3), + addrWithPort(t, 4), + } + + good := addrWithPort(t, 20) + + resch := make(chan dialResult) + pid := peer.ID("testpeer") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tryDialAddrs(ctx, l, pid, bads, resch) + + l.AddDialJob(&dialJob{ + ctx: ctx, + peer: pid, + addr: good, + resp: resch, + }) + + select { + case <-resch: + t.Fatal("no dials should have completed!") + case <-time.After(time.Millisecond * 100): + } + + // complete a single hung dial + hang <- struct{}{} + + select { + case r := <-resch: + if r.Err == nil { + t.Fatal("should have gotten failed dial result") + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for dial completion") + } + + select { + case r := <-resch: + if r.Err != nil { + t.Fatal("expected second result to be success!") + } + case <-time.After(time.Second): + } +} + +func TestFDLimiting(t *testing.T) { + hang := make(chan struct{}) + defer close(hang) + l := newDialLimiterWithParams(hangDialFunc(hang), 16, 5) + + bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} + pids := []peer.ID{"testpeer1", "testpeer2", "testpeer3", "testpeer4"} + good_tcp := addrWithPort(t, 20) + + ctx := context.Background() + resch := make(chan dialResult) + + // take all fd limit tokens with hang dials + for _, pid := range pids { + tryDialAddrs(ctx, l, pid, bads, resch) + } + + // these dials should work normally, but will hang because we have taken + // up all the fd limiting + for _, pid := range pids { + l.AddDialJob(&dialJob{ + ctx: ctx, + peer: pid, + addr: good_tcp, + resp: resch, + }) + } + + select { + case <-resch: + t.Fatal("no dials should have completed!") + case <-time.After(time.Millisecond * 100): + } + + pid5 := peer.ID("testpeer5") + utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp") + + l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch}) + + select { + case res := <-resch: + if res.Err != nil { + t.Fatal("should have gotten successful response") + } + case <-time.After(time.Second * 5): + t.Fatal("timeout waiting for utp addr success") + } +} + +func TestTokenRedistribution(t *testing.T) { + hangchs := make(map[peer.ID]chan struct{}) + df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) { + if tcpPortOver(a, 10) { + return (conn.Conn)(nil), nil + } else { + <-hangchs[p] + return nil, fmt.Errorf("test bad dial") + } + } + l := newDialLimiterWithParams(df, 8, 4) + + bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} + pids := []peer.ID{"testpeer1", "testpeer2"} + + ctx := context.Background() + resch := make(chan dialResult) + + // take all fd limit tokens with hang dials + for _, pid := range pids { + hangchs[pid] = make(chan struct{}) + tryDialAddrs(ctx, l, pid, bads, resch) + } + + good := mustAddr(t, "/ip4/127.0.0.1/tcp/1001") + + // add a good dial job for peer 1 + l.AddDialJob(&dialJob{ + ctx: ctx, + peer: pids[1], + addr: good, + resp: resch, + }) + + select { + case <-resch: + t.Fatal("no dials should have completed!") + case <-time.After(time.Millisecond * 100): + } + + // unblock one dial for peer 0 + hangchs[pids[0]] <- struct{}{} + + select { + case res := <-resch: + if res.Err == nil { + t.Fatal("should have only been a failure here") + } + case <-time.After(time.Millisecond * 100): + t.Fatal("expected a dial failure here") + } + + select { + case <-resch: + t.Fatal("no more dials should have completed!") + case <-time.After(time.Millisecond * 100): + } + + // add a bad dial job to peer 0 to fill their rate limiter + // and test that more dials for this peer won't interfere with peer 1's successful dial incoming + l.AddDialJob(&dialJob{ + ctx: ctx, + peer: pids[0], + addr: addrWithPort(t, 7), + resp: resch, + }) + + hangchs[pids[1]] <- struct{}{} + + // now one failed dial from peer 1 should get through and fail + // which will in turn unblock the successful dial on peer 1 + select { + case res := <-resch: + if res.Err == nil { + t.Fatal("should have only been a failure here") + } + case <-time.After(time.Millisecond * 100): + t.Fatal("expected a dial failure here") + } + + select { + case res := <-resch: + if res.Err != nil { + t.Fatal("should have succeeded!") + } + case <-time.After(time.Millisecond * 100): + t.Fatal("should have gotten successful dial") + } +} diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 74e43dde44..54dd903056 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -90,6 +90,8 @@ type Swarm struct { proc goprocess.Process ctx context.Context bwc metrics.Reporter + + limiter *dialLimiter } // NewSwarm constructs a Swarm, with a Chan. @@ -122,6 +124,8 @@ func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, dialer: conn.NewDialer(local, peers.PrivKey(local), wrap), } + s.limiter = newDialLimiter(s.dialAddr) + // configure Swarm s.proc = goprocessctx.WithContextAndTeardown(ctx, s.teardown) s.SetConnHandler(nil) // make sure to setup our own conn handler. @@ -155,6 +159,7 @@ func filterAddrs(listenAddrs []ma.Multiaddr) ([]ma.Multiaddr, error) { } listenAddrs = filtered } + return listenAddrs, nil } diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index aa4a3514b4..c1b3a73922 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -1,10 +1,8 @@ package swarm import ( - "bytes" "errors" "fmt" - "sort" "sync" "time" @@ -13,7 +11,6 @@ import ( conn "github.com/ipfs/go-libp2p/p2p/net/conn" addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr" ma "github.com/jbenet/go-multiaddr" - "github.com/jbenet/go-multiaddr-net" context "golang.org/x/net/context" ) @@ -42,6 +39,9 @@ const dialAttempts = 1 // number of concurrent outbound dials over transports that consume file descriptors const concurrentFdDials = 160 +// number of concurrent outbound dials to make per peer +const defaultPerPeerRateLimit = 8 + // DialTimeout is the amount of time each dial attempt has. We can think about making // this larger down the road, or putting more granular timeouts (i.e. within each // subcomponent of Dial) @@ -319,32 +319,34 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.") } - // get remote peer addrs - remoteAddrs := s.peers.Addrs(p) - // make sure we can use the addresses. - remoteAddrs = addrutil.FilterUsableAddrs(remoteAddrs) - // drop out any addrs that would just dial ourselves. use ListenAddresses - // as that is a more authoritative view than localAddrs. ila, _ := s.InterfaceListenAddresses() - remoteAddrs = addrutil.Subtract(remoteAddrs, ila) - remoteAddrs = addrutil.Subtract(remoteAddrs, s.peers.Addrs(s.local)) - - log.Debugf("%s swarm dialing %s -- local:%s remote:%s", s.local, p, s.ListenAddresses(), remoteAddrs) - if len(remoteAddrs) == 0 { - err := errors.New("peer has no addresses") - logdial["error"] = err - return nil, err - } - - remoteAddrs = s.filterAddrs(remoteAddrs) - if len(remoteAddrs) == 0 { - err := errors.New("all adresses for peer have been filtered out") - logdial["error"] = err - return nil, err + subtract_filter := addrutil.SubtractFilter(append(ila, s.peers.Addrs(s.local)...)...) + + // get live channel of addresses for peer, filtered by the given filters + /* + remoteAddrChan := s.peers.AddrsChan(ctx, p, + addrutil.AddrUsableFilter, + subtract_filter, + s.Filters.AddrBlocked) + */ + + ////// TEMP UNTIL PEERSTORE GETS UPGRADED + // Ref: https://github.com/ipfs/go-libp2p-peer/pull/1 + paddrs := s.peers.Addrs(p) + good_addrs := addrutil.FilterAddrs(paddrs, + addrutil.AddrUsableFunc, + subtract_filter, + addrutil.FilterNeg(s.Filters.AddrBlocked), + ) + remoteAddrChan := make(chan ma.Multiaddr, len(good_addrs)) + for _, a := range good_addrs { + remoteAddrChan <- a } + close(remoteAddrChan) + ///////// // try to get a connection to any addr - connC, err := s.dialAddrs(ctx, p, remoteAddrs) + connC, err := s.dialAddrs(ctx, p, remoteAddrChan) if err != nil { logdial["error"] = err return nil, err @@ -364,98 +366,64 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { return swarmC, nil } -func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs []ma.Multiaddr) (conn.Conn, error) { - - // sort addresses so preferred addresses are dialed sooner - sort.Sort(AddrList(remoteAddrs)) - - // try to connect to one of the peer's known addresses. - // we dial concurrently to each of the addresses, which: - // * makes the process faster overall - // * attempts to get the fastest connection available. - // * mitigates the waste of trying bad addresses +func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (conn.Conn, error) { log.Debugf("%s swarm dialing %s %s", s.local, p, remoteAddrs) ctx, cancel := context.WithCancel(ctx) defer cancel() // cancel work when we exit func - conns := make(chan conn.Conn) - errs := make(chan error, len(remoteAddrs)) + // use a single response type instead of errs and conns, reduces complexity *a ton* + respch := make(chan dialResult) - // dialSingleAddr is used in the rate-limited async thing below. - dialSingleAddr := func(addr ma.Multiaddr) { - // rebind chans in scope so we can nil them out easily - connsout := conns - errsout := errs + defaultDialFail := fmt.Errorf("failed to dial %s (default failure)", p) + exitErr := defaultDialFail - connC, err := s.dialAddr(ctx, p, addr) - if err != nil { - connsout = nil - } else if connC == nil { - // NOTE: this really should never happen - log.Errorf("failed to dial %s %s and got no error!", p, addr) - err = fmt.Errorf("failed to dial %s %s", p, addr) - connsout = nil - } else { - errsout = nil - } - - // check parent still wants our results + var active int + for { select { - case <-ctx.Done(): - if connC != nil { - connC.Close() + case addr, ok := <-remoteAddrs: + if !ok { + remoteAddrs = nil + if active == 0 { + return nil, exitErr + } + continue } - case errsout <- err: - case connsout <- connC: - } - } - // this whole thing is in a goroutine so we can use foundConn - // to end early. - go func() { - limiter := make(chan struct{}, 8) - for _, addr := range remoteAddrs { - // returns whatever ratelimiting is acceptable for workerAddr. - // may not rate limit at all. - rl := s.addrDialRateLimit(addr) - select { - case <-ctx.Done(): // our context was cancelled - return - case rl <- struct{}{}: - // take the token, move on + // limitedDial will start a dial to the given peer when + // it is able, respecting the various different types of rate + // limiting that occur without using extra goroutines per addr + s.limitedDial(ctx, p, addr, respch) + active++ + case <-ctx.Done(): + if exitErr == defaultDialFail { + exitErr = ctx.Err() } - - select { - case <-ctx.Done(): // our context was cancelled - return - case limiter <- struct{}{}: - // take the token, move on + return nil, exitErr + case resp := <-respch: + active-- + if resp.Err != nil { + log.Error("got error on dial: ", resp.Err) + // Errors are normal, lots of dials will fail + exitErr = resp.Err + + if remoteAddrs == nil && active == 0 { + return nil, exitErr + } + } else if resp.Conn != nil { + return resp.Conn, nil } - - go func(rlc <-chan struct{}, a ma.Multiaddr) { - dialSingleAddr(a) - <-limiter - <-rlc - }(rl, addr) - } - }() - - // wair for the results. - exitErr := fmt.Errorf("failed to dial %s", p) - for range remoteAddrs { - select { - case exitErr = <-errs: // - log.Debug("dial error: ", exitErr) - case connC := <-conns: - // take the first + return asap - return connC, nil - case <-ctx.Done(): - // break out and return error - break } } - return nil, exitErr +} + +func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) { + s.limiter.AddDialJob(&dialJob{ + addr: a, + peer: p, + resp: resp, + ctx: ctx, + }) } func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (conn.Conn, error) { @@ -485,16 +453,6 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (con return connC, nil } -func (s *Swarm) filterAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { - var out []ma.Multiaddr - for _, a := range addrs { - if !s.Filters.AddrBlocked(a) { - out = append(out, a) - } - } - return out -} - // dialConnSetup is the setup logic for a connection from the dial side. it // needs to add the Conn to the StreamSwarm, then run newConnSetup func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error) { @@ -514,72 +472,3 @@ func dialConnSetup(ctx context.Context, s *Swarm, connC conn.Conn) (*Conn, error return swarmC, err } - -// addrDialRateLimit returns a ratelimiting channel for dialing transport -// addrs like a. for example, tcp is fd-ratelimited. utp is not ratelimited. -func (s *Swarm) addrDialRateLimit(a ma.Multiaddr) chan struct{} { - if isFDCostlyTransport(a) { - return s.fdRateLimit - } - - // do not rate limit it at all - return make(chan struct{}, 1) -} - -func isFDCostlyTransport(a ma.Multiaddr) bool { - return isTCPMultiaddr(a) -} - -func isTCPMultiaddr(a ma.Multiaddr) bool { - p := a.Protocols() - return len(p) == 2 && (p[0].Name == "ip4" || p[0].Name == "ip6") && p[1].Name == "tcp" -} - -type AddrList []ma.Multiaddr - -func (al AddrList) Len() int { - return len(al) -} - -func (al AddrList) Swap(i, j int) { - al[i], al[j] = al[j], al[i] -} - -func (al AddrList) Less(i, j int) bool { - a := al[i] - b := al[j] - - // dial localhost addresses next, they should fail immediately - lba := manet.IsIPLoopback(a) - lbb := manet.IsIPLoopback(b) - if lba { - if !lbb { - return true - } - } - - // dial utp and similar 'non-fd-consuming' addresses first - fda := isFDCostlyTransport(a) - fdb := isFDCostlyTransport(b) - if !fda { - if fdb { - return true - } - - // if neither consume fd's, assume equal ordering - return false - } - - // if 'b' doesnt take a file descriptor - if !fdb { - return false - } - - // if 'b' is loopback and both take file descriptors - if lbb { - return false - } - - // for the rest, just sort by bytes - return bytes.Compare(a.Bytes(), b.Bytes()) > 0 -} diff --git a/p2p/net/swarm/swarm_test.go b/p2p/net/swarm/swarm_test.go index 6e9f121fcd..f5e454956c 100644 --- a/p2p/net/swarm/swarm_test.go +++ b/p2p/net/swarm/swarm_test.go @@ -303,7 +303,7 @@ func TestAddrBlocking(t *testing.T) { swarms := makeSwarms(ctx, t, 2) swarms[0].SetConnHandler(func(conn *Conn) { - t.Fatalf("no connections should happen! -- %s", conn) + t.Errorf("no connections should happen! -- %s", conn) }) _, block, err := net.ParseCIDR("127.0.0.1/8") From b6f19a5591fe987f8296cc6dd321447eeb87c0e7 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 6 May 2016 12:39:08 -0700 Subject: [PATCH 2/6] don't execute cancelled jobs --- p2p/net/swarm/limiter.go | 13 +++++++ p2p/net/swarm/limiter_test.go | 64 +++++++++++++++++++++++++++++++---- 2 files changed, 70 insertions(+), 7 deletions(-) diff --git a/p2p/net/swarm/limiter.go b/p2p/net/swarm/limiter.go index fe8162d7f1..7835fe5729 100644 --- a/p2p/net/swarm/limiter.go +++ b/p2p/net/swarm/limiter.go @@ -24,6 +24,15 @@ type dialJob struct { success bool } +func (dj *dialJob) cancelled() bool { + select { + case <-dj.ctx.Done(): + return true + default: + return false + } +} + type dialLimiter struct { rllock sync.Mutex fdConsuming int @@ -116,6 +125,10 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) { // it held during the dial. func (dl *dialLimiter) executeDial(j *dialJob) { defer dl.finishedDial(j) + if j.cancelled() { + return + } + con, err := dl.dialFunc(j.ctx, j.peer, j.addr) select { case j.resp <- dialResult{Conn: con, Err: err}: diff --git a/p2p/net/swarm/limiter_test.go b/p2p/net/swarm/limiter_test.go index b5bbde847e..28733c5ab5 100644 --- a/p2p/net/swarm/limiter_test.go +++ b/p2p/net/swarm/limiter_test.go @@ -2,6 +2,7 @@ package swarm import ( "fmt" + "math/rand" "strconv" "testing" "time" @@ -75,13 +76,7 @@ func TestLimiterBasicDials(t *testing.T) { l := newDialLimiterWithParams(hangDialFunc(hang), concurrentFdDials, 4) - bads := []ma.Multiaddr{ - addrWithPort(t, 1), - addrWithPort(t, 2), - addrWithPort(t, 3), - addrWithPort(t, 4), - } - + bads := []ma.Multiaddr{addrWithPort(t, 1), addrWithPort(t, 2), addrWithPort(t, 3), addrWithPort(t, 4)} good := addrWithPort(t, 20) resch := make(chan dialResult) @@ -162,6 +157,7 @@ func TestFDLimiting(t *testing.T) { pid5 := peer.ID("testpeer5") utpaddr := mustAddr(t, "/ip4/127.0.0.1/udp/7777/utp") + // This should complete immediately since utp addresses arent blocked by fd rate limiting l.AddDialJob(&dialJob{ctx: ctx, peer: pid5, addr: utpaddr, resp: resch}) select { @@ -263,3 +259,57 @@ func TestTokenRedistribution(t *testing.T) { t.Fatal("should have gotten successful dial") } } + +func TestStressLimiter(t *testing.T) { + df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) { + if tcpPortOver(a, 1000) { + return conn.Conn(nil), nil + } else { + time.Sleep(time.Millisecond * time.Duration(5+rand.Intn(100))) + return nil, fmt.Errorf("test bad dial") + } + } + + l := newDialLimiterWithParams(df, 20, 5) + + var bads []ma.Multiaddr + for i := 0; i < 100; i++ { + bads = append(bads, addrWithPort(t, i)) + } + + addresses := append(bads, addrWithPort(t, 2000)) + success := make(chan struct{}) + + for i := 0; i < 20; i++ { + go func(id peer.ID) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resp := make(chan dialResult) + time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) + for _, i := range rand.Perm(len(addresses)) { + l.AddDialJob(&dialJob{ + addr: addresses[i], + ctx: ctx, + peer: id, + resp: resp, + }) + } + + for res := range resp { + if res.Err == nil { + success <- struct{}{} + return + } + } + }(peer.ID(fmt.Sprintf("testpeer%d", i))) + } + + for i := 0; i < 20; i++ { + select { + case <-success: + case <-time.After(time.Second * 5): + t.Fatal("expected a success within five seconds") + } + } +} From b54202e768180695e9da8e700ec671e0fd4b48cb Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 13 May 2016 09:28:52 -0700 Subject: [PATCH 3/6] refactor locking order structure --- p2p/net/swarm/limiter.go | 59 +++++++++++++++++++++-------------- p2p/net/swarm/limiter_test.go | 4 +++ 2 files changed, 39 insertions(+), 24 deletions(-) diff --git a/p2p/net/swarm/limiter.go b/p2p/net/swarm/limiter.go index 7835fe5729..94ce05bb86 100644 --- a/p2p/net/swarm/limiter.go +++ b/p2p/net/swarm/limiter.go @@ -66,6 +66,20 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { dl.rllock.Lock() defer dl.rllock.Unlock() + if addrutil.IsFDCostlyTransport(dj.addr) { + dl.fdConsuming-- + if len(dl.waitingOnFd) > 0 { + next := dl.waitingOnFd[0] + dl.waitingOnFd = dl.waitingOnFd[1:] + if len(dl.waitingOnFd) == 0 { + dl.waitingOnFd = nil // clear out memory + } + dl.fdConsuming++ + + go dl.executeDial(next) + } + } + // release tokens in reverse order than we take them dl.activePerPeer[dj.peer]-- if dl.activePerPeer[dj.peer] == 0 { @@ -87,17 +101,6 @@ func (dl *dialLimiter) finishedDial(dj *dialJob) { go dl.executeDial(next) } - if addrutil.IsFDCostlyTransport(dj.addr) { - dl.fdConsuming-- - if len(dl.waitingOnFd) > 0 { - next := dl.waitingOnFd[0] - dl.waitingOnFd = dl.waitingOnFd[1:] - dl.fdConsuming++ - - // now, attempt to take the 'per peer limit' token - dl.schedulePerPeerDial(next) - } - } } // AddDialJob tries to take the needed tokens for starting the given dial job. @@ -107,6 +110,13 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) { dl.rllock.Lock() defer dl.rllock.Unlock() + if dl.activePerPeer[dj.peer] >= dl.perPeerLimit { + wlist := dl.waitingOnPeerLimit[dj.peer] + dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj) + return + } + dl.activePerPeer[dj.peer]++ + if addrutil.IsFDCostlyTransport(dj.addr) { if dl.fdConsuming >= dl.fdLimit { dl.waitingOnFd = append(dl.waitingOnFd, dj) @@ -117,7 +127,20 @@ func (dl *dialLimiter) AddDialJob(dj *dialJob) { dl.fdConsuming++ } - dl.schedulePerPeerDial(dj) + // take second needed token and start dial! + go dl.executeDial(dj) +} + +func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) { + if dl.activePerPeer[j.peer] >= dl.perPeerLimit { + wlist := dl.waitingOnPeerLimit[j.peer] + dl.waitingOnPeerLimit[j.peer] = append(wlist, j) + return + } + + // take second needed token and start dial! + dl.activePerPeer[j.peer]++ + go dl.executeDial(j) } // executeDial calls the dialFunc, and reports the result through the response @@ -135,15 +158,3 @@ func (dl *dialLimiter) executeDial(j *dialJob) { case <-j.ctx.Done(): } } - -func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) { - if dl.activePerPeer[j.peer] >= dl.perPeerLimit { - wlist := dl.waitingOnPeerLimit[j.peer] - dl.waitingOnPeerLimit[j.peer] = append(wlist, j) - return - } - - // take second needed token and start dial! - dl.activePerPeer[j.peer]++ - go dl.executeDial(j) -} diff --git a/p2p/net/swarm/limiter_test.go b/p2p/net/swarm/limiter_test.go index 28733c5ab5..fb1be191ee 100644 --- a/p2p/net/swarm/limiter_test.go +++ b/p2p/net/swarm/limiter_test.go @@ -3,6 +3,7 @@ package swarm import ( "fmt" "math/rand" + "runtime" "strconv" "testing" "time" @@ -262,6 +263,7 @@ func TestTokenRedistribution(t *testing.T) { func TestStressLimiter(t *testing.T) { df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) { + fmt.Println("dial for peer: ", string(p)) if tcpPortOver(a, 1000) { return conn.Conn(nil), nil } else { @@ -305,6 +307,8 @@ func TestStressLimiter(t *testing.T) { }(peer.ID(fmt.Sprintf("testpeer%d", i))) } + time.Sleep(time.Millisecond * 1000) + fmt.Println("NUM GOROS: ", runtime.NumGoroutine()) for i := 0; i < 20; i++ { select { case <-success: From 84d4e76e2faf2b09b916bdb889a93fee66aff3c0 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 14 May 2016 19:32:28 -0700 Subject: [PATCH 4/6] address CR feedback --- p2p/net/swarm/addr/addr.go | 4 ++-- p2p/net/swarm/addr/filter.go | 4 ++++ p2p/net/swarm/swarm_dial.go | 8 ++++---- package.json | 1 + 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/p2p/net/swarm/addr/addr.go b/p2p/net/swarm/addr/addr.go index 8c0c94b2d6..d9ba87216a 100644 --- a/p2p/net/swarm/addr/addr.go +++ b/p2p/net/swarm/addr/addr.go @@ -40,8 +40,8 @@ func init() { SupportedTransportProtocols = transports } -// FilterAddrs is a filter that removes certain addresses, according to filter. -// if filter returns true, the address is kept. +// FilterAddrs is a filter that removes certain addresses, according the given filters. +// if all filters return true, the address is kept. func FilterAddrs(a []ma.Multiaddr, filters ...func(ma.Multiaddr) bool) []ma.Multiaddr { b := make([]ma.Multiaddr, 0, len(a)) for _, addr := range a { diff --git a/p2p/net/swarm/addr/filter.go b/p2p/net/swarm/addr/filter.go index c67e1c624e..d87ba816a7 100644 --- a/p2p/net/swarm/addr/filter.go +++ b/p2p/net/swarm/addr/filter.go @@ -5,6 +5,7 @@ import ( mafmt "github.com/whyrusleeping/mafmt" ) +// SubtractFilter returns a filter func that filters all of the given addresses func SubtractFilter(addrs ...ma.Multiaddr) func(ma.Multiaddr) bool { addrmap := make(map[string]bool) for _, a := range addrs { @@ -16,10 +17,13 @@ func SubtractFilter(addrs ...ma.Multiaddr) func(ma.Multiaddr) bool { } } +// IsFDCostlyTransport returns true for transports that require a new file +// descriptor per connection created func IsFDCostlyTransport(a ma.Multiaddr) bool { return mafmt.TCP.Matches(a) } +// FilterNeg returns a negated version of the passed in filter func FilterNeg(f func(ma.Multiaddr) bool) func(ma.Multiaddr) bool { return func(a ma.Multiaddr) bool { return !f(a) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index c1b3a73922..33b087efc3 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -390,9 +390,6 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma. continue } - // limitedDial will start a dial to the given peer when - // it is able, respecting the various different types of rate - // limiting that occur without using extra goroutines per addr s.limitedDial(ctx, p, addr, respch) active++ case <-ctx.Done(): @@ -403,7 +400,7 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma. case resp := <-respch: active-- if resp.Err != nil { - log.Error("got error on dial: ", resp.Err) + log.Info("got error on dial: ", resp.Err) // Errors are normal, lots of dials will fail exitErr = resp.Err @@ -417,6 +414,9 @@ func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma. } } +// limitedDial will start a dial to the given peer when +// it is able, respecting the various different types of rate +// limiting that occur without using extra goroutines per addr func (s *Swarm) limitedDial(ctx context.Context, p peer.ID, a ma.Multiaddr, resp chan dialResult) { s.limiter.AddDialJob(&dialJob{ addr: a, diff --git a/package.json b/package.json index 5af729700c..70d1bf2952 100644 --- a/package.json +++ b/package.json @@ -1,4 +1,5 @@ { + "name":"go-libp2p", "author": "whyrusleeping", "bugs": { "url": "https://github.com/ipfs/go-libp2p" From 9edbb2d9325ea1cdc51a999c47ddfc8ac7cb3070 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sat, 14 May 2016 22:51:25 -0700 Subject: [PATCH 5/6] test cleanup --- p2p/net/swarm/limiter_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/p2p/net/swarm/limiter_test.go b/p2p/net/swarm/limiter_test.go index fb1be191ee..28733c5ab5 100644 --- a/p2p/net/swarm/limiter_test.go +++ b/p2p/net/swarm/limiter_test.go @@ -3,7 +3,6 @@ package swarm import ( "fmt" "math/rand" - "runtime" "strconv" "testing" "time" @@ -263,7 +262,6 @@ func TestTokenRedistribution(t *testing.T) { func TestStressLimiter(t *testing.T) { df := func(ctx context.Context, p peer.ID, a ma.Multiaddr) (conn.Conn, error) { - fmt.Println("dial for peer: ", string(p)) if tcpPortOver(a, 1000) { return conn.Conn(nil), nil } else { @@ -307,8 +305,6 @@ func TestStressLimiter(t *testing.T) { }(peer.ID(fmt.Sprintf("testpeer%d", i))) } - time.Sleep(time.Millisecond * 1000) - fmt.Println("NUM GOROS: ", runtime.NumGoroutine()) for i := 0; i < 20; i++ { select { case <-success: From 6dddefe20c492008c092788820c32cd34e585aed Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 1 Jun 2016 10:02:26 -0700 Subject: [PATCH 6/6] improve comment on temp code --- p2p/net/swarm/swarm_dial.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 33b087efc3..1100ca544d 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -330,8 +330,14 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) { s.Filters.AddrBlocked) */ - ////// TEMP UNTIL PEERSTORE GETS UPGRADED - // Ref: https://github.com/ipfs/go-libp2p-peer/pull/1 + ////// + /* + This code is temporary, the peerstore can currently provide + a channel as an interface for receiving addresses, but more thought + needs to be put into the execution. For now, this allows us to use + the improved rate limiter, while maintaining the outward behaviour + that we previously had (halting a dial when we run out of addrs) + */ paddrs := s.peers.Addrs(p) good_addrs := addrutil.FilterAddrs(paddrs, addrutil.AddrUsableFunc,