Skip to content

Commit

Permalink
handle static relays like peer sources
Browse files Browse the repository at this point in the history
  • Loading branch information
juligasa committed Nov 3, 2022
1 parent 8a64be8 commit 96ece99
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 59 deletions.
2 changes: 1 addition & 1 deletion p2p/host/autorelay/autorelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewAutoRelay(bhost *basic.BasicHost, opts ...Option) (*AutoRelay, error) {
}
r.ctx, r.ctxCancel = context.WithCancel(context.Background())
r.conf = &conf
r.relayFinder = newRelayFinder(bhost, conf.peerSource, &conf)
r.relayFinder = newRelayFinder(bhost, conf.peerSource, conf.staticRelaySource, &conf)
bhost.AddrsFactory = r.hostAddrs

r.refCount.Add(1)
Expand Down
45 changes: 23 additions & 22 deletions p2p/host/autorelay/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"time"

"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -15,8 +16,10 @@ type config struct {
clock clock.Clock
peerSource func(ctx context.Context, num int) <-chan peer.AddrInfo
// minimum interval used to call the peerSource callback
minInterval time.Duration
staticRelays []peer.AddrInfo
peerMinInterval time.Duration
staticRelaySource func(ctx context.Context, num int) <-chan peer.AddrInfo
// minimum interval used to call the staticRelaySource callback
staticRelayMinInterval time.Duration
// see WithMinCandidates
minCandidates int
// see WithMaxCandidates
Expand Down Expand Up @@ -44,11 +47,6 @@ var defaultConfig = config{
maxCandidateAge: 30 * time.Minute,
}

var (
errStaticRelaysMinCandidates = errors.New("cannot use WithMinCandidates and WithStaticRelays")
errStaticRelaysPeerSource = errors.New("cannot use WithPeerSource and WithStaticRelays")
)

