Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor swarm dialing logic #38

Merged
merged 6 commits into from
Jun 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions p2p/net/swarm/addr/addr.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ func init() {
SupportedTransportProtocols = transports
}

// FilterAddrs is a filter that removes certain addresses, according to filter.
// if filter returns true, the address is kept.
func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiaddr {
// FilterAddrs is a filter that removes certain addresses, according the given filters.
// if all filters return true, the address is kept.
func FilterAddrs(a []ma.Multiaddr, filters ...func(ma.Multiaddr) bool) []ma.Multiaddr {
b := make([]ma.Multiaddr, 0, len(a))
for _, addr := range a {
if filter(addr) {
good := true
for _, filter := range filters {
good = good && filter(addr)
}
if good {
b = append(b, addr)
}
}
Expand All @@ -56,9 +60,11 @@ func FilterAddrs(a []ma.Multiaddr, filter func(ma.Multiaddr) bool) []ma.Multiadd
// from a list. the addresses removed are those known NOT
// to work with our network. Namely, addresses with UTP.
func FilterUsableAddrs(a []ma.Multiaddr) []ma.Multiaddr {
return FilterAddrs(a, func(m ma.Multiaddr) bool {
return AddrUsable(m, false)
})
return FilterAddrs(a, AddrUsableFunc)
}

func AddrUsableFunc(m ma.Multiaddr) bool {
return AddrUsable(m, false)
}

// AddrOverNonLocalIP returns whether the addr uses a non-local ip link
Expand Down
31 changes: 31 additions & 0 deletions p2p/net/swarm/addr/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package addrutil

import (
ma "github.com/jbenet/go-multiaddr"
mafmt "github.com/whyrusleeping/mafmt"
)

// SubtractFilter returns a filter func that filters all of the given addresses
func SubtractFilter(addrs ...ma.Multiaddr) func(ma.Multiaddr) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A short godoc here would be helpful, to explain what this does and why it's useful.

addrmap := make(map[string]bool)
for _, a := range addrs {
addrmap[string(a.Bytes())] = true
}

return func(a ma.Multiaddr) bool {
return !addrmap[string(a.Bytes())]
}
}

// IsFDCostlyTransport returns true for transports that require a new file
// descriptor per connection created
func IsFDCostlyTransport(a ma.Multiaddr) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "costly" mean in this context? Could a short godoc here clarify?

return mafmt.TCP.Matches(a)
}

// FilterNeg returns a negated version of the passed in filter
func FilterNeg(f func(ma.Multiaddr) bool) func(ma.Multiaddr) bool {
Copy link
Contributor

@hackergrrl hackergrrl May 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: quick godoc (what & why)

return func(a ma.Multiaddr) bool {
return !f(a)
}
}
36 changes: 0 additions & 36 deletions p2p/net/swarm/dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package swarm

import (
"net"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -493,38 +492,3 @@ func TestDialBackoffClears(t *testing.T) {
t.Log("correctly cleared backoff")
}
}

func mkAddr(t *testing.T, s string) ma.Multiaddr {
a, err := ma.NewMultiaddr(s)
if err != nil {
t.Fatal(err)
}

return a
}

func TestAddressSorting(t *testing.T) {
u1 := mkAddr(t, "/ip4/152.12.23.53/udp/1234/utp")
u2l := mkAddr(t, "/ip4/127.0.0.1/udp/1234/utp")
local := mkAddr(t, "/ip4/127.0.0.1/tcp/1234")
norm := mkAddr(t, "/ip4/6.5.4.3/tcp/1234")

l := AddrList{local, u1, u2l, norm}
sort.Sort(l)

if !l[0].Equal(u2l) {
t.Fatal("expected utp local addr to be sorted first: ", l[0])
}

if !l[1].Equal(u1) {
t.Fatal("expected utp addr to be sorted second")
}

if !l[2].Equal(local) {
t.Fatal("expected tcp localhost addr thid")
}

if !l[3].Equal(norm) {
t.Fatal("expected normal addr last")
}
}
160 changes: 160 additions & 0 deletions p2p/net/swarm/limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package swarm

import (
"sync"

peer "github.com/ipfs/go-libp2p-peer"
ma "github.com/jbenet/go-multiaddr"
context "golang.org/x/net/context"

conn "github.com/ipfs/go-libp2p/p2p/net/conn"
addrutil "github.com/ipfs/go-libp2p/p2p/net/swarm/addr"
)

type dialResult struct {
Conn conn.Conn
Err error
}

type dialJob struct {
addr ma.Multiaddr
peer peer.ID
ctx context.Context
resp chan dialResult
success bool
}

func (dj *dialJob) cancelled() bool {
select {
case <-dj.ctx.Done():
return true
default:
return false
}
}

type dialLimiter struct {
rllock sync.Mutex
fdConsuming int
fdLimit int
waitingOnFd []*dialJob

dialFunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error)

activePerPeer map[peer.ID]int
perPeerLimit int
waitingOnPeerLimit map[peer.ID][]*dialJob
}

type dialfunc func(context.Context, peer.ID, ma.Multiaddr) (conn.Conn, error)

func newDialLimiter(df dialfunc) *dialLimiter {
return newDialLimiterWithParams(df, concurrentFdDials, defaultPerPeerRateLimit)
}

func newDialLimiterWithParams(df dialfunc, fdl, ppl int) *dialLimiter {
return &dialLimiter{
fdLimit: fdl,
perPeerLimit: ppl,
waitingOnPeerLimit: make(map[peer.ID][]*dialJob),
activePerPeer: make(map[peer.ID]int),
dialFunc: df,
}
}

func (dl *dialLimiter) finishedDial(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()

if addrutil.IsFDCostlyTransport(dj.addr) {
dl.fdConsuming--
if len(dl.waitingOnFd) > 0 {
next := dl.waitingOnFd[0]
dl.waitingOnFd = dl.waitingOnFd[1:]
if len(dl.waitingOnFd) == 0 {
dl.waitingOnFd = nil // clear out memory
}
dl.fdConsuming++

go dl.executeDial(next)
}
}

// release tokens in reverse order than we take them
Copy link
Contributor

@hackergrrl hackergrrl May 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see lots of references to the term "token", but only in comments. Could there be a comment somewhere relevant that explains what a token is in this context? (Or, better, name the relevant object token)

dl.activePerPeer[dj.peer]--
if dl.activePerPeer[dj.peer] == 0 {
delete(dl.activePerPeer, dj.peer)
}

waitlist := dl.waitingOnPeerLimit[dj.peer]
if !dj.success && len(waitlist) > 0 {
next := waitlist[0]
if len(waitlist) == 1 {
delete(dl.waitingOnPeerLimit, dj.peer)
} else {
dl.waitingOnPeerLimit[dj.peer] = waitlist[1:]
}
dl.activePerPeer[dj.peer]++ // just kidding, we still want this token

// can kick this off right here, dials in this list already
// have the other tokens needed
go dl.executeDial(next)
}

}

// AddDialJob tries to take the needed tokens for starting the given dial job.
// If it acquires all needed tokens, it immediately starts the dial, otherwise
// it will put it on the waitlist for the requested token.
func (dl *dialLimiter) AddDialJob(dj *dialJob) {
dl.rllock.Lock()
defer dl.rllock.Unlock()

if dl.activePerPeer[dj.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[dj.peer]
dl.waitingOnPeerLimit[dj.peer] = append(wlist, dj)
return
}
dl.activePerPeer[dj.peer]++

if addrutil.IsFDCostlyTransport(dj.addr) {
if dl.fdConsuming >= dl.fdLimit {
dl.waitingOnFd = append(dl.waitingOnFd, dj)
return
}

// take token
dl.fdConsuming++
}

// take second needed token and start dial!
go dl.executeDial(dj)
}

func (dl *dialLimiter) schedulePerPeerDial(j *dialJob) {
if dl.activePerPeer[j.peer] >= dl.perPeerLimit {
wlist := dl.waitingOnPeerLimit[j.peer]
dl.waitingOnPeerLimit[j.peer] = append(wlist, j)
return
}

// take second needed token and start dial!
dl.activePerPeer[j.peer]++
go dl.executeDial(j)
}

// executeDial calls the dialFunc, and reports the result through the response
// channel when finished. Once the response is sent it also releases all tokens
// it held during the dial.
func (dl *dialLimiter) executeDial(j *dialJob) {
defer dl.finishedDial(j)
if j.cancelled() {
return
}

con, err := dl.dialFunc(j.ctx, j.peer, j.addr)
select {
case j.resp <- dialResult{Conn: con, Err: err}:
case <-j.ctx.Done():
}
}
Loading