Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dependency context.Context #31

Merged
merged 4 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 43 additions & 24 deletions dependency/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dependency

import (
"context"
"math"
"math/rand"
"strings"
Expand Down Expand Up @@ -36,7 +37,6 @@ type Clock interface {

// EngineConfig defines the parameters needed to create a new engine.
type EngineConfig struct {

// IsFatal returns true when passed an error that should stop
// the engine. It must not be nil.
IsFatal IsFatalFunc
Expand Down Expand Up @@ -154,7 +154,6 @@ func NewEngine(config EngineConfig) (*Engine, error) {
// Engine maintains workers corresponding to its installed manifolds, and
// restarts them whenever their inputs change.
type Engine struct {

// config contains values passed in as config when the engine was created.
config EngineConfig

Expand Down Expand Up @@ -217,7 +216,7 @@ func (engine *Engine) loop() error {
case ticket := <-engine.started:
engine.gotStarted(ticket.name, ticket.worker, ticket.resourceLog)
case ticket := <-engine.stopped:
engine.gotStopped(ticket.name, ticket.error, ticket.resourceLog)
engine.gotStopped(ticket.name, ticket.err, ticket.resourceLog)
}
if engine.isDying() {
if engine.allOthersStopped() {
Expand Down Expand Up @@ -399,12 +398,17 @@ func (engine *Engine) requestStart(name string, delay time.Duration) {

// ...then update the info, copy it back to the engine, and start a worker
// goroutine based on current known state.

// Create a context for the worker, this will allow the cancellation of the
// worker if the engine is shutting down.
ctx, cancel := engine.scopedContext()

info.starting = true
info.startAttempts++
info.err = nil
info.abort = make(chan struct{})
info.cancel = cancel
engine.current[name] = info
context := engine.context(name, manifold.Inputs, info.abort)
snapshot := engine.snapshot(ctx, name, manifold.Inputs)

// Always fuzz the delay a bit to help randomise the order of workers starting,
// which should make bugs more obvious.
Expand All @@ -425,14 +429,20 @@ func (engine *Engine) requestStart(name string, delay time.Duration) {
delay = time.Duration(float64(delay) * fuzz).Round(time.Millisecond)
}

go engine.runWorker(name, delay, manifold.Start, context)
go engine.runWorker(name, delay, manifold.Start, snapshot)
}

// context returns a context backed by a snapshot of current
// worker state, restricted to those workers declared in inputs. It must only
// be called from the loop goroutine; see inside for a detailed discussion of
// why we took this approach.
func (engine *Engine) context(name string, inputs []string, abort <-chan struct{}) *context {
// scopedContext returns a context that will be tied to the lifecycle of
// the tomb.
func (engine *Engine) scopedContext() (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
return engine.tomb.Context(ctx), cancel
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading the engine.tomb.Context() documentation, wouldn't it make more sense to just use engine.tomb.Context()?
I guess you're trying to get access to its cancel function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.

}

// snapshot returns a snapshot of the current worker state, restricted to those
// workers declared in inputs. It must only be called from the loop goroutine;
// see inside for a detailed discussion of why we took this approach.
func (engine *Engine) snapshot(ctx context.Context, name string, inputs []string) *snapshot {
// We snapshot the resources available at invocation time, rather than adding an
// additional communicate-resource-request channel. The latter approach is not
// unreasonable... but is prone to inelegant scrambles when starting several
Expand Down Expand Up @@ -481,9 +491,10 @@ func (engine *Engine) context(name string, inputs []string, abort <-chan struct{
outputs[resourceName] = engine.manifolds[resourceName].Output
workers[resourceName] = engine.current[resourceName].worker
}
return &context{

return &snapshot{
clientName: name,
abort: abort,
ctx: ctx,
expired: make(chan struct{}),
workers: workers,
outputs: outputs,
Expand All @@ -498,7 +509,7 @@ const (
// runWorker starts the supplied manifold's worker and communicates it back to the
// loop goroutine; waits for worker completion; and communicates any error encountered
// back to the loop goroutine. It must not be run on the loop goroutine.
func (engine *Engine) runWorker(name string, delay time.Duration, start StartFunc, context *context) {
func (engine *Engine) runWorker(name string, delay time.Duration, start StartFunc, snapshot *snapshot) {

startAfterDelay := func() (worker.Worker, error) {
// NOTE: the context will expire *after* the worker is started.
Expand All @@ -507,17 +518,17 @@ func (engine *Engine) runWorker(name string, delay time.Duration, start StartFun
// 2) failing to block them won't cause data races anyway
// 3) it's not worth complicating the interface for every client just
// to eliminate the possibility of one harmlessly dumb interaction.
defer context.expire()
defer snapshot.expire()
engine.config.Logger.Tracef("starting %q manifold worker in %s...", name, delay)
select {
case <-engine.tomb.Dying():
return nil, errAborted
case <-context.Abort():
case <-snapshot.ctx.Done():
return nil, errAborted
case <-engine.config.Clock.After(delay):
}
engine.config.Logger.Tracef("starting %q manifold worker", name)
return start(context)
return start(snapshot.ctx, snapshot)
}

startWorkerAndWait := func() error {
Expand All @@ -536,7 +547,11 @@ func (engine *Engine) runWorker(name string, delay time.Duration, start StartFun
// Doesn't matter whether worker == engine: if we're already Dying
// then cleanly Kill()ing ourselves again won't hurt anything.
worker.Kill()
case engine.started <- startedTicket{name, worker, context.accessLog}:
case engine.started <- startedTicket{
name: name,
worker: worker,
resourceLog: snapshot.accessLog,
}:
engine.config.Logger.Tracef("registered %q manifold worker", name)
}
if worker == engine {
Expand All @@ -552,7 +567,11 @@ func (engine *Engine) runWorker(name string, delay time.Duration, start StartFun
}

// We may or may not send on started, but we *must* send on stopped.
engine.stopped <- stoppedTicket{name, startWorkerAndWait(), context.accessLog}
engine.stopped <- stoppedTicket{
name: name,
err: startWorkerAndWait(),
resourceLog: snapshot.accessLog,
}
}

// gotStarted updates the engine to reflect the creation of a worker. It must
Expand Down Expand Up @@ -716,9 +735,9 @@ func (engine *Engine) requestStop(name string) {

// Update info, kill worker if present, and copy info back to engine.
info.stopping = true
if info.abort != nil {
close(info.abort)
info.abort = nil
if info.cancel != nil {
info.cancel()
info.cancel = nil
}
if info.worker != nil {
info.worker.Kill()
Expand Down Expand Up @@ -768,7 +787,7 @@ func (engine *Engine) bounceDependents(name string) {
type workerInfo struct {
starting bool
stopping bool
abort chan struct{}
cancel func()
worker worker.Worker
err error
resourceLog []resourceAccess
Expand Down Expand Up @@ -823,7 +842,7 @@ type startedTicket struct {
// failure to create) the worker for a particular manifold.
type stoppedTicket struct {
name string
error error
err error
resourceLog []resourceAccess
}

Expand Down
25 changes: 13 additions & 12 deletions dependency/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dependency_test

import (
"context"
"time"

"github.com/juju/clock"
Expand Down Expand Up @@ -166,13 +167,13 @@ func (s *EngineSuite) TestStartGetUndeclaredName(c *gc.C) {
// Install another task with an undeclared dependency on the started task.
done := make(chan struct{})
err = engine.Install("other-task", dependency.Manifold{
Start: func(context dependency.Context) (worker.Worker, error) {
err := context.Get("some-task", nil)
Start: func(ctx context.Context, container dependency.Container) (worker.Worker, error) {
SimonRichardson marked this conversation as resolved.
Show resolved Hide resolved
err := container.Get("some-task", nil)
c.Check(errors.Is(err, dependency.ErrMissing), jc.IsTrue)
c.Check(err, gc.ErrorMatches, `"some-task" not declared: dependency not available`)
close(done)
// Return a real worker so we don't keep restarting and potentially double-closing.
return startMinimalWorker(context)
return startMinimalWorker(container)
},
})
c.Assert(err, jc.ErrorIsNil)
Expand Down Expand Up @@ -208,13 +209,13 @@ func (s *EngineSuite) testStartGet(c *gc.C, outErr error) {
done := make(chan struct{})
err = engine.Install("other-task", dependency.Manifold{
Inputs: []string{"some-task"},
Start: func(context dependency.Context) (worker.Worker, error) {
err := context.Get("some-task", &target)
Start: func(ctx context.Context, container dependency.Container) (worker.Worker, error) {
err := container.Get("some-task", &target)
// Check the result from some-task's Output func matches what we expect.
c.Check(err, gc.Equals, outErr)
close(done)
// Return a real worker so we don't keep restarting and potentially double-closing.
return startMinimalWorker(context)
return startMinimalWorker(container)
},
})
c.Check(err, jc.ErrorIsNil)
Expand All @@ -240,10 +241,10 @@ func (s *EngineSuite) TestStartAbortOnEngineKill(c *gc.C) {
s.fix.run(c, func(engine *dependency.Engine) {
starts := make(chan struct{}, 1000)
manifold := dependency.Manifold{
Start: func(context dependency.Context) (worker.Worker, error) {
Start: func(ctx context.Context, container dependency.Container) (worker.Worker, error) {
starts <- struct{}{}
select {
case <-context.Abort():
case <-ctx.Done():
case <-time.After(testing.LongWait):
c.Errorf("timed out")
}
Expand Down Expand Up @@ -273,10 +274,10 @@ func (s *EngineSuite) TestStartAbortOnDependencyChange(c *gc.C) {
starts := make(chan struct{}, 1000)
manifold := dependency.Manifold{
Inputs: []string{"parent"},
Start: func(context dependency.Context) (worker.Worker, error) {
Start: func(ctx context.Context, container dependency.Container) (worker.Worker, error) {
starts <- struct{}{}
select {
case <-context.Abort():
case <-ctx.Done():
case <-time.After(testing.LongWait):
c.Errorf("timed out")
}
Expand Down Expand Up @@ -495,7 +496,7 @@ func (s *EngineSuite) TestErrMissing(c *gc.C) {
// Start a dependent that always complains ErrMissing.
mh2 := newManifoldHarness("some-task")
manifold := mh2.Manifold()
manifold.Start = func(_ dependency.Context) (worker.Worker, error) {
manifold.Start = func(_ context.Context, _ dependency.Container) (worker.Worker, error) {
mh2.starts <- struct{}{}
return nil, errors.Trace(dependency.ErrMissing)
}
Expand Down Expand Up @@ -594,7 +595,7 @@ func (s *EngineSuite) TestFilterStartError(c *gc.C) {
filterErr := errors.New("mew hiss")

err := engine.Install("task", dependency.Manifold{
Start: func(_ dependency.Context) (worker.Worker, error) {
Start: func(_ context.Context, _ dependency.Container) (worker.Worker, error) {
return nil, startErr
},
Filter: func(in error) error {
Expand Down
19 changes: 7 additions & 12 deletions dependency/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package dependency

import (
"context"

"github.com/juju/errors"

"github.com/juju/worker/v3"
Expand Down Expand Up @@ -81,17 +83,10 @@ type Manifold struct {
// Manifolds conveniently represents several Manifolds.
type Manifolds map[string]Manifold

// Context represents the situation in which a StartFunc is running. A Context should
// not be used outside its StartFunc; attempts to do so will have undefined results.
type Context interface {

// Abort will be closed if the containing engine no longer wants to
// start the manifold's worker. You can ignore Abort if your worker
// will start quickly -- it'll just be shut down immediately, nbd --
// but if you need to mess with channels or long-running operations
// in your StartFunc, Abort lets you do so safely.
Abort() <-chan struct{}

// Container represents the situation in which a StartFunc is running.
// A Container should not be used outside its StartFunc; attempts to do so
// will have undefined results.
type Container interface {
// Get returns an indication of whether a named dependency can be
// satisfied. In particular:
//
Expand All @@ -109,7 +104,7 @@ type Context interface {
// be taken from the supplied GetResourceFunc; if no worker can be started due
// to unmet dependencies, it should return ErrMissing, in which case it will
// not be called again until its declared inputs change.
type StartFunc func(context Context) (worker.Worker, error)
type StartFunc func(context.Context, Container) (worker.Worker, error)

// FilterFunc is an error conversion function for errors returned from workers
// or StartFuncs.
Expand Down
3 changes: 2 additions & 1 deletion dependency/self_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dependency_test

import (
"context"
"fmt"

"github.com/juju/testing"
Expand Down Expand Up @@ -36,7 +37,7 @@ func (s *SelfSuite) TestInputs(c *gc.C) {
func (s *SelfSuite) TestStart(c *gc.C) {
s.fix.run(c, func(engine *dependency.Engine) {
manifold := dependency.SelfManifold(engine)
actual, err := manifold.Start(nil)
actual, err := manifold.Start(context.Background(), nil)
c.Check(err, jc.ErrorIsNil)
c.Check(actual, gc.Equals, engine)
})
Expand Down
Loading