// DefaultRelays are the known PL-operated v1 relays; will be decommissioned in 2022.
var DefaultRelays = []string{
"/ip4/147.75.80.110/tcp/4001/p2p/QmbFgm5zan8P6eWWmeyfncR5feYEMPbht5b1FW1C37aQ7y",
Expand All @@ -74,18 +72,27 @@ func init() {
type Option func(*config) error

func WithStaticRelays(static []peer.AddrInfo) Option {

staticRelayChan := make(chan peer.AddrInfo, len(static))

return func(c *config) error {
if c.setMinCandidates {
return errStaticRelaysMinCandidates
}
if c.peerSource != nil {
return errStaticRelaysPeerSource
}
if len(c.staticRelays) > 0 {
if c.staticRelaySource != nil {
return errors.New("can't set static relays, static relays already configured")
}
c.minCandidates = len(static)
c.staticRelays = static
c.staticRelaySource = func(ctx context.Context, num int) <-chan peer.AddrInfo {
c.staticRelayMinInterval = time.Hour
rand.Seed(time.Now().UnixNano())
var static_candidates = static
rand.Shuffle(len(static_candidates), func(i, j int) {
static_candidates[i], static_candidates[j] = static_candidates[j], static_candidates[i]
})
for i := 0; i < num%c.minCandidates; i++ {
staticRelayChan <- static_candidates[i]
}
return staticRelayChan
}

return nil
}
}
Expand All @@ -107,11 +114,8 @@ func WithDefaultStaticRelays() Option {
// If the channel is canceled you MUST close the output channel at some point.
func WithPeerSource(f func(ctx context.Context, numPeers int) <-chan peer.AddrInfo, minInterval time.Duration) Option {
return func(c *config) error {
if len(c.staticRelays) > 0 {
return errStaticRelaysPeerSource
}
c.peerSource = f
c.minInterval = minInterval
c.peerMinInterval = minInterval
return nil
}
}
Expand Down Expand Up @@ -140,9 +144,6 @@ func WithMaxCandidates(n int) Option {
// This is to make sure that we don't just randomly connect to the first candidate that we discover.
func WithMinCandidates(n int) Option {
return func(c *config) error {
if len(c.staticRelays) > 0 {
return errStaticRelaysMinCandidates
}
if n > c.maxCandidates {
n = c.maxCandidates
}
Expand Down
58 changes: 22 additions & 36 deletions p2p/host/autorelay/relay_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type relayFinder struct {
ctxCancel context.CancelFunc
ctxCancelMx sync.Mutex

peerSource func(context.Context, int) <-chan peer.AddrInfo
peerSource func(context.Context, int) <-chan peer.AddrInfo
staticRelaySource func(context.Context, int) <-chan peer.AddrInfo

candidateFound chan struct{} // receives every time we find a new relay candidate
candidateMx sync.Mutex
Expand All @@ -82,12 +83,14 @@ type relayFinder struct {
cachedAddrsExpiry time.Time
}

func newRelayFinder(host *basic.BasicHost, peerSource func(context.Context, int) <-chan peer.AddrInfo, conf *config) *relayFinder {
func newRelayFinder(host *basic.BasicHost, peerSource, staticRelaySource func(context.Context, int) <-chan peer.AddrInfo,
conf *config) *relayFinder {
return &relayFinder{
bootTime: conf.clock.Now(),
host: host,
conf: conf,
peerSource: peerSource,
staticRelaySource: staticRelaySource,
candidates: make(map[peer.ID]*candidate),
backoff: make(map[peer.ID]time.Time),
candidateFound: make(chan struct{}, 1),
Expand All @@ -99,19 +102,11 @@ func newRelayFinder(host *basic.BasicHost, peerSource func(context.Context, int)
}

func (rf *relayFinder) background(ctx context.Context) {
if rf.usesStaticRelay() {
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.handleStaticRelays(ctx)
}()
} else {
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.findNodes(ctx)
}()
}
rf.refCount.Add(1)
go func() {
defer rf.refCount.Done()
rf.findNodes(ctx)
}()

rf.refCount.Add(1)
go func() {
Expand Down Expand Up @@ -206,10 +201,18 @@ func (rf *relayFinder) background(ctx context.Context) {
// It garbage collects old entries, so that nodes doesn't overflow.
// This makes sure that as soon as we need to find relay candidates, we have them available.
func (rf *relayFinder) findNodes(ctx context.Context) {
peerChan := rf.peerSource(ctx, rf.conf.maxCandidates)
var peerChan <-chan peer.AddrInfo = nil
//var staticRelayChan <-chan peer.AddrInfo = nil
if rf.peerSource != nil {
peerChan = rf.peerSource(ctx, rf.conf.maxCandidates)
}
//if rf.staticRelaySource != nil {
// staticRelayChan = rf.staticRelaySource(ctx, rf.conf.maxCandidates)
//}

var wg sync.WaitGroup
lastCallToPeerSource := rf.conf.clock.Now()

//lastCallToStaticRelaySource := rf.conf.clock.Now()
timer := newTimer(rf.conf.clock)
for {
rf.candidateMx.Lock()
Expand All @@ -218,7 +221,7 @@ func (rf *relayFinder) findNodes(ctx context.Context) {

if peerChan == nil {
now := rf.conf.clock.Now()
nextAllowedCallToPeerSource := lastCallToPeerSource.Add(rf.conf.minInterval).Sub(now)
nextAllowedCallToPeerSource := lastCallToPeerSource.Add(rf.conf.peerMinInterval).Sub(now)
if numCandidates < rf.conf.minCandidates {
log.Debugw("not enough candidates. Resetting timer", "num", numCandidates, "desired", rf.conf.minCandidates)
timer.Reset(nextAllowedCallToPeerSource)
Expand Down Expand Up @@ -270,23 +273,6 @@ func (rf *relayFinder) findNodes(ctx context.Context) {
}
}

func (rf *relayFinder) handleStaticRelays(ctx context.Context) {
sem := make(chan struct{}, 4)
var wg sync.WaitGroup
wg.Add(len(rf.conf.staticRelays))
for _, pi := range rf.conf.staticRelays {
sem <- struct{}{}
go func(pi peer.AddrInfo) {
defer wg.Done()
defer func() { <-sem }()
rf.handleNewNode(ctx, pi)
}(pi)
}
wg.Wait()
log.Debug("processed all static relays")
rf.notifyNewCandidate()
}

func (rf *relayFinder) notifyMaybeConnectToRelay() {
select {
case rf.maybeConnectToRelayTrigger <- struct{}{}:
Expand Down Expand Up @@ -640,7 +626,7 @@ func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
}

func (rf *relayFinder) usesStaticRelay() bool {
return len(rf.conf.staticRelays) > 0
return rf.conf.staticRelaySource != nil
}

func (rf *relayFinder) Start() error {
Expand Down

0 comments on commit 96ece99

Please sign in to comment.