diff --git a/Makefile b/Makefile index 912e360..f811f2b 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -PROJECT := github.com/juju/worker/v3 +PROJECT := github.com/juju/worker/v4 .PHONY: check-licence check-go check diff --git a/catacomb/catacomb.go b/catacomb/catacomb.go index 0d333c7..31a4e2f 100644 --- a/catacomb/catacomb.go +++ b/catacomb/catacomb.go @@ -14,7 +14,7 @@ import ( "github.com/juju/errors" "gopkg.in/tomb.v2" - "github.com/juju/worker/v3" + "github.com/juju/worker/v4" ) // Catacomb is a variant of tomb.Tomb with its own internal goroutine, designed diff --git a/catacomb/catacomb_test.go b/catacomb/catacomb_test.go index 6db34d5..c58b96b 100644 --- a/catacomb/catacomb_test.go +++ b/catacomb/catacomb_test.go @@ -13,8 +13,8 @@ import ( gc "gopkg.in/check.v1" "gopkg.in/tomb.v2" - "github.com/juju/worker/v3" - "github.com/juju/worker/v3/catacomb" + "github.com/juju/worker/v4" + "github.com/juju/worker/v4/catacomb" ) type CatacombSuite struct { diff --git a/catacomb/fixture_test.go b/catacomb/fixture_test.go index 9fa2ed4..0370dd5 100644 --- a/catacomb/fixture_test.go +++ b/catacomb/fixture_test.go @@ -11,8 +11,8 @@ import ( gc "gopkg.in/check.v1" "gopkg.in/tomb.v2" - "github.com/juju/worker/v3" - "github.com/juju/worker/v3/catacomb" + "github.com/juju/worker/v4" + "github.com/juju/worker/v4/catacomb" ) type cleaner interface { diff --git a/dependency/doc.go b/dependency/doc.go index 8b63fa0..8eee909 100644 --- a/dependency/doc.go +++ b/dependency/doc.go @@ -2,42 +2,41 @@ // Licensed under the LGPLv3, see LICENCE file for details. /* - Package dependency exists to address a general problem with shared resources and the management of their lifetimes. Many kinds of software handle these issues with more or less felicity, but it's particularly important that juju (which is a distributed system that needs to be very fault-tolerant) handle them clearly and sanely. -Background +# Background A cursory examination of the various workers run in juju agents (as of 2015-04-20) reveals a distressing range of approaches to the shared resource problem. A sampling of techniques (and their various problems) follows: - * enforce sharing in code structure, either directly via scoping or implicitly + - enforce sharing in code structure, either directly via scoping or implicitly via nested runners (state/api conns; agent config) - * code structure is inflexible, and it enforces strictly nested resource - lifetimes, which are not always adequate. - * just create N of them and hope it works out OK (environs) - * creating N prevents us from, e.g., using a single connection to an environ - and sanely rate-limiting ourselves. - * use filesystem locking across processes (machine execution lock) - * implementation sometimes flakes out, or is used improperly; and multiple - agents *are* a problem anyway, but even if we're all in-process we'll need - some shared machine lock... - * wrap workers to start up only when some condition is met (post-upgrade + - code structure is inflexible, and it enforces strictly nested resource + lifetimes, which are not always adequate. + - just create N of them and hope it works out OK (environs) + - creating N prevents us from, e.g., using a single connection to an environ + and sanely rate-limiting ourselves. + - use filesystem locking across processes (machine execution lock) + - implementation sometimes flakes out, or is used improperly; and multiple + agents *are* a problem anyway, but even if we're all in-process we'll need + some shared machine lock... 
+ - wrap workers to start up only when some condition is met (post-upgrade stability -- itself also a shared resource) - * lifetime-nesting comments apply here again; *and* it makes it harder to - follow the code. - * implement a singleton (lease manager) - * singletons make it *even harder* to figure out what's going on -- they're - basically just fancy globals, and have all the associated problems with, - e.g. deadlocking due to unexpected shutdown order. + - lifetime-nesting comments apply here again; *and* it makes it harder to + follow the code. + - implement a singleton (lease manager) + - singletons make it *even harder* to figure out what's going on -- they're + basically just fancy globals, and have all the associated problems with, + e.g. deadlocking due to unexpected shutdown order. ...but, of course, they all have their various advantages: - * Of the approaches, the first is the most reliable by far. Despite the + - Of the approaches, the first is the most reliable by far. Despite the inflexibility, there's a clear and comprehensible model in play that has yet to cause serious confusion: each worker is created with its resource(s) directly available in code scope, and trusts that it will be restarted by an @@ -45,7 +44,7 @@ sampling of techniques (and their various problems) follows: extremely beneficial and must be preserved; we just need it to be more generally applicable. - * The create-N-Environs approach is valuable because it can be simply (if + - The create-N-Environs approach is valuable because it can be simply (if inelegantly) integrated with its dependent worker, and a changed Environ does not cause the whole dependent to fall over (unless the change is itself bad). The former characteristic is a subtle trap (we shouldn't be baking @@ -55,13 +54,13 @@ sampling of techniques (and their various problems) follows: be unwise to take an approach that led to them being restarted when not necessary. - * The filesystem locking just should not happen -- and we need to integrate the + - The filesystem locking just should not happen -- and we need to integrate the unit and machine agents to eliminate it (and for other reasons too) so we should give some thought to the fact that we'll be shuffling these dependencies around pretty hard in the future. If the approach can make that task easier, then great. - * The singleton is dangerous specifically because its dependency interactions are + - The singleton is dangerous specifically because its dependency interactions are unclear. Absolute clarity of dependencies, as provided by the nesting approaches, is in fact critical; but the sheer convenience of the singleton is alluring, and reminds us that the approach we take must remain easy to use. @@ -77,37 +76,36 @@ However, all of these resources are very different: for a solution that encompas them all, you kinda have to represent them as interface{} at some point, and that's very risky re: clarity. - -Problem +# Problem The package is intended to implement the following developer stories: - * As a developer trying to understand the codebase, I want to know what workers + - As a developer trying to understand the codebase, I want to know what workers are running in an agent at any given time. - * As a developer, I want to be prevented from introducing dependency cycles + - As a developer, I want to be prevented from introducing dependency cycles into my application. 
- * As a developer, I want to provide a service provided by some worker to one or + - As a developer, I want to provide a service provided by some worker to one or more client workers. - * As a developer, I want to write a service that consumes one or more other + - As a developer, I want to write a service that consumes one or more other workers' services. - * As a developer, I want to choose how I respond to missing dependencies. - * As a developer, I want to be able to inject test doubles for my dependencies. - * As a developer, I want control over how my service is exposed to others. - * As a developer, I don't want to have to typecast my dependencies from + - As a developer, I want to choose how I respond to missing dependencies. + - As a developer, I want to be able to inject test doubles for my dependencies. + - As a developer, I want control over how my service is exposed to others. + - As a developer, I don't want to have to typecast my dependencies from interface{} myself. - * As a developer, I want my service to be restarted if its dependencies change. + - As a developer, I want my service to be restarted if its dependencies change. That last one might bear a little bit of explanation: but I contend that it's the only reliable approach to writing resilient services that compose sanely into a comprehensible system. Consider: - * Juju agents' lifetimes must be assumed to exceed the MTBR of the systems + - Juju agents' lifetimes must be assumed to exceed the MTBR of the systems they're deployed on; you might naively think that hard reboots are "rare"... but they're not. They really are just a feature of the terrain we have to traverse. Therefore every worker *always* has to be capable of picking itself back up from scratch and continuing sanely. That is, we're not imposing a new expectation: we're just working within the existing constraints. - * While some workers are simple, some are decidedly not; when a worker has any + - While some workers are simple, some are decidedly not; when a worker has any more complexity than "none" it is a Bad Idea to mix dependency-management concerns into their core logic: it creates the sort of morass in which subtle bugs thrive. @@ -120,15 +118,14 @@ before they hit users. We'd maybe also like to implement this story: - * As a developer, I want to add and remove groups of workers atomically, e.g. + - As a developer, I want to add and remove groups of workers atomically, e.g. when starting the set of controller workers for a hosted environ; or when starting the set of workers used by a single unit. [NOT DONE] ...but there's no urgent use case yet, and it's not certain to be superior to an engine-nesting approach. - -Solution +# Solution Run a single dependency.Engine at the top level of each agent; express every shared resource, and every worker that uses one, as a dependency.Manifold; and @@ -137,9 +134,9 @@ install them all into the top-level engine. When installed under some name, a dependency.Manifold represents the features of a node in the engine's dependency graph. It lists: - * The names of its dependencies (Inputs). - * How to create the worker representing the resource (Start). - * How (if at all) to expose the resource as a service to other resources that + - The names of its dependencies (Inputs). + - How to create the worker representing the resource (Start). + - How (if at all) to expose the resource as a service to other resources that know it by name (Output). 
...and allows the developers of each independent service a common mechanism for @@ -154,56 +151,55 @@ least there's still some observability; but there may also be call to pass actual dependencies down from one engine to another, and that'll demand careful thought. - -Usage +# Usage In each worker package, write a `manifold.go` containing the following: - // ManifoldConfig holds the information necessary to configure the worker - // controlled by a Manifold. - type ManifoldConfig struct { - - // The names of the various dependencies, e.g. - APICallerName string - - // Any other required top-level configuration, e.g. - Period time.Duration - } - - // Manifold returns a manifold that controls the operation of a worker - // responsible for , configured as supplied. - func Manifold(config ManifoldConfig) dependency.Manifold { - // Your code here... - return dependency.Manifold{ - - // * certainly include each of your configured dependency names, - // getResource will only expose them if you declare them here. - Inputs: []string{config.APICallerName, config.MachineLockName}, - - // * certainly include a start func, it will panic if you don't. - Start: func(getResource dependency.GetResourceFunc) (worker.Worker, error) { - // You presumably want to get your dependencies, and you almost - // certainly want to be closed over `config`... - var apicaller base.APICaller - if err := getResource(config.APICallerName, &apicaller); err != nil { - return nil, err - } - return newSomethingWorker(apicaller, config.Period) - }, - - // * output func is not obligatory, and should be skipped if you - // don't know what you'll be exposing or to whom. - // * see `worker/gate`, `worker/util`, and - // `worker/dependency/testing` for examples of output funcs. - // * if you do supply an output func, be sure to document it on the - // Manifold func; for example: - // - // // Manifold exposes Foo and Bar resources, which can be - // // accessed by passing a *Foo or a *Bar in the output - // // parameter of its dependencies' getResouce calls. - Output: nil, - } - } + // ManifoldConfig holds the information necessary to configure the worker + // controlled by a Manifold. + type ManifoldConfig struct { + + // The names of the various dependencies, e.g. + APICallerName string + + // Any other required top-level configuration, e.g. + Period time.Duration + } + + // Manifold returns a manifold that controls the operation of a worker + // responsible for , configured as supplied. + func Manifold(config ManifoldConfig) dependency.Manifold { + // Your code here... + return dependency.Manifold{ + + // * certainly include each of your configured dependency names, + // dependency.Getter will only expose them if you declare them here. + Inputs: []string{config.APICallerName, config.MachineLockName}, + + // * certainly include a start func, it will panic if you don't. + Start: func(ctx context.Context, getter dependency.Getter) (worker.Worker, error) { + // You presumably want to get your dependencies, and you almost + // certainly want to be closed over `config`... + var apicaller base.APICaller + if err := getter.Get(config.APICallerName, &apicaller); err != nil { + return nil, err + } + return newSomethingWorker(apicaller, config.Period) + }, + + // * output func is not obligatory, and should be skipped if you + // don't know what you'll be exposing or to whom. + // * see `worker/gate`, `worker/util`, and + // `worker/dependency/testing` for examples of output funcs. 
+	//  * if you do supply an output func, be sure to document it on the
+	//    Manifold func; for example:
+	//
+	//      // Manifold exposes Foo and Bar resources, which can be
+	//      // accessed by passing a *Foo or a *Bar in the output
+	//      // parameter of its dependencies' Get calls.
+	Output: nil,
+	}
+	}

...and take care to construct your manifolds *only* via that function; *all*
your dependencies *must* be declared in your ManifoldConfig, and *must* be
@@ -214,8 +210,7 @@
 consider adding helpers to
 cmd/jujud/agent/engine, which includes mechanisms for simple definition of
 manifolds that depend on an API caller; on an agent; or on both.

-
-Testing
+# Testing

The `worker/dependency/testing` package, commonly imported as "dt", exposes a
`StubResource` that is helpful for testing `Start` funcs in decent isolation,
@@ -223,8 +218,7 @@
 with mocked dependencies. Tests for `Inputs` and `Output` are generally pretty
 specific to their precise context and don't seem to benefit much from
 generalisation.

-
-Special considerations
+# Special considerations

The nodes in your *dependency* graph must be acyclic; this does not imply that
the *information flow* must be acyclic. Indeed, it is common for separate
@@ -250,19 +244,17 @@
 particular is effectively just another lock, and it'd be trivial to construct
 a set of gate-users that can deadlock one another. All the usual considerations
 when working with locks still apply.

-
-Concerns and mitigations thereof
+# Concerns and mitigations thereof

The dependency package will *not* provide the following features:

- * Deterministic worker startup. As above, this is a blessing in disguise: if
+ - Deterministic worker startup. As above, this is a blessing in disguise: if
   your workers have a problem with this, they're using magical undeclared
   dependencies and we get to see the inevitable bugs sooner.
- * Hand-holding for developers writing Output funcs; the onus is on you to
+ - Hand-holding for developers writing Output funcs; the onus is on you to
   document what you expose; produce useful error messages when they supplied
   with unexpected types via the interface{} param; and NOT to panic. The onus
   on your clients is only to read your docs and handle the errors you might
   emit.
-
 */
 package dependency
diff --git a/dependency/engine.go b/dependency/engine.go
index 0a518e1..ce03911 100644
--- a/dependency/engine.go
+++ b/dependency/engine.go
@@ -4,6 +4,7 @@
 package dependency

 import (
+	"context"
 	"math"
 	"math/rand"
 	"strings"
@@ -13,7 +14,7 @@ import (
 	"github.com/juju/errors"
 	"gopkg.in/tomb.v2"

-	"github.com/juju/worker/v3"
+	"github.com/juju/worker/v4"
 )

 // Logger represents the various logging methods used by the runner.
@@ -36,7 +37,6 @@ type Clock interface {

 // EngineConfig defines the parameters needed to create a new engine.
 type EngineConfig struct {
-
 	// IsFatal returns true when passed an error that should stop
 	// the engine. It must not be nil.
 	IsFatal IsFatalFunc
@@ -154,7 +154,6 @@ func NewEngine(config EngineConfig) (*Engine, error) {
 // Engine maintains workers corresponding to its installed manifolds, and
 // restarts them whenever their inputs change.
 type Engine struct {
-
 	// config contains values passed in as config when the engine was created.
	config EngineConfig
@@ -217,7 +216,7 @@ func (engine *Engine) loop() error {
 		case ticket := <-engine.started:
 			engine.gotStarted(ticket.name, ticket.worker, ticket.resourceLog)
 		case ticket := <-engine.stopped:
-			engine.gotStopped(ticket.name, ticket.error, ticket.resourceLog)
+			engine.gotStopped(ticket.name, ticket.err, ticket.resourceLog)
 		}
 		if engine.isDying() {
 			if engine.allOthersStopped() {
@@ -399,12 +398,17 @@ func (engine *Engine) requestStart(name string, delay time.Duration) {

 	// ...then update the info, copy it back to the engine, and start a worker
 	// goroutine based on current known state.
+
+	// Create a context for the worker; this allows the worker to be cancelled
+	// if the engine is shutting down.
+	ctx, cancel := engine.scopedContext()
+
 	info.starting = true
 	info.startAttempts++
 	info.err = nil
-	info.abort = make(chan struct{})
+	info.cancel = cancel
 	engine.current[name] = info
-	context := engine.context(name, manifold.Inputs, info.abort)
+	snapshot := engine.snapshot(ctx, name, manifold.Inputs)

 	// Always fuzz the delay a bit to help randomise the order of workers starting,
 	// which should make bugs more obvious
@@ -425,14 +429,20 @@
 		delay = time.Duration(float64(delay) * fuzz).Round(time.Millisecond)
 	}

-	go engine.runWorker(name, delay, manifold.Start, context)
+	go engine.runWorker(name, delay, manifold.Start, snapshot)
+}
+
+// scopedContext returns a context that will be tied to the lifecycle of
+// the tomb.
+func (engine *Engine) scopedContext() (context.Context, context.CancelFunc) {
+	ctx, cancel := context.WithCancel(context.Background())
+	return engine.tomb.Context(ctx), cancel
 }

-// context returns a context backed by a snapshot of current
-// worker state, restricted to those workers declared in inputs. It must only
-// be called from the loop goroutine; see inside for a detailed discussion of
-// why we took this approach.
-func (engine *Engine) context(name string, inputs []string, abort <-chan struct{}) *context {
+// snapshot returns a snapshot of the current worker state, restricted to those
+// workers declared in inputs. It must only be called from the loop goroutine;
+// see inside for a detailed discussion of why we took this approach.
+func (engine *Engine) snapshot(ctx context.Context, name string, inputs []string) *snapshot {
 	// We snapshot the resources available at invocation time, rather than adding an
 	// additional communicate-resource-request channel. The latter approach is not
 	// unreasonable... but is prone to inelegant scrambles when starting several
@@ -481,9 +491,10 @@
 		outputs[resourceName] = engine.manifolds[resourceName].Output
 		workers[resourceName] = engine.current[resourceName].worker
 	}
-	return &context{
+
+	return &snapshot{
 		clientName: name,
-		abort:      abort,
+		ctx:        ctx,
 		expired:    make(chan struct{}),
 		workers:    workers,
 		outputs:    outputs,
@@ -498,8 +509,7 @@ const (

 // runWorker starts the supplied manifold's worker and communicates it back to the
 // loop goroutine; waits for worker completion; and communicates any error encountered
 // back to the loop goroutine. It must not be run on the loop goroutine.
-func (engine *Engine) runWorker(name string, delay time.Duration, start StartFunc, context *context) { - +func (engine *Engine) runWorker(name string, delay time.Duration, start StartFunc, snapshot *snapshot) { startAfterDelay := func() (worker.Worker, error) { // NOTE: the context will expire *after* the worker is started. // This is tolerable because @@ -507,17 +517,17 @@ func (engine *Engine) runWorker(name string, delay time.Duration, start StartFun // 2) failing to block them won't cause data races anyway // 3) it's not worth complicating the interface for every client just // to eliminate the possibility of one harmlessly dumb interaction. - defer context.expire() + defer snapshot.expire() engine.config.Logger.Tracef("starting %q manifold worker in %s...", name, delay) select { case <-engine.tomb.Dying(): return nil, errAborted - case <-context.Abort(): + case <-snapshot.ctx.Done(): return nil, errAborted case <-engine.config.Clock.After(delay): } engine.config.Logger.Tracef("starting %q manifold worker", name) - return start(context) + return start(snapshot.ctx, snapshot) } startWorkerAndWait := func() error { @@ -536,7 +546,11 @@ func (engine *Engine) runWorker(name string, delay time.Duration, start StartFun // Doesn't matter whether worker == engine: if we're already Dying // then cleanly Kill()ing ourselves again won't hurt anything. worker.Kill() - case engine.started <- startedTicket{name, worker, context.accessLog}: + case engine.started <- startedTicket{ + name: name, + worker: worker, + resourceLog: snapshot.accessLog, + }: engine.config.Logger.Tracef("registered %q manifold worker", name) } if worker == engine { @@ -552,7 +566,11 @@ func (engine *Engine) runWorker(name string, delay time.Duration, start StartFun } // We may or may not send on started, but we *must* send on stopped. - engine.stopped <- stoppedTicket{name, startWorkerAndWait(), context.accessLog} + engine.stopped <- stoppedTicket{ + name: name, + err: startWorkerAndWait(), + resourceLog: snapshot.accessLog, + } } // gotStarted updates the engine to reflect the creation of a worker. It must @@ -716,9 +734,9 @@ func (engine *Engine) requestStop(name string) { // Update info, kill worker if present, and copy info back to engine. info.stopping = true - if info.abort != nil { - close(info.abort) - info.abort = nil + if info.cancel != nil { + info.cancel() + info.cancel = nil } if info.worker != nil { info.worker.Kill() @@ -768,7 +786,7 @@ func (engine *Engine) bounceDependents(name string) { type workerInfo struct { starting bool stopping bool - abort chan struct{} + cancel func() worker worker.Worker err error resourceLog []resourceAccess @@ -823,7 +841,7 @@ type startedTicket struct { // failure to create) the worker for a particular manifold. 
type stoppedTicket struct { name string - error error + err error resourceLog []resourceAccess } diff --git a/dependency/engine_test.go b/dependency/engine_test.go index bd96208..c7e3629 100644 --- a/dependency/engine_test.go +++ b/dependency/engine_test.go @@ -4,6 +4,7 @@ package dependency_test import ( + "context" "time" "github.com/juju/clock" @@ -14,9 +15,9 @@ import ( jc "github.com/juju/testing/checkers" gc "gopkg.in/check.v1" - "github.com/juju/worker/v3" - "github.com/juju/worker/v3/dependency" - "github.com/juju/worker/v3/workertest" + "github.com/juju/worker/v4" + "github.com/juju/worker/v4/dependency" + "github.com/juju/worker/v4/workertest" ) type EngineSuite struct { @@ -166,13 +167,13 @@ func (s *EngineSuite) TestStartGetUndeclaredName(c *gc.C) { // Install another task with an undeclared dependency on the started task. done := make(chan struct{}) err = engine.Install("other-task", dependency.Manifold{ - Start: func(context dependency.Context) (worker.Worker, error) { - err := context.Get("some-task", nil) + Start: func(ctx context.Context, container dependency.Getter) (worker.Worker, error) { + err := container.Get("some-task", nil) c.Check(errors.Is(err, dependency.ErrMissing), jc.IsTrue) c.Check(err, gc.ErrorMatches, `"some-task" not declared: dependency not available`) close(done) // Return a real worker so we don't keep restarting and potentially double-closing. - return startMinimalWorker(context) + return startMinimalWorker(container) }, }) c.Assert(err, jc.ErrorIsNil) @@ -208,13 +209,13 @@ func (s *EngineSuite) testStartGet(c *gc.C, outErr error) { done := make(chan struct{}) err = engine.Install("other-task", dependency.Manifold{ Inputs: []string{"some-task"}, - Start: func(context dependency.Context) (worker.Worker, error) { - err := context.Get("some-task", &target) + Start: func(ctx context.Context, container dependency.Getter) (worker.Worker, error) { + err := container.Get("some-task", &target) // Check the result from some-task's Output func matches what we expect. c.Check(err, gc.Equals, outErr) close(done) // Return a real worker so we don't keep restarting and potentially double-closing. - return startMinimalWorker(context) + return startMinimalWorker(container) }, }) c.Check(err, jc.ErrorIsNil) @@ -240,10 +241,10 @@ func (s *EngineSuite) TestStartAbortOnEngineKill(c *gc.C) { s.fix.run(c, func(engine *dependency.Engine) { starts := make(chan struct{}, 1000) manifold := dependency.Manifold{ - Start: func(context dependency.Context) (worker.Worker, error) { + Start: func(ctx context.Context, container dependency.Getter) (worker.Worker, error) { starts <- struct{}{} select { - case <-context.Abort(): + case <-ctx.Done(): case <-time.After(testing.LongWait): c.Errorf("timed out") } @@ -273,10 +274,10 @@ func (s *EngineSuite) TestStartAbortOnDependencyChange(c *gc.C) { starts := make(chan struct{}, 1000) manifold := dependency.Manifold{ Inputs: []string{"parent"}, - Start: func(context dependency.Context) (worker.Worker, error) { + Start: func(ctx context.Context, container dependency.Getter) (worker.Worker, error) { starts <- struct{}{} select { - case <-context.Abort(): + case <-ctx.Done(): case <-time.After(testing.LongWait): c.Errorf("timed out") } @@ -495,7 +496,7 @@ func (s *EngineSuite) TestErrMissing(c *gc.C) { // Start a dependent that always complains ErrMissing. 
mh2 := newManifoldHarness("some-task") manifold := mh2.Manifold() - manifold.Start = func(_ dependency.Context) (worker.Worker, error) { + manifold.Start = func(_ context.Context, _ dependency.Getter) (worker.Worker, error) { mh2.starts <- struct{}{} return nil, errors.Trace(dependency.ErrMissing) } @@ -594,7 +595,7 @@ func (s *EngineSuite) TestFilterStartError(c *gc.C) { filterErr := errors.New("mew hiss") err := engine.Install("task", dependency.Manifold{ - Start: func(_ dependency.Context) (worker.Worker, error) { + Start: func(_ context.Context, _ dependency.Getter) (worker.Worker, error) { return nil, startErr }, Filter: func(in error) error { diff --git a/dependency/interface.go b/dependency/interface.go index b1fadad..691cedb 100644 --- a/dependency/interface.go +++ b/dependency/interface.go @@ -4,9 +4,11 @@ package dependency import ( + "context" + "github.com/juju/errors" - "github.com/juju/worker/v3" + "github.com/juju/worker/v4" ) // Manifold defines the behaviour of a node in an Engine's dependency graph. It's @@ -81,17 +83,11 @@ type Manifold struct { // Manifolds conveniently represents several Manifolds. type Manifolds map[string]Manifold -// Context represents the situation in which a StartFunc is running. A Context should -// not be used outside its StartFunc; attempts to do so will have undefined results. -type Context interface { - - // Abort will be closed if the containing engine no longer wants to - // start the manifold's worker. You can ignore Abort if your worker - // will start quickly -- it'll just be shut down immediately, nbd -- - // but if you need to mess with channels or long-running operations - // in your StartFunc, Abort lets you do so safely. - Abort() <-chan struct{} - +// Getter represents a way to request named dependencies from within a +// StartFunc. +// A Getter should not be used outside its StartFunc; attempts to do so +// will have undefined results. +type Getter interface { // Get returns an indication of whether a named dependency can be // satisfied. In particular: // @@ -109,7 +105,7 @@ type Context interface { // be taken from the supplied GetResourceFunc; if no worker can be started due // to unmet dependencies, it should return ErrMissing, in which case it will // not be called again until its declared inputs change. -type StartFunc func(context Context) (worker.Worker, error) +type StartFunc func(context.Context, Getter) (worker.Worker, error) // FilterFunc is an error conversion function for errors returned from workers // or StartFuncs. 
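The interface.go hunk above is the heart of the v4 API change: a v3 StartFunc received a single dependency.Context providing both Abort() and Get(), while in v4 the same information arrives as a standard context.Context (cancelled when the engine no longer wants the worker started) plus a Getter for named dependencies. A minimal sketch of a manifold written against the v4 signatures shown in this diff -- the example package, the ManifoldConfig field, and the nullWorker helper are illustrative inventions, not part of this change:

	package example

	import (
		"context"

		"gopkg.in/tomb.v2"

		"github.com/juju/worker/v4"
		"github.com/juju/worker/v4/dependency"
	)

	// ManifoldConfig names the single dependency this worker consumes.
	type ManifoldConfig struct {
		APICallerName string
	}

	// Manifold returns a manifold running a trivial worker.
	func Manifold(config ManifoldConfig) dependency.Manifold {
		return dependency.Manifold{
			// Every name passed to Get must be declared here.
			Inputs: []string{config.APICallerName},
			Start: func(ctx context.Context, getter dependency.Getter) (worker.Worker, error) {
				// A nil out param is a pure existence check; pass a
				// typed pointer to receive the resource itself.
				if err := getter.Get(config.APICallerName, nil); err != nil {
					return nil, err
				}
				// ctx replaces the v3 Abort() channel: check ctx.Done()
				// before any long-running operation in Start.
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
				default:
				}
				return newNullWorker(), nil
			},
		}
	}

	// nullWorker mirrors the minimalWorker in this repo's tests: it
	// runs until killed and always stops cleanly.
	type nullWorker struct {
		tomb tomb.Tomb
	}

	func newNullWorker() worker.Worker {
		w := &nullWorker{}
		w.tomb.Go(func() error {
			<-w.tomb.Dying()
			return nil
		})
		return w
	}

	func (w *nullWorker) Kill()       { w.tomb.Kill(nil) }
	func (w *nullWorker) Wait() error { return w.tomb.Wait() }
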
diff --git a/dependency/reporter_test.go b/dependency/reporter_test.go
index e4679f6..f898e31 100644
--- a/dependency/reporter_test.go
+++ b/dependency/reporter_test.go
@@ -11,8 +11,8 @@ import (
 	jc "github.com/juju/testing/checkers"
 	gc "gopkg.in/check.v1"

-	"github.com/juju/worker/v3/dependency"
-	"github.com/juju/worker/v3/workertest"
+	"github.com/juju/worker/v4/dependency"
+	"github.com/juju/worker/v4/workertest"
 )

 type ReportSuite struct {
diff --git a/dependency/self_test.go b/dependency/self_test.go
index bcc70ba..7e24c54 100644
--- a/dependency/self_test.go
+++ b/dependency/self_test.go
@@ -4,14 +4,15 @@
 package dependency_test

 import (
+	"context"
 	"fmt"

 	"github.com/juju/testing"
 	jc "github.com/juju/testing/checkers"
 	gc "gopkg.in/check.v1"

-	"github.com/juju/worker/v3/dependency"
-	"github.com/juju/worker/v3/workertest"
+	"github.com/juju/worker/v4/dependency"
+	"github.com/juju/worker/v4/workertest"
 )

 type SelfSuite struct {
@@ -36,7 +37,7 @@ func (s *SelfSuite) TestInputs(c *gc.C) {
 func (s *SelfSuite) TestStart(c *gc.C) {
 	s.fix.run(c, func(engine *dependency.Engine) {
 		manifold := dependency.SelfManifold(engine)
-		actual, err := manifold.Start(nil)
+		actual, err := manifold.Start(context.Background(), nil)
 		c.Check(err, jc.ErrorIsNil)
 		c.Check(actual, gc.Equals, engine)
 	})
diff --git a/dependency/context.go b/dependency/snapshot.go
similarity index 54%
rename from dependency/context.go
rename to dependency/snapshot.go
index c6661d4..915f652 100644
--- a/dependency/context.go
+++ b/dependency/snapshot.go
@@ -4,23 +4,24 @@
 package dependency

 import (
+	"context"
 	"fmt"

 	"github.com/juju/errors"

-	"github.com/juju/worker/v3"
+	"github.com/juju/worker/v4"
 )

-// context encapsulates a snapshot of workers and output funcs and implements Context.
-type context struct {
+// snapshot encapsulates a snapshot of workers and output funcs and implements
+// Getter.
+type snapshot struct {
+	// ctx represents the context in which the snapshot was taken.
+	ctx context.Context

 	// clientName is the name of the manifold for whose convenience this exists.
 	clientName string

-	// abort is closed when the worker being started is no longer required.
-	abort <-chan struct{}
-
-	// expired is closed when the context should no longer be used.
+	// expired is closed when the scope should no longer be used.
 	expired chan struct{}

 	// workers holds the snapshot of manifold workers.
@@ -33,23 +34,19 @@
 	// encountered. It does not include requests made after expiry.
 	accessLog []resourceAccess

+	// logger is used to trace resource requests made through this snapshot.
 	logger Logger
 }

-// Abort is part of the Context interface.
-func (ctx *context) Abort() <-chan struct{} {
-	return ctx.abort
-}
-
 // Get is part of the Context interface.
-func (ctx *context) Get(resourceName string, out interface{}) error { - ctx.logger.Tracef("%q manifold requested %q resource", ctx.clientName, resourceName) +func (s *snapshot) Get(resourceName string, out interface{}) error { + s.logger.Tracef("%q manifold requested %q resource", s.clientName, resourceName) select { - case <-ctx.expired: + case <-s.expired: return errors.New("expired context: cannot be used outside Start func") default: - err := ctx.rawAccess(resourceName, out) - ctx.accessLog = append(ctx.accessLog, resourceAccess{ + err := s.rawAccess(resourceName, out) + s.accessLog = append(s.accessLog, resourceAccess{ name: resourceName, as: fmt.Sprintf("%T", out), err: err, @@ -59,13 +56,13 @@ func (ctx *context) Get(resourceName string, out interface{}) error { } // expire closes the expired channel. Calling it more than once will panic. -func (ctx *context) expire() { - close(ctx.expired) +func (s *snapshot) expire() { + close(s.expired) } // rawAccess is a GetResourceFunc that neither checks enpiry nor records access. -func (ctx *context) rawAccess(resourceName string, out interface{}) error { - input, found := ctx.workers[resourceName] +func (s *snapshot) rawAccess(resourceName string, out interface{}) error { + input, found := s.workers[resourceName] if !found { return errors.Annotatef(ErrMissing, "%q not declared", resourceName) } else if input == nil { @@ -75,7 +72,7 @@ func (ctx *context) rawAccess(resourceName string, out interface{}) error { // No conversion necessary, just an exist check. return nil } - convert := ctx.outputs[resourceName] + convert := s.outputs[resourceName] if convert == nil { return errors.Annotatef(ErrMissing, "%q not exposed", resourceName) } @@ -94,24 +91,3 @@ type resourceAccess struct { // err is any error returned from rawAccess. err error } - -// report returns a convenient representation of ra. -func (ra resourceAccess) report() map[string]interface{} { - report := map[string]interface{}{ - KeyName: ra.name, - KeyType: ra.as, - } - if ra.err != nil { - report[KeyError] = ra.err.Error() - } - return report -} - -// resourceLogReport returns a convenient representation of accessLog. -func resourceLogReport(accessLog []resourceAccess) []map[string]interface{} { - result := make([]map[string]interface{}, len(accessLog)) - for i, access := range accessLog { - result[i] = access.report() - } - return result -} diff --git a/dependency/testing/stub.go b/dependency/testing/stub.go index 385cf43..62d0065 100644 --- a/dependency/testing/stub.go +++ b/dependency/testing/stub.go @@ -10,7 +10,7 @@ import ( "github.com/juju/errors" - "github.com/juju/worker/v3/dependency" + "github.com/juju/worker/v4/dependency" ) // NewStubResource creates a single StubResource with the given @@ -51,7 +51,7 @@ func NewStubResources(raw map[string]interface{}) StubResources { type StubResources map[string]StubResource // Context returns a dependency.Context that never aborts, backed by resources. -func (resources StubResources) Context() dependency.Context { +func (resources StubResources) Context() dependency.Getter { return &Context{ resources: resources, } diff --git a/dependency/util.go b/dependency/util.go index 7d226ac..1af290c 100644 --- a/dependency/util.go +++ b/dependency/util.go @@ -4,9 +4,11 @@ package dependency import ( + "context" + "github.com/juju/errors" - "github.com/juju/worker/v3" + "github.com/juju/worker/v4" ) // Installer exposes an Engine's Install method. @@ -82,7 +84,7 @@ func (v validator) visit(node string) error { // may have surprising effects. 
func SelfManifold(engine *Engine) Manifold { return Manifold{ - Start: func(_ Context) (worker.Worker, error) { + Start: func(_ context.Context, _ Getter) (worker.Worker, error) { return engine, nil }, Output: func(in worker.Worker, out interface{}) error { diff --git a/dependency/util_test.go b/dependency/util_test.go index 586c8f6..c449c47 100644 --- a/dependency/util_test.go +++ b/dependency/util_test.go @@ -4,6 +4,7 @@ package dependency_test import ( + "context" "time" "github.com/juju/clock" @@ -13,9 +14,9 @@ import ( gc "gopkg.in/check.v1" "gopkg.in/tomb.v2" - "github.com/juju/worker/v3" - "github.com/juju/worker/v3/dependency" - "github.com/juju/worker/v3/workertest" + "github.com/juju/worker/v4" + "github.com/juju/worker/v4/dependency" + "github.com/juju/worker/v4/workertest" ) type engineFixture struct { @@ -124,13 +125,13 @@ func (mh *manifoldHarness) Manifold() dependency.Manifold { } } -func (mh *manifoldHarness) start(context dependency.Context) (worker.Worker, error) { +func (mh *manifoldHarness) start(ctx context.Context, container dependency.Getter) (worker.Worker, error) { mh.startAttempts <- struct{}{} if mh.startError != nil { return nil, mh.startError } for _, resourceName := range mh.inputs { - if err := context.Get(resourceName, nil); err != nil { + if err := container.Get(resourceName, nil); err != nil { if mh.requireResources { return nil, err } @@ -220,7 +221,7 @@ func (w *minimalWorker) Report() map[string]interface{} { } } -func startMinimalWorker(_ dependency.Context) (worker.Worker, error) { +func startMinimalWorker(_ dependency.Getter) (worker.Worker, error) { w := &minimalWorker{} w.tomb.Go(func() error { <-w.tomb.Dying() diff --git a/go.mod b/go.mod index f924f4a..398fe99 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ -module github.com/juju/worker/v3 +module github.com/juju/worker/v4 -go 1.19 +go 1.20 require ( github.com/juju/clock v0.0.0-20220203021603-d9deb868a28a diff --git a/runner_test.go b/runner_test.go index cce1ca2..2f4a35e 100644 --- a/runner_test.go +++ b/runner_test.go @@ -17,7 +17,7 @@ import ( gc "gopkg.in/check.v1" "gopkg.in/tomb.v2" - "github.com/juju/worker/v3" + "github.com/juju/worker/v4" ) type RunnerSuite struct { diff --git a/worker_test.go b/worker_test.go index 9cf1602..f3a4b06 100644 --- a/worker_test.go +++ b/worker_test.go @@ -10,7 +10,7 @@ import ( "github.com/juju/testing" gc "gopkg.in/check.v1" - "github.com/juju/worker/v3" + "github.com/juju/worker/v4" ) type WorkerSuite struct { diff --git a/workertest/check.go b/workertest/check.go index 755347e..605ab06 100644 --- a/workertest/check.go +++ b/workertest/check.go @@ -11,7 +11,7 @@ import ( jc "github.com/juju/testing/checkers" gc "gopkg.in/check.v1" - "github.com/juju/worker/v3" + "github.com/juju/worker/v4" ) var ( @@ -71,9 +71,9 @@ func CheckKill(c *gc.C, w worker.Worker) error { // CleanKill calls CheckKill with the supplied arguments, and Checks that the // returned error is nil. It's particularly suitable for deferring: // -// someWorker, err := some.NewWorker() -// c.Assert(err, jc.ErrorIsNil) -// defer workertest.CleanKill(c, someWorker) +// someWorker, err := some.NewWorker() +// c.Assert(err, jc.ErrorIsNil) +// defer workertest.CleanKill(c, someWorker) // // ...in the large number (majority?) of situations where a worker is expected // to run successfully; and it doesn't Assert, and is therefore suitable for use @@ -86,9 +86,9 @@ func CleanKill(c *gc.C, w worker.Worker) { // DirtyKill calls CheckKill with the supplied arguments, and logs the returned // error. 
It's particularly suitable for deferring: // -// someWorker, err := some.NewWorker() -// c.Assert(err, jc.ErrorIsNil) -// defer workertest.DirtyKill(c, someWorker) +// someWorker, err := some.NewWorker() +// c.Assert(err, jc.ErrorIsNil) +// defer workertest.DirtyKill(c, someWorker) // // ...in the cases where we expect a worker to fail, but aren't specifically // testing that failure; and it doesn't Assert, and is therefore suitable for @@ -104,9 +104,9 @@ func DirtyKill(c *gc.C, w worker.Worker) { // and tries to stop the (non-nil) worker via CleanKill(). It's suitable // for testing constructor failure: // -// someWorker, err := some.NewWorker(badConfig) -// workertest.CheckNilOrKill(c, someWorker) -// c.Check(err, ... +// someWorker, err := some.NewWorker(badConfig) +// workertest.CheckNilOrKill(c, someWorker) +// c.Check(err, ... // // ...because it will do the right thing if your constructor succeeds // unexpectedly, and make every effort to prevent a rogue worker living diff --git a/workertest/fake_watcher.go b/workertest/fake_watcher.go index 66a0b8e..26b1bba 100644 --- a/workertest/fake_watcher.go +++ b/workertest/fake_watcher.go @@ -6,7 +6,7 @@ package workertest import ( "errors" - "github.com/juju/worker/v3" + "github.com/juju/worker/v4" ) type NotAWatcher struct { diff --git a/workertest/workers.go b/workertest/workers.go index f575484..36355a6 100644 --- a/workertest/workers.go +++ b/workertest/workers.go @@ -6,7 +6,7 @@ package workertest import ( "gopkg.in/tomb.v2" - "github.com/juju/worker/v3" + "github.com/juju/worker/v4" ) // NewErrorWorker returns a Worker that runs until Kill()ed; at which point it diff --git a/workertest/workertest_test.go b/workertest/workertest_test.go index 48001e0..67c7e36 100644 --- a/workertest/workertest_test.go +++ b/workertest/workertest_test.go @@ -10,7 +10,7 @@ import ( "github.com/juju/testing" gc "gopkg.in/check.v1" - "github.com/juju/worker/v3/workertest" + "github.com/juju/worker/v4/workertest" ) type Suite struct {
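
As a closing illustration of the workertest helpers whose doc comments are reformatted above, here is a sketch of the deferred-cleanup pattern under v4. The suite is hypothetical, and it assumes NewErrorWorker(nil) -- this package's stub worker, per the workers.go comment above -- yields a worker that stops cleanly when killed:

	package example_test

	import (
		stdtesting "testing"

		gc "gopkg.in/check.v1"

		"github.com/juju/worker/v4/workertest"
	)

	func Test(t *stdtesting.T) { gc.TestingT(t) }

	type ExampleSuite struct{}

	var _ = gc.Suite(&ExampleSuite{})

	func (s *ExampleSuite) TestCleanShutdown(c *gc.C) {
		// The stub worker runs until killed and then reports nil,
		// so the deferred CleanKill sees a clean stop.
		w := workertest.NewErrorWorker(nil)
		defer workertest.CleanKill(c, w)
	}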