WIP Dialer v2: Pluggable and composable dialer #88
Conversation
I don't think we should swap components based on NAT status, but rather have a single component that handles both the NATed and un-NATed cases; the issue is deciding what to do when faced with a NAT on the dial target.
For completeness, this is the critical case we discussed during hack week:
The new dialer must be able to handle this by default.
@vyzo thanks for writing those down. Implementing those behaviours would make a good test of the modularity principles of the design being introduced here. I'll add a point in the TODO checklist.
These would be a dynamic filter in the
This is a reactive model, but if we really want a proactive one, the Planner can launch a goroutine to drip dial jobs based on a timer or delay schedule. Regardless, it'll get called back each time a dial completes.
👍 By pipeline, I assume that all stages are actually run in parallel, right?
Specifically this. We need to be able to both dial and find new addresses in parallel.
These two are in contention. However, I agree that the executor shouldn't block (the throttler should handle limiting jobs).
How does the throttler learn about these results? I think we'll need a way to attach listeners/callbacks to individual dial jobs.
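One way to wire that up, sketched loosely against the Job.AddCallback hook that appears further down in this PR (the releaseJob method and the dialCh field are illustrative names; peerTokenFreed mirrors the throttler shown later):

```go
// Sketch: before handing a job to the executor, the throttler registers a
// callback so it learns about completion and can return the per-peer token.
func (t *throttler) releaseJob(j *Job, id peer.ID) {
	j.AddCallback(func() {
		t.peerTokenFreed <- id // free the per-peer slot once the dial completes
	})
	t.dialCh <- j // pass the job on to the executor
}
```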
	return false
}

// AddBackoff lets other nodes know that we've entered backoff with
lets other nodes know? that doesn't seem right
agree it's confusing; copy-pasted from the original implementation though:
Line 129 in ccc587c
// AddBackoff lets other nodes know that we've entered backoff with
While we are here, @whyrusleeping... the Swarm currently exposes the Backoff via an accessor that's not part of the Network interface:
Line 435 in 4bf3943
func (s *Swarm) Backoff() *DialBackoff {
I think it's there mostly for testing purposes, or do we want to allow other components to fiddle with the backoff logic? With it now being just another RequestPreparer, I'm reluctant to expose it like this.
dial/pipeline.go
Outdated
	id peer.ID

	// Addresses to be populated by RequestPreparers.
	addrs []ma.Multiaddr
We're going to need to be able to feed addresses as we get them.
I did grapple with the idea of making this a chan or a slice, and settled on a slice because I thought the Planner needs to know all addresses before it can make sound planning decisions. However, that comes at a time cost: you can't start dialing suboptimal addresses while you fetch better ones in parallel.
I had originally thought about an AddressResolver component, with several implementations:
- DefaultAddressResolver => fetches from the peerstore.
- DHTAddressResolver => queries the DHT if there are no hits in the peerstore, or if they are stale.
I'll move back in that direction, as it provides a better encapsulation for the chan variant.
dial/interfaces.go
Outdated
	//
	// When the planner is satisfied and has no more dials to request, it must signal so by closing
	// the dialCh channel.
	Next(req *Request, dialed dialJobs, last *Job, dialCh chan dialJobs) error
How about:
- Breaking this into two steps: Planner -> Plan.
- Letting the plan track the state instead of feeding it back in every time.
type Planner interface {
	Plan(req *Request) (Plan, error)
}

type Plan interface {
	Complete(j *Job) error // may need to pass in a connection?
	Next() (*Job, error)   // or maybe a channel?
}
Not sure about the actual interface definitions, I just think breaking this up will help.
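For illustration, the pipeline's driver loop under that split could look roughly like this (p.planner and p.throttleCh are placeholder names, and the nil-job-ends-the-plan convention is an assumption):

```go
// Sketch: driving the proposed Planner -> Plan split from the pipeline.
plan, err := p.planner.Plan(req)
if err != nil {
	return nil, err
}
for {
	job, err := plan.Next()
	if err != nil {
		return nil, err
	}
	if job == nil {
		break // assumption: a nil job means the plan has nothing left to dial
	}
	job.AddCallback(func() {
		_ = plan.Complete(job) // let the plan react to the finished dial
	})
	p.throttleCh <- job // hand the job to the throttler
}
```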
Agree, good point!
seconding this, i think this general structure highlights the FLOW as well
dial/interfaces.go
Outdated
	// Start spawns the goroutine that is in charge of throttling. It receives planned jobs via inCh and emits
	// jobs to execute on dialCh. The throttler can apply any logic in-between: it may throttle jobs based on
	// system resources, time, inflight dials, network conditions, fail rate, etc.
	Start(ctx context.Context, inCh <-chan *Job, dialCh chan<- *Job)
Any reason not to have:
Start(ctx context.Context, inCh <-chan *Job) <-chan *Job
Since these are buffered channels, I preferred the pipeline to be responsible for them.
dial/interfaces.go
Outdated
// the Selector picks the optimal successful connection to return to the consumer. The Pipeline takes care of
// closing unselected successful connections.
type Selector interface {
	Select(successful dialJobs) (tpt.Conn, error)
So, the planner needs to be able to determine when to finish anyways. I'd be surprised if it didn't have enough information to perform the selection as well. What's the motivation?
The Selector will be used in two cases:

1. The Planner explicitly continues dialling upon a successful dial, with the expectation of finding a better connection.
2. The Planner is a first-wins one, but it scheduled multiple dials in parallel, and more than one came back positive.

In (1), I do agree that the selection heuristic should be part of the Planner by design; if it's deliberately seeking a better something, it must know what that something is. With (2), I'm not so sure it's the Planner's job to resolve that conflict; we may have different selection strategies going forward: by transport, by latency (do a PING on all connections), by packet loss, by traceroute, etc.
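To make (2) concrete, here is a rough sketch of a pluggable strategy against the Select(successful dialJobs) (tpt.Conn, error) signature in this PR; the tconn field is a made-up name for wherever a Job stores its resulting connection:

```go
// Sketch: a trivial "first wins" Selector. Other strategies (lowest latency,
// fewest relay hops, preferred transport) would implement the same interface.
type firstWinsSelector struct{}

func (firstWinsSelector) Select(successful dialJobs) (tpt.Conn, error) {
	if len(successful) == 0 {
		return nil, ErrNoSuccessfulDials
	}
	return successful[0].tconn, nil // assumption: the Job keeps its connection in tconn
}
```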
dial/pipeline.go
Outdated
	}
}

func (j *Job) AddCallback(cb func()) {
👍
dial/pipeline.go
Outdated
	reqPreparers []RequestPreparer
	planner      Planner
	throttler    Throttler
Personally, I'd allow multiple of these. Throttlers can easily throttle a single job type, passing the rest on to the next throttler.
Hmmm... would there be a case to have multiple Planners as well? Rather than modelling the cardinality here, we could provide Composite* wrappers that behave the same from the outside, with their constructors accepting a number of Planners, Throttlers, etc. We could even kill the n-cardinality on the {Request/Job}Preparers in favour of composites. WDYT?
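Loosely, something like this for the throttler case (the CompositeThrottler constructor and buffer size are hypothetical, and it assumes at least one inner throttler is configured):

```go
// Sketch: a composite Throttler that chains several throttlers; each stage
// receives the jobs the previous one released, and the last stage feeds the
// pipeline's dial channel directly.
type compositeThrottler struct {
	throttlers []Throttler
}

func NewCompositeThrottler(ts ...Throttler) Throttler {
	return &compositeThrottler{throttlers: ts}
}

func (c *compositeThrottler) Start(ctx context.Context, inCh <-chan *Job, dialCh chan<- *Job) {
	curr := inCh
	for _, t := range c.throttlers[:len(c.throttlers)-1] {
		link := make(chan *Job, 32) // buffered link between stages (size is arbitrary)
		t.Start(ctx, curr, link)
		curr = link
	}
	c.throttlers[len(c.throttlers)-1].Start(ctx, curr, dialCh)
}
```

Preparers and planners could be wrapped the same way.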
dial/pipeline.go
Outdated
	addConnFn AddConnFn
}

func (p *Pipeline) Component(name string, comp interface{}) error {
We can do this, or we could just have separate RegisterPlanner, etc. functions. What was the motivation?
One potential motivation is that this version allows us to use the same component multiple times (e.g., we can have a single component implement the preparer and the planner).
Absolutely. This choice is simplistic and controversial on purpose to incite dialogue. I haven't landed on a pipeline configuration API I feel good about yet.
I'm leaning towards providing functions to manage {Request,Job}Preparers positionally, e.g. AddLast, AddFirst, AddAfter, AddBefore, Replace (hence the name argument).
I also think the API should make it possible to change components at runtime (some components embed io.Closer to that end).
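Roughly what I'm picturing, with tentative signatures only (a mirror set of methods would exist for JobPreparers):

```go
// Sketch: positional management of RequestPreparers on the Pipeline.
// Bodies elided; these are API shapes for discussion, not an implementation.
func (p *Pipeline) AddFirst(name string, rp RequestPreparer) error          { return nil }
func (p *Pipeline) AddLast(name string, rp RequestPreparer) error           { return nil }
func (p *Pipeline) AddBefore(anchor, name string, rp RequestPreparer) error { return nil }
func (p *Pipeline) AddAfter(anchor, name string, rp RequestPreparer) error  { return nil }
func (p *Pipeline) Replace(name string, rp RequestPreparer) error           { return nil }
```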
dial/pipeline.go
Outdated
}

func (p *Pipeline) Component(name string, comp interface{}) error {
	switch comp.(type) {
Nit: you can use switch c := comp.(type) { and then use c to avoid the casts.
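i.e. something along these lines (cases abbreviated; assumes fmt is imported):

```go
// Sketch: binding the switch to a variable avoids re-asserting the type in
// each case.
func (p *Pipeline) Component(name string, comp interface{}) error {
	switch c := comp.(type) {
	case Planner:
		p.planner = c
	case Throttler:
		p.throttler = c
	default:
		return fmt.Errorf("unknown component type for %q: %T", name, comp)
	}
	return nil
}
```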
Thanks! I knew this was too clumsy.
dial/pipeline.go
Outdated
	pipeline := &Pipeline{
		ctx:       ctx,
		net:       net,
		addConnFn: addConnFn,
do we need this callback or is there some way we can completely separate this from the network? That is, it would be nice if the pipeline could be completely modular and return connections to the network instead of feeding them in like this.
Especially because this makes it hard for users to define custom pipelines.
Actually, you're right. The pipeline could simply return a connection, and the Swarm could add it locally. I like that much better.
dial/addr_resolver.go
Outdated
var _ RequestPreparer = (*validator)(nil)

func NewAddrResolver(staticFilters []func(ma.Multiaddr) bool, dynamicFilters []func(req *Request) func(ma.Multiaddr) bool) RequestPreparer {
can you make some type definitions for the funcs? The declaration is almost comical.
The main culprit is the raw func types; a type MultiaddrFilter func(ma.Multiaddr) bool would go a long way towards simplifying.
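For example (DynamicMultiaddrFilter is just a placeholder name):

```go
// Sketch: named function types to shorten the constructor's signature.
type MultiaddrFilter func(ma.Multiaddr) bool
type DynamicMultiaddrFilter func(req *Request) MultiaddrFilter

// The constructor would then read:
//   func NewAddrResolver(static []MultiaddrFilter, dynamic []DynamicMultiaddrFilter) RequestPreparer
```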
👍 on the overall design. For failure states, it looks like if no connection can be created the pipeline will ultimately return an error. If multiple jobs are executed in the pipeline, and various points in the pipeline can result in a returned error, what information is the pipeline providing for the consuming application to make informed decisions? This is a request we've seen pretty frequently on the js side of things and have been working to improve the clarity of those errors.
Good work @raulk.
I think a better place would be the planner, where you have access to all the multiaddrs of a peer (go-libp2p-swarm/dial/planner.go, lines 18 to 21 in 2e1b16e).
We can maybe keep dialling to the same peer sequential (trying one address at a time and, if it fails, trying the next one) and dialling to new peers concurrent (a connection to a new peer will spawn a new goroutine).
ight here we go 🍽
this is a really great start. code is generally structured well and is infinitely more readable/grokable. excited to see this move fwd.
dial/interfaces.go
Outdated
//
// A Preparer may cancel the dial preemptively in error or in success, by calling Complete() on the Request.
type Preparer interface {
	Prepare(req *Request)
any reason to not have this return a new modified request for easier debuggability? also, where is Request defined? i can't find it...
edit: github was glitchin
dial/pipeline.go
Outdated
	return pipeline
}

func (p *Pipeline) Start(ctx context.Context) {
maybe close existing channels and cancel existing contexts if present
dial/pipeline.go
Outdated
	req := NewDialRequest(ctx, p.net, id)

	// Prepare the dial.
	if p.preparer.Prepare(req); req.IsComplete() {
places like this are where i think copying the req would make code more clear
dial/pipeline.go
Outdated
	}

	if len(req.addrs) == 0 {
		return nil, errors.New("no addresses to dial")
let's factor this error into a const so people can compare against it
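For example (ErrNoAddresses is a suggested name; it has to be a var rather than a const, since Go errors can't be constants):

```go
// Sketch: a package-level sentinel error that callers can compare against.
var ErrNoAddresses = errors.New("no addresses to dial")

// ...and at the call site:
// return nil, ErrNoAddresses
```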
dial/pipeline.go
Outdated
PlanExecute:
	for {
		select {
		case jobs, more := <-planCh:
by convention, it may be worth changing more to ok? don't feel too strongly about it, but it did trip me up
dial/planner.go
Outdated
// ErrNoSuccessfulDials is returned by Select() when all dials failed.
var ErrNoSuccessfulDials = errors.New("no successful dials")

type singleBurstPlanner struct{}
docstring
dial/throttler.go
Outdated
		// Process any token releases to avoid those channels getting backed up.
		select {
		case id := <-t.peerTokenFreed:
			waitlist := t.waitingOnPeerLimit[id]
it looks like we have a case where jobs waiting on the per-peer limit can then bypass the FD limits since we don't check here
I don't think this is possible, because when we get a new job we check the peer limit first, then the FD limit (by virtue of the implicit ordering of the && here). If a job fails the peer limit, we queue until we get a slot. Then we check the FD limit before actually doing the dial.
			}
			continue ThrottleLoop

		case <-t.fdTokenFreed:
same, this can bypass per-peer limits
	return nil
}

func (t *throttler) throttleFd(j *Job) (throttle bool) {
maybe we condense these two throttle funcs into one that also accepts an optional current state so it can transition as necessary
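Something in this direction, perhaps (the throttleStage enum and the throttlePeerLimit name are made up for illustration):

```go
// Sketch: a single throttle check that walks the limit stages in order, so a
// job released from one waitlist still goes through the remaining checks.
type throttleStage int

const (
	checkPeerLimit throttleStage = iota
	checkFDLimit
	cleared
)

func (t *throttler) throttle(j *Job, from throttleStage) (throttleStage, bool) {
	switch from {
	case checkPeerLimit:
		if t.throttlePeerLimit(j) {
			return checkPeerLimit, true // still waiting on a per-peer token
		}
		fallthrough
	case checkFDLimit:
		if t.throttleFd(j) {
			return checkFDLimit, true // still waiting on an FD token
		}
	}
	return cleared, false // the job may be dialed
}
```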
@raulk is this blocked on review (trying to keep it out of PR review purgatory)?
@Stebalien, thanks for checking in. This PR is awaiting further changes from me. Will be done soon and will ping you all for another round of reviews. Sorry for the slowness; a few things jumped the queue.
Take your time, I just wanted to make sure it wasn't stuck on something.
Deprecated in favour of #122; thanks a lot for your comments. I've considered them in the new iteration.
Heavy work in progress. Early preview. Incubating. Unstable. Embryonic. And any other way to say this is early WIP. ;-)
Some tests pass, some fail. Code needs to be polished, docs are scarce, and the public API (for configuring the pipeline and its components) needs attention and feedback. However, I do feel good enough about this design and PoC to start soliciting feedback.
NOTE: This PR is comprehensive in terms of docs, because it aspires to become a spec once this work is merged, and to facilitate communicating the design decisions I've taken.
Rationale
As libp2p moves forward as the networking stack of choice for decentralised applications, each use case may require different behaviour from the dialer: not only in terms of configuration parameters (e.g. throttling), but also in terms of logic (how addresses are selected, which addresses are dialed first, how successful connections are selected, etc.)
With the current dialer we are unable to seamlessly do the following without special-casing:
- Resolve addresses from the DHT, which today requires the RoutedHost abstraction. With a composable/pluggable dialer, we could inject a processor to resolve addresses from the DHT.

Design
There are five phases to dialing: PREPARE, PLAN, THROTTLE, EXECUTE, SELECT. Conceptual model of dialer v2:
- Pipeline: this is the skeleton of the dialer, onto which components attach. It handles the process of dialing a peer as a sequence of actions, and returns the result as either an inet.Conn or an error.
- Request: a request to dial a peer ID.
- Job: a job to dial a single multiaddr, stemming from a dial Request.

Some components deal with Requests and others deal with Jobs. There are different kinds of components.

Components
RequestPreparer component

Prepares a Request before it is planned. They can abort the request and return a result immediately; a rough sketch of the component's shape follows the list below. Implementations include:
- Validator: validates the peer ID, that we're not dialing to ourselves, and that we're not already connected to the peer. It can abort the request and return an error or an existing connection.
- Backoff: returns an error if the peer is backed off.
- Sync/dedup: deduplicates dial requests. If a dial request to a peer ID is underway, the new request will wait, and will return the same result as the first.
- AddressResolver: fetches multiaddrs from the Peerstore, and applies filtering logic (configurable) to inject the set of addresses to dial into the Request.
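As a rough sketch of the component's shape, based on the Prepare(req *Request) signature visible in the diff (the loopbackFilter example is purely illustrative):

```go
// Sketch: the RequestPreparer shape, with a toy implementation that strips
// loopback addresses from the request in place.
type RequestPreparer interface {
	Prepare(req *Request)
}

type loopbackFilter struct{}

func (loopbackFilter) Prepare(req *Request) {
	kept := req.addrs[:0]
	for _, a := range req.addrs {
		if !manet.IsIPLoopback(a) { // go-multiaddr-net helper
			kept = append(kept, a)
		}
	}
	req.addrs = kept
}
```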
Planner component

Dial planners take a Request (populated with multiaddrs) and emit dial Jobs on a channel (wired to the throttler) for the addresses they want dialed.

The pipeline will call the planner once at the beginning, to retrieve the initial set of dials to perform, as well as every time a dial job completes (providing a slice of all dials and the last dial), to give the planner a chance to react and emit new dial requests.

When the Planner is satisfied with the result (e.g. when a dial has succeeded), it closes the dial channel to signal there is no more work to do.

Currently we have a SingleBurstPlanner implementation, but this model is flexible enough to create anything we can imagine, e.g. prioritise certain addrs, group dials together, dial one by one until at least two dials succeed or we have no more dials, etc.
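For reference, the Planner contract as it currently stands in interfaces.go, annotated (the assumption that last is nil on the very first call is mine):

```go
type Planner interface {
	// req:    the dial request, with addresses already populated by the preparers.
	// dialed: all jobs that have completed so far.
	// last:   the job that just finished (nil on the initial call).
	// dialCh: the channel on which the planner emits the next batch of jobs;
	//         closing it signals that planning is finished.
	Next(req *Request, dialed dialJobs, last *Job, dialCh chan dialJobs) error
}
```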
JobPreparer component

Prepares jobs emitted by the planner before they are dialed, e.g. by setting protocol-specific timeouts.
Throttler component

A goroutine that applies a throttling process to dial jobs requested by the Planner. Once spawned, it receives planned jobs via an input channel, and emits jobs to execute on an output channel. The throttler can apply any logic in-between: it may throttle jobs based on system resources, time, inflight dials, network conditions, fail rate, etc.

Currently, we have a default throttler that applies limits based on file descriptors and active dials per peer.
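For reference, the Throttler contract, plus a minimal pass-through implementation to show the shape (the noopThrottler is illustrative only; the real default throttler gates on FD and per-peer limits as described above):

```go
type Throttler interface {
	Start(ctx context.Context, inCh <-chan *Job, dialCh chan<- *Job)
}

// Sketch: a throttler that applies no limits at all.
type noopThrottler struct{}

func (noopThrottler) Start(ctx context.Context, inCh <-chan *Job, dialCh chan<- *Job) {
	go func() {
		for {
			select {
			case j, ok := <-inCh:
				if !ok {
					return // input closed; nothing left to throttle
				}
				dialCh <- j // release immediately
			case <-ctx.Done():
				return
			}
		}
	}()
}
```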
Executor component

A goroutine responsible for ultimately carrying out the network dial to an addr. Once spawned, it receives dial jobs (from the throttler). It must act like a dispatcher, in turn spawning individual goroutines for each dial, or maintaining a finite set of child workers, to carry out the dials. The Executor must never block.

When a dial completes, it is sent to a channel where it is received by the pipeline, which informs the Planner of the result.
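The Executor interface isn't shown in this excerpt, so the following is only a hedged sketch of the dispatcher idea (the Start signature and the j.Dial method are assumed names):

```go
// Sketch: a dispatcher-style executor. Each dial runs in its own goroutine and
// the completed job is reported back to the pipeline, which informs the Planner.
type dispatchExecutor struct{}

func (dispatchExecutor) Start(ctx context.Context, dialCh <-chan *Job, completedCh chan<- *Job) {
	go func() {
		for {
			select {
			case j, ok := <-dialCh:
				if !ok {
					return
				}
				go func(j *Job) {
					j.Dial(ctx)      // hypothetical: performs the transport dial, recording conn or error
					completedCh <- j // notify the pipeline that this job finished
				}(j)
			case <-ctx.Done():
				return
			}
		}
	}()
}
```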
Selector component

Once the Executor has completed all dial jobs planned by the Planner, the Selector is called with a list of successful dials/connections. It can use any logic to decide which dial to keep, e.g. the one with the lowest latency, the first one, a random one, the one with the fewest relay hops, etc.

The Pipeline will take care of closing all unselected dials.

Tips for review
- Start with interfaces.go to get a good overview of the interfaces.
- Read pipeline.go to understand how the Pipeline works and how the pieces fit together.
- Then explore the rest of the dial package.
- Finally, see how the Pipeline is integrated in the Swarm.

TODO