Skip to content

Commit

Permalink
Merge pull request #38 from ipfs/sketch/dial-redo
Browse files Browse the repository at this point in the history
refactor swarm dialing logic
  • Loading branch information
whyrusleeping committed Jun 1, 2016
2 parents 5285574 + 6dddefe commit 4a2bc51
Show file tree
Hide file tree
Showing 9 changed files with 603 additions and 226 deletions.
20 changes: 13 additions & 7 deletions p2p/net/swarm/addr/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ func init() {
SupportedTransportProtocols = transports
}

// FilterAddrs is a filter that removes certain addresses, according to filter.
// if filter returns true, the address is kept.
func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiaddr {
// FilterAddrs is a filter that removes certain addresses, according the given filters.
// if all filters return true, the address is kept.
func FilterAddrs(a []ma.Multiaddr, filters ...func(ma.Multiaddr) bool) []ma.Multiaddr {
b := make([]ma.Multiaddr, 0, len(a))
for _, addr := range a {
if filter(addr) {
good := true
for _, filter := range filters {
good = good && filter(addr)
}
if good {
b = append(b, addr)
}
}
Expand All @@ -56,9 +60,11 @@ func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiadd
// from a list. the addresses removed are those known NOT
// to work with our network. Namely, addresses with UTP.
func FilterUsableAddrs(a []ma.Multiaddr) []ma.Multiaddr {
return FilterAddrs(a, func(m ma.Multiaddr) bool {
return AddrUsable(m, false)
})
return FilterAddrs(a, AddrUsableFunc)
}

func AddrUsableFunc(m ma.Multiaddr) bool {
return AddrUsable(m, false)
}

// AddrOverNonLocalIP returns whether the addr uses a non-local ip link
Expand Down
31 changes: 31 additions & 0 deletions p2p/net/swarm/addr/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package addrutil

import (
ma "github.com/jbenet/go-multiaddr"
mafmt "github.com/whyrusleeping/mafmt"
)

// SubtractFilter returns a filter func that filters all of the given addresses
func SubtractFilter(addrs ...ma.Multiaddr) func(ma.Multiaddr) bool {
addrmap := make(map[string]bool)
for _, a := range addrs {
addrmap[string(a.Bytes())] = true
}

return func(a ma.Multiaddr) bool {
return !addrmap[string(a.Bytes())]
}
}

// IsFDCostlyTransport returns true for transports that require a new file
// descriptor per connection created
func IsFDCostlyTransport(a ma.Multiaddr) bool {
return mafmt.TCP.Matches(a)
}

// FilterNeg returns a negated version of the passed in filter
func FilterNeg(f func(ma.Multiaddr) bool) func(ma.Multiaddr) bool {
return func(a ma.Multiaddr) bool {
return !f(a)
}
}
36 changes: 0 additions & 36 deletions p2p/net/swarm/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package swarm

import (
"net"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -494,38 +493,3 @@ func TestDialBackoffClears(t *testing.T) {
t.Log("correctly cleared backoff")
}
}

func mkAddr(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}

return a
}

func TestAddressSorting(t *testing.T) {
u1 := mkAddr(t, "/ip4/152.12.23.53/udp/1234/utp")
u2l := mkAddr(t, "/ip4/127.0.0.1/udp/1234/utp")
local := mkAddr(t, "/ip4/127.0.0.1/tcp/1234")
norm := mkAddr(t, "/ip4/6.5.4.3/tcp/1234")

l := AddrList{local, u1, u2l, norm}
sort.Sort(l)

if !l[0].Equal(u2l) {
t.Fatal("expected utp local addr to be sorted first: ", l[0])
}

if !l[1].Equal(u1) {
t.Fatal("expected utp addr to be sorted second")
}

if !l[2].Equal(local) {
t.Fatal("expected tcp localhost addr thid")
}

if !l[3].Equal(norm) {
t.Fatal("expected normal addr last")
}
}
160 changes: 160 additions & 0 deletions p2p/net/swarm/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package swarm

import (
"sync"

peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
context "golang.org/x/net/context"

conn "github.com/ipfs/go-libp2p/p2p/net/conn"
addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
)

type dialResult struct {
Conn conn.Conn
Err error
}

type dialJob struct {
addr ma.Multiaddr
peer peer.ID
ctx context.Context
resp chan dialResult
success bool
}

func (dj *dialJob) cancelled() bool {
select {
case <-dj.ctx.Done():
return true
default:
return false
}
}

type dialLimiter struct {
rllock sync.Mutex
fdConsuming int
fdLimit int
waitingOnFd []*dialJob

dialFunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error)

activePerPeer map[peer.ID]int
perPeerLimit int
waitingOnPeerLimit map[peer.ID][]*dialJob
}

type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error)

func newDialLimiter(df dialfunc) *dialLimiter {
return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit)
}

func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter {
return &dialLimiter{
fdLimit: fdl,
perPeerLimit: ppl,
waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
activePerPeer: make(map[peer.ID]int),
dialFunc: df,
}
}

func (dl *dialLimiter) finishedDial(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()

if addrutil.IsFDCostlyTransport(dj.addr) {
dl.fdConsuming--
if len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd = dl.waitingOnFd[1:]
if len(dl.waitingOnFd) == 0 {
dl.waitingOnFd = nil // clear out memory
}
dl.fdConsuming++

go dl.executeDial(next)
}
}

// release tokens in reverse order than we take them
dl.activePerPeer[dj.peer]--
if dl.activePerPeer[dj.peer] == 0 {
delete(dl.activePerPeer, dj.peer)
}

waitlist := dl.waitingOnPeerLimit[dj.peer]
if !dj.success && len(waitlist) > 0 {
next := waitlist[0]
if len(waitlist) == 1 {
delete(dl.waitingOnPeerLimit, dj.peer)
} else {
dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
}
dl.activePerPeer[dj.peer]++ // just kidding, we still want this token

// can kick this off right here, dials in this list already
// have the other tokens needed
go dl.executeDial(next)
}

}

// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()

if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[dj.peer]
dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
return
}
dl.activePerPeer[dj.peer]++

if addrutil.IsFDCostlyTransport(dj.addr) {
if dl.fdConsuming >= dl.fdLimit {
dl.waitingOnFd = append(dl.waitingOnFd, dj)
return
}

// take token
dl.fdConsuming++
}

// take second needed token and start dial!
go dl.executeDial(dj)
}

func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) {
if dl.activePerPeer[j.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[j.peer]
dl.waitingOnPeerLimit[j.peer] = append(wlist, j)
return
}

// take second needed token and start dial!
dl.activePerPeer[j.peer]++
go dl.executeDial(j)
}

// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
defer dl.finishedDial(j)
if j.cancelled() {
return
}

con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
select {
case j.resp <- dialResult{Conn: con, Err: err}:
case <-j.ctx.Done():
}
}
Loading

0 comments on commit 4a2bc51

Please sign in to comment.