From e7dcdbbc355166beff04eda989bf791f6425444e Mon Sep 17 00:00:00 2001 From: "Deomid \"rojer\" Ryabkov" Date: Thu, 10 Dec 2020 10:24:03 +0000 Subject: [PATCH] New test runner implementation Simplified, added support for serializing state and resuming. Not used yet but will be. --- pkg/cerrors/cerrors.go | 55 ++ pkg/config/timeouts.go | 29 +- pkg/event/testevent/test.go | 16 +- pkg/jobmanager/jobmanager.go | 2 +- pkg/pluginregistry/bundles.go | 2 +- pkg/runner/job_runner.go | 13 +- pkg/runner/test_runner.go | 882 +++++++++++++----- pkg/runner/test_runner_pipeline.go | 542 ----------- pkg/runner/test_runner_route.go | 302 ------ pkg/runner/test_runner_route_test.go | 400 -------- pkg/runner/test_runner_test.go | 548 +++++++++++ pkg/runner/wait_for_first_target.go | 58 -- pkg/runner/wait_for_first_target_test.go | 142 --- pkg/statectx/context.go | 31 +- pkg/storage/storage.go | 1 + pkg/target/target.go | 7 +- plugins/storage/memory/memory.go | 2 +- plugins/teststeps/example/example.go | 142 ++- plugins/teststeps/teststeps.go | 11 +- tests/integ/jobmanager/common.go | 1 - tests/integ/test/testrunner_test.go | 218 +---- .../teststeps/badtargets/badtargets.go | 79 ++ tests/plugins/teststeps/channels/channels.go | 6 +- tests/plugins/teststeps/hanging/hanging.go | 2 +- tests/plugins/teststeps/noreturn/noreturn.go | 2 +- 25 files changed, 1517 insertions(+), 1976 deletions(-) delete mode 100644 pkg/runner/test_runner_pipeline.go delete mode 100644 pkg/runner/test_runner_route.go delete mode 100644 pkg/runner/test_runner_route_test.go create mode 100644 pkg/runner/test_runner_test.go delete mode 100644 pkg/runner/wait_for_first_target.go delete mode 100644 pkg/runner/wait_for_first_target_test.go create mode 100644 tests/plugins/teststeps/badtargets/badtargets.go diff --git a/pkg/cerrors/cerrors.go b/pkg/cerrors/cerrors.go index 4fa3211e..d0a1dfdd 100644 --- a/pkg/cerrors/cerrors.go +++ b/pkg/cerrors/cerrors.go @@ -43,6 +43,17 @@ func (e *ErrTestStepsNeverReturned) Error() string { return fmt.Sprintf("test step [%s] did not return", strings.Join(e.StepNames, ", ")) } +// ErrTestTargetInjectionTimedOut indicates that test step did not ingest a target +// within allotted time. +type ErrTestTargetInjectionTimedOut struct { + StepName string +} + +// Error returns the error string associated with the error +func (e *ErrTestTargetInjectionTimedOut) Error() string { + return fmt.Sprintf("test step %v failed to ingest a target", e.StepName) +} + // ErrTestStepClosedChannels indicates that the test step returned after // closing its output channels, which constitutes an API violation type ErrTestStepClosedChannels struct { @@ -53,3 +64,47 @@ type ErrTestStepClosedChannels struct { func (e *ErrTestStepClosedChannels) Error() string { return fmt.Sprintf("test step %v closed output channels (api violation)", e.StepName) } + +// ErrTestStepPaniced indicates that a test step's method paniced. +type ErrTestStepPaniced struct { + StepName string + StackTrace string +} + +// Error returns the error string associated with the error +func (e *ErrTestStepPaniced) Error() string { + return fmt.Sprintf("test step %s paniced, trace: %q", e.StepName, e.StackTrace) +} + +// ErrTestStepReturnedDuplicateResult indicates that a test step's method paniced. +type ErrTestStepReturnedDuplicateResult struct { + StepName string + Target string +} + +// Error returns the error string associated with the error +func (e *ErrTestStepReturnedDuplicateResult) Error() string { + return fmt.Sprintf("test step %s returned duplicate result for %s", e.StepName, e.Target) +} + +// ErrTestStepReturnedUnexpectedResult indicates that a test step's method paniced. +type ErrTestStepReturnedUnexpectedResult struct { + StepName string + Target string +} + +// Error returns the error string associated with the error +func (e *ErrTestStepReturnedUnexpectedResult) Error() string { + return fmt.Sprintf("test step %s returned unexpected result for %s", e.StepName, e.Target) +} + +// ErrTestStepLostTargets indicates that targets have been lost during test run. +type ErrTestStepLostTargets struct { + StepName string + Target string +} + +// Error returns the error string associated with the error +func (e *ErrTestStepLostTargets) Error() string { + return fmt.Sprintf("test step %s lost target %s", e.StepName, e.Target) +} diff --git a/pkg/config/timeouts.go b/pkg/config/timeouts.go index 0fb66f58..5a06a8e8 100644 --- a/pkg/config/timeouts.go +++ b/pkg/config/timeouts.go @@ -12,34 +12,13 @@ import "time" var TargetManagerTimeout = 5 * time.Minute // StepInjectTimeout represents the maximum time that TestRunner will wait for -// the first TestStep of the pipeline to accept a Target +// a TestStep to accept a Target var StepInjectTimeout = 30 * time.Second -// TestRunnerMsgTimeout represents the maximum time that any component of the -// TestRunner will wait for the delivery of a message to any other subsystem -// of the TestRunner -var TestRunnerMsgTimeout = 5 * time.Second - -// TestRunnerShutdownTimeout represents the maximum time that the TestRunner will -// wait for all the TestStep to complete after a cancellation signal has been -// delivered - -// TestRunnerShutdownTimeout controls a block of the TestRunner which works as a -// watchdog, i.e. if there are multiple steps that need to return, the timeout is -// reset every time a step returns. The timeout should be handled so that it -// doesn't reset when a TestStep returns. -var TestRunnerShutdownTimeout = 30 * time.Second - -// TestRunnerStepShutdownTimeout represents the maximum time that the TestRunner +// TestRunnerShutdownTimeout represents the maximum time that the TestRunner // will wait for all TestSteps to complete after all Targets have reached the end -// of the pipeline. This timeout is only relevant if a cancellation signal is *not* -// delivered. - -// TestRunnerStepShutdownTimeout controls a block of the TestRunner which worksas -// a watchdog, i.e. if there are multiple steps that need to return, the timeout -// is reset every time a step returns. The timeout should be handled so that it -// doesn't reset when a TestStep returns. -var TestRunnerStepShutdownTimeout = 5 * time.Second +// of the pipeline. +var TestRunnerShutdownTimeout = 30 * time.Second // LockRefreshTimeout is the amount of time by which a target lock is extended // periodically while a job is running. diff --git a/pkg/event/testevent/test.go b/pkg/event/testevent/test.go index 5d28c1b8..3baa3232 100644 --- a/pkg/event/testevent/test.go +++ b/pkg/event/testevent/test.go @@ -16,7 +16,7 @@ import ( "github.com/facebookincubator/contest/pkg/types" ) -// Header models the header of a test event, which consists in metadatat hat defines the +// Header models the header of a test event, which consists in metadata that defines the // emitter of the events. The Header is under ConTest control and cannot be manipulated // by the TestStep type Header struct { @@ -28,8 +28,8 @@ type Header struct { // Data models the data of a test event. It is populated by the TestStep type Data struct { - EventName event.Name Target *target.Target + EventName event.Name Payload *json.RawMessage } @@ -152,3 +152,15 @@ type EmitterFetcher interface { Emitter Fetcher } + +func (h *Header) String() string { + return fmt.Sprintf("[%d %d %s %s]", h.JobID, h.RunID, h.TestName, h.TestStepLabel) +} + +func (d *Data) String() string { + ps := "" + if d.Payload != nil { + ps = fmt.Sprintf(" %q", d.Payload) //nolint SA5009 - works fine + } + return fmt.Sprintf("[%s %s%s]", d.Target, d.EventName, ps) +} diff --git a/pkg/jobmanager/jobmanager.go b/pkg/jobmanager/jobmanager.go index c67adfb3..a06fd120 100644 --- a/pkg/jobmanager/jobmanager.go +++ b/pkg/jobmanager/jobmanager.go @@ -158,7 +158,7 @@ func newPartialJobFromDescriptor(pr *pluginregistry.PluginRegistry, jd *job.JobD } // test step index is incremented by 1 so we can use 0 to signal an // anomaly. - tsb, err := pr.NewTestStepBundle(*testStepDesc, uint(idx)+1, tse) + tsb, err := pr.NewTestStepBundle(*testStepDesc, tse) if err != nil { return nil, fmt.Errorf("NewTestStepBundle for test step '%s' with index %d failed: %w", testStepDesc.Name, idx, err) } diff --git a/pkg/pluginregistry/bundles.go b/pkg/pluginregistry/bundles.go index 91038ce4..4a18a146 100644 --- a/pkg/pluginregistry/bundles.go +++ b/pkg/pluginregistry/bundles.go @@ -15,7 +15,7 @@ import ( ) // NewTestStepBundle creates a TestStepBundle from a TestStepDescriptor -func (r *PluginRegistry) NewTestStepBundle(testStepDescriptor test.TestStepDescriptor, stepIndex uint, allowedEvents map[event.Name]bool) (*test.TestStepBundle, error) { +func (r *PluginRegistry) NewTestStepBundle(testStepDescriptor test.TestStepDescriptor, allowedEvents map[event.Name]bool) (*test.TestStepBundle, error) { testStep, err := r.NewTestStep(testStepDescriptor.Name) if err != nil { return nil, fmt.Errorf("could not get the desired TestStep (%s): %v", testStepDescriptor.Name, err) diff --git a/pkg/runner/job_runner.go b/pkg/runner/job_runner.go index 94b18c7a..efb0c4a9 100644 --- a/pkg/runner/job_runner.go +++ b/pkg/runner/job_runner.go @@ -17,6 +17,7 @@ import ( "github.com/facebookincubator/contest/pkg/event/testevent" "github.com/facebookincubator/contest/pkg/job" "github.com/facebookincubator/contest/pkg/logging" + "github.com/facebookincubator/contest/pkg/statectx" "github.com/facebookincubator/contest/pkg/storage" "github.com/facebookincubator/contest/pkg/target" "github.com/facebookincubator/contest/pkg/types" @@ -157,9 +158,8 @@ func (jr *JobRunner) Run(j *job.Job) ([][]*job.Report, []*job.Report, error) { if err := tl.Unlock(j.ID, targets); err != nil { jobLog.Warningf("Failed to unlock targets (%v) for job ID %d: %v", targets, j.ID, err) } - case <-j.StateCtx.Paused(): - jobLog.Debugf("Received pause request, NOT releasing targets so the job can be resumed") return + // Ignore the pause signal, continue to refresh targets. case <-done: if err := tl.Unlock(j.ID, targets); err != nil { jobLog.Warningf("Failed to unlock %d target(s) (%v): %v", len(targets), targets, err) @@ -183,7 +183,14 @@ func (jr *JobRunner) Run(j *job.Job) ([][]*job.Report, []*job.Report, error) { if runErr = jr.emitAcquiredTargets(testEventEmitter, targets); runErr == nil { jobLog.Infof("Run #%d: running test #%d for job '%s' (job ID: %d) on %d targets", run+1, idx, j.Name, j.ID, len(targets)) testRunner := NewTestRunner() - runErr = testRunner.Run(j.StateCtx, t, targets, j.ID, types.RunID(run+1)) + var resumeState []byte + resumeState, err := testRunner.Run(j.StateCtx, t, targets, j.ID, types.RunID(run+1), resumeState) + if err == statectx.ErrPaused { + jobLog.Debugf("Runner paused, state: %s", string(resumeState)) + // TODO(rojer): Persist the state. + } else { + runErr = err + } } // Job is done, release all the targets diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index b37bfb4d..f3a5b8d3 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -1,287 +1,753 @@ -// Copyright (c) Facebook, Inc. and its affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - package runner import ( - "context" + "encoding/json" "fmt" + "runtime/debug" + "sync" "time" + "github.com/sirupsen/logrus" + "github.com/facebookincubator/contest/pkg/cerrors" "github.com/facebookincubator/contest/pkg/config" + "github.com/facebookincubator/contest/pkg/event/testevent" "github.com/facebookincubator/contest/pkg/logging" "github.com/facebookincubator/contest/pkg/statectx" + "github.com/facebookincubator/contest/pkg/storage" "github.com/facebookincubator/contest/pkg/target" "github.com/facebookincubator/contest/pkg/test" "github.com/facebookincubator/contest/pkg/types" - "github.com/sirupsen/logrus" ) -// TestRunnerTimeouts collects all the timeouts values that the test runner uses -type TestRunnerTimeouts struct { - StepInjectTimeout time.Duration - MessageTimeout time.Duration - ShutdownTimeout time.Duration - StepShutdownTimeout time.Duration +// TestRunner is the interface test runner implements +type TestRunner interface { + // ErrPaused will be returned when runner was able to pause successfully. + // When this error is returned from Run(), it is accompanied by valid serialized state + // that can be passed later to Run() to continue from where we left off. + Run(ctx statectx.Context, test *test.Test, targets []*target.Target, jobID types.JobID, runID types.RunID, resumeState []byte) ([]byte, error) } -// routingCh represents a set of unidirectional channels used by the routing subsystem. -// There is a routing block for each TestStep of the pipeline, which is responsible for -// the following actions: -// * Targets in egress from the previous routing block are injected into the -// current TestStep -// * Targets in egress from the current TestStep are injected into the next -// routing block -type routingCh struct { - // routeIn and routeOut connect the routing block to other routing blocks - routeIn <-chan *target.Target - routeOut chan<- *target.Target - // Channels that connect the routing block to the TestStep - stepIn chan<- *target.Target - stepOut <-chan *target.Target - stepErr <-chan cerrors.TargetError - // targetErr connects the routing block directly to the TestRunner. Failing - // targets are acquired by the TestRunner via this channel - targetErr chan<- cerrors.TargetError +// testRunner is the state associated with a test run. +// Here's how a test run works: +// * Each target gets a targetState and a "target runner" - a goroutine that takes that particular +// target through each step of the pipeline in sequence. It injects the target, waits for the result, +// then mves on to the next step. +// * Each step of the pipeline gets a stepState and: +// - A "step runner" - a goroutine that is responsible for running the step's Run() method +// - A "step reader" - a goroutine that processes results and sends them on to target runners that await them. +// * After starting all of the above, the main goroutine goes into "monitor" mode +// that checks on the pipeline's progress and is responsible for closing step input channels +// when all the targets have been injected. +// * Monitor loop finishes when all the targets have been injected into the last step +// or if a step has encountered and error. +// * We then wait for all the step runners and readers to shut down. +// * Once all the activity has died down, resulting state is examined and an error is returned, if any. +type testRunner struct { + stepInjectTimeout time.Duration // Time to wait for steps to accept each target + shutdownTimeout time.Duration // Time to wait for steps runners to finish a the end of the run + + steps []*stepState // The pipeline, in order of execution + targets map[string]*targetState // Target state lookup map + targetsWg sync.WaitGroup // Tracks all the target runners + log *logrus.Entry // Logger + + // One mutex to rule them all, used to serialize access to all the state above. + // Could probably be split into several if necessary. + mu sync.Mutex + cond *sync.Cond // Used to notify the monitor about changes } -// stepCh represents a set of bidirectional channels that a TestStep and its associated -// routing block use to communicate. The TestRunner forces the direction of each -// channel when connecting the TestStep to the routing block. -type stepCh struct { - stepIn chan *target.Target - stepOut chan *target.Target - stepErr chan cerrors.TargetError -} +// stepState contains state associated with one state of the pipeline: +type stepState struct { + stepIndex int // Index of this step in the pipeline. + sb test.TestStepBundle // The test bundle. -type injectionCh struct { - stepIn chan<- *target.Target - resultCh chan<- injectionResult -} + // Channels used to communicate with the plugin. + inCh chan *target.Target + outCh chan *target.Target + errCh chan cerrors.TargetError + ev testevent.Emitter -// injectionResult represents the result of an injection goroutine -type injectionResult struct { - target *target.Target - err error -} + numInjected int // Number of targets injected. + tgtDone map[*target.Target]bool // Targets for which results have been received. -// routeResult represents the result of routing block, possibly carrying error information -type routeResult struct { - bundle test.TestStepBundle - err error -} + stepRunning bool // testStep.Run() is currently running. + readerRunning bool // Result reader is running. + runErr error // Runner error, returned from Run() or an error condition detected by the reader. -// stepResult represents the result of a TestStep, possibly carrying error information -type stepResult struct { - jobID types.JobID - runID types.RunID - bundle test.TestStepBundle - err error + log *logrus.Entry // Logger } -// pipelineCtrlCh represents multiple result and control channels that the pipeline uses -// to collect results from routing blocks, steps and target completing the test and to -// signa cancellation to various pipeline subsystems -type pipelineCtrlCh struct { - routingResultCh <-chan routeResult - stepResultCh <-chan stepResult - targetOut <-chan *target.Target - targetErr <-chan cerrors.TargetError - - // ctx is a control context used to cancel/pause the steps of the pipeline - ctx statectx.Context - pause func() - cancel func() +// targetStepPhase denotes progression of a target through a step +type targetStepPhase int + +const ( + targetStepPhaseInit targetStepPhase = 0 + targetStepPhaseBegin targetStepPhase = 1 // Picked up for execution. + targetStepPhaseRun targetStepPhase = 2 // Injected into step. + targetStepPhaseEnd targetStepPhase = 3 // Finished running a step. +) + +// targetState contains state associated with one target progressing through the pipeline. +type targetState struct { + tgt *target.Target + + // This part of state gets serialized into JSON for resumption. + CurStep int `json:"cur_step"` // Current step number. + CurPhase targetStepPhase `json:"cur_phase"` // Current phase of step execution. + + res error // Final result, if reached the end state. + resCh chan error // Channel used to communicate result by the step runner. } -// TestRunner is the main runner of TestSteps in ConTest. `results` collects -// the results of the run. It is not safe to access `results` concurrently. -type TestRunner struct { - timeouts TestRunnerTimeouts +// resumeStateStruct is used to serialize runner state to be resumed in the future. +type resumeStateStruct struct { + Version int `json:"version"` + JobID types.JobID `json:"job_id"` + RunID types.RunID `json:"run_id"` + Targets map[string]*targetState `json:"targets"` } -// targetWriter is a helper object which exposes methods to write targets into step channels -type targetWriter struct { - log *logrus.Entry - timeouts TestRunnerTimeouts +// Resume state version we are compatible with. +// When imcompatible changes are made to the state format, bump this. +// Restoring incompatible state will abort the job. +const resumeStateStructVersion = 1 + +// Run is the main enty point of the runner. +func (tr *testRunner) Run( + ctx statectx.Context, + t *test.Test, targets []*target.Target, + jobID types.JobID, runID types.RunID, + resumeState []byte) ([]byte, error) { + + // Set up logger + rootLog := logging.GetLogger("pkg/runner") + fields := make(map[string]interface{}) + fields["jobid"] = jobID + fields["runid"] = runID + rootLog = logging.AddFields(rootLog, fields) + tr.log = logging.AddField(rootLog, "phase", "run") + + tr.log.Debugf("== test runner starting job %d, run %d", jobID, runID) + resumeState, err := tr.run(ctx, t, targets, jobID, runID, resumeState) + tr.log.Debugf("== test runner finished job %d, run %d, err: %v", jobID, runID, err) + return resumeState, err } -func (w *targetWriter) writeTimeout(ctx context.Context, ch chan<- *target.Target, target *target.Target, timeout time.Duration) error { - w.log.Debugf("writing target %+v, timeout %v", target, timeout) - start := time.Now() +func (tr *testRunner) run( + ctx statectx.Context, + t *test.Test, targets []*target.Target, + jobID types.JobID, runID types.RunID, + resumeState []byte) ([]byte, error) { + + // Peel off contexts used for steps and target handlers. + stepCtx, _, stepCancel := statectx.WithParent(ctx) + defer stepCancel() + targetCtx, _, targetCancel := statectx.WithParent(ctx) + defer targetCancel() + + // Set up the pipeline + for i, sb := range t.TestStepsBundles { + tr.steps = append(tr.steps, &stepState{ + stepIndex: i, + sb: sb, + inCh: make(chan *target.Target), + outCh: make(chan *target.Target), + errCh: make(chan cerrors.TargetError), + ev: storage.NewTestEventEmitter(testevent.Header{ + JobID: jobID, + RunID: runID, + TestName: t.Name, + TestStepLabel: sb.TestStepLabel, + }), + tgtDone: make(map[*target.Target]bool), + log: logging.AddField(tr.log, "step", sb.TestStepLabel), + }) + } + + // Set up the targets + tr.targets = make(map[string]*targetState) + // If we have target state to resume, do it now. + if len(resumeState) > 0 { + tr.log.Debugf("Attempting to resume from state: %s", string(resumeState)) + var rs resumeStateStruct + if err := json.Unmarshal(resumeState, &rs); err != nil { + return nil, fmt.Errorf("invalid resume state: %w", err) + } + if rs.Version != resumeStateStructVersion { + return nil, fmt.Errorf("incompatible resume state version %d (want %d)", + rs.Version, resumeStateStructVersion) + } + if rs.JobID != jobID { + return nil, fmt.Errorf("wrong resume state, job id %d (want %d)", rs.JobID, jobID) + } + tr.targets = rs.Targets + } + // Initialize remaining fields of the target structures, + // build the map and kick off target processing. + for _, tgt := range targets { + ts := tr.targets[tgt.ID] + if ts == nil { + ts = &targetState{} + } + ts.tgt = tgt + ts.resCh = make(chan error) + tr.mu.Lock() + tr.targets[tgt.ID] = ts + tr.mu.Unlock() + tr.targetsWg.Add(1) + go tr.targetRunner(targetCtx, stepCtx, ts) + } + + // Run until no more progress can be made. + runErr := tr.runMonitor() + if runErr != nil { + tr.log.Errorf("monitor returned error: %q, canceling", runErr) + stepCancel() + } + + // Wait for step runners and readers to exit. + if err := tr.waitStepRunners(ctx); err != nil { + tr.log.Errorf("step runner error: %q, canceling", err) + stepCancel() + if runErr == nil { + runErr = err + } + } + // There will be no more results, reel in all the target runners (if any). + tr.log.Debugf("waiting for target runners to finish") + targetCancel() + tr.targetsWg.Wait() + + // Examine the resulting state. + tr.log.Debugf("leaving, err %v, target states:", runErr) + tr.mu.Lock() + defer tr.mu.Unlock() + resumeOk := (runErr == nil) + var inFlightTargets []*targetState + for i, tgt := range targets { + ts := tr.targets[tgt.ID] + stepErr := tr.steps[ts.CurStep].runErr + tr.log.Debugf(" %d %s %v", i, ts, stepErr) + if ts.CurPhase == targetStepPhaseRun { + inFlightTargets = append(inFlightTargets, ts) + if stepErr != statectx.ErrPaused { + resumeOk = false + } + } + } + tr.log.Debugf("- %d in flight, ok to resume? %t", len(inFlightTargets), resumeOk) + tr.log.Debugf("step states:") + for i, ss := range tr.steps { + tr.log.Debugf(" %d %s %t %t %v", i, ss, ss.stepRunning, ss.readerRunning, ss.runErr) + } + + // Is there a useful error to report? + if runErr != nil { + return nil, runErr + } + + // Has the run been canceled? select { case <-ctx.Done(): - w.log.Debugf("terminate requested while writing target %+v", target) - case ch <- target: - case <-time.After(timeout): - return fmt.Errorf("timeout (%v) while writing target %+v", timeout, target) + return nil, statectx.ErrCanceled + default: } - w.log.Debugf("done writing target %+v, spent %v", target, time.Since(start)) - return nil -} -// writeTargetWithResult attempts to deliver a Target on the input channel of a step, -// returning the result of the operation on the result channel wrapped in the -// injectionCh argument -func (w *targetWriter) writeTargetWithResult(ctx context.Context, target *target.Target, ch injectionCh) { - err := w.writeTimeout(ctx, ch.stepIn, target, w.timeouts.StepInjectTimeout) + // Have we been asked to pause? If yes, is it safe to do so? select { - case <-ctx.Done(): - case ch.resultCh <- injectionResult{target: target, err: err}: - case <-time.After(w.timeouts.MessageTimeout): - w.log.Panicf("timeout while writing result for target %+v after %v", target, w.timeouts.MessageTimeout) + case <-ctx.Paused(): + if !resumeOk { + tr.log.Warningf("paused but not ok to resume") + break + } + rs := &resumeStateStruct{ + Version: resumeStateStructVersion, + JobID: jobID, RunID: runID, + Targets: tr.targets, + } + resumeState, runErr = json.Marshal(rs) + if runErr != nil { + tr.log.Errorf("unable to serialize the state: %s", runErr) + } else { + runErr = statectx.ErrPaused + } + default: + // We are not pausing and yet some targets were left in flight. + if len(inFlightTargets) > 0 { + ts := inFlightTargets[0] + runErr = &cerrors.ErrTestStepLostTargets{ + StepName: tr.steps[ts.CurStep].sb.TestStepLabel, + Target: ts.tgt.ID, + } + } } + + return resumeState, runErr } -// writeTargetError writes a TargetError object to a TargetError channel with timeout -func (w *targetWriter) writeTargetError(ctx context.Context, ch chan<- cerrors.TargetError, targetError cerrors.TargetError, timeout time.Duration) error { +func (tr *testRunner) waitStepRunners(ctx statectx.Context) error { + tr.log.Debugf("waiting for step runners to finish") + swch := make(chan struct{}) + go func() { + tr.mu.Lock() + defer tr.mu.Unlock() + for { + ok := true + for _, ss := range tr.steps { + // numRunning == 1 is also acceptable: we allow the Run() goroutine + // to continue in case of error, if the result processor decided + // to abandon its runner, there's nothing we can do. + switch { + case !ss.stepRunning && !ss.readerRunning: + // Done + case ss.stepRunning && ss.readerRunning: + // Still active + ok = false + case !ss.stepRunning && ss.readerRunning: + // Transient state, let it finish + ok = false + case ss.stepRunning && !ss.readerRunning: + // This is possible if plugin got stuck and result processor gave up on it. + // If so, it should have left an error. + if ss.runErr == nil { + tr.log.Errorf("%s: result processor left runner with no error", ss) + // There's nothing we can do at this point, fall through. + } + } + } + if ok { + close(swch) + return + } + tr.cond.Wait() + } + }() select { - case <-ctx.Done(): - case ch <- targetError: - case <-time.After(timeout): - return fmt.Errorf("timeout while writing targetError %+v", targetError) + case <-swch: + tr.log.Debugf("step runners finished") + tr.mu.Lock() + defer tr.mu.Unlock() + return tr.checkStepRunners() + case <-time.After(tr.shutdownTimeout): + tr.log.Errorf("step runners failed to shut down correctly") + // If there is a step with an error set, use that. + if err := tr.checkStepRunners(); err != nil { + return err + } + // If there isn't, enumerate ones that were still running at the time. + err := &cerrors.ErrTestStepsNeverReturned{} + tr.mu.Lock() + defer tr.mu.Unlock() + for _, ss := range tr.steps { + if ss.stepRunning { + err.StepNames = append(err.StepNames, ss.sb.TestStepLabel) + } + } + return err } - return nil } -func newTargetWriter(log *logrus.Entry, timeouts TestRunnerTimeouts) *targetWriter { - return &targetWriter{log: log, timeouts: timeouts} +// targetRunner runs one target through all the steps of the pipeline. +func (tr *testRunner) targetRunner(ctx, stepCtx statectx.Context, ts *targetState) { + log := logging.AddField(tr.log, "target", ts.tgt.ID) + log.Debugf("%s: target runner active", ts) + // NB: CurStep may be non-zero on entry if resumed +loop: + for i := ts.CurStep; i < len(tr.steps); { + // Early check for pause of cancelation. + select { + case <-ctx.Paused(): + log.Debugf("%s: paused 0", ts) + break loop + case <-ctx.Done(): + log.Debugf("%s: canceled 0", ts) + break loop + default: + } + tr.mu.Lock() + ss := tr.steps[i] + if ts.CurPhase == targetStepPhaseEnd { + // This target already terminated. + // Can happen if resumed from terminal state. + tr.mu.Unlock() + break loop + } + ts.CurStep = i + ts.CurPhase = targetStepPhaseBegin + tr.mu.Unlock() + // Make sure we have a step runner active. + // These are started on-demand. + tr.runStepIfNeeded(stepCtx, ss) + // Inject the target. + log.Debugf("%s: injecting into %s", ts, ss) + select { + case ss.inCh <- ts.tgt: + // Injected successfully. + tr.mu.Lock() + ts.CurPhase = targetStepPhaseRun + ss.numInjected++ + tr.mu.Unlock() + tr.cond.Signal() + case <-time.After(tr.stepInjectTimeout): + tr.mu.Lock() + ss.log.Errorf("timed out while injecting a target") + ss.runErr = &cerrors.ErrTestTargetInjectionTimedOut{StepName: ss.sb.TestStepLabel} + tr.mu.Unlock() + break loop + case <-ctx.Done(): + log.Debugf("%s: canceled 1", ts) + break loop + } + // Await result. It will be communicated to us by the step runner. + select { + case res, ok := <-ts.resCh: + if !ok { + log.Debugf("%s: result channel closed", ts) + break loop + } + log.Debugf("%s: result for %s recd", ts, ss) + tr.mu.Lock() + ts.CurPhase = targetStepPhaseEnd + ts.res = res + tr.cond.Signal() + if res != nil { + tr.mu.Unlock() + break loop + } + i++ + if i < len(tr.steps) { + ts.CurStep = i + ts.CurPhase = targetStepPhaseInit + } + tr.mu.Unlock() + // Check for cancellation. + // Notably we are not checking for the pause condition here: + // when paused, we want to let all the injected targets to finish + // and collect all the results they produce. If that doesn't happen, + // step runner will close resCh on its way out and unlock us. + case <-ctx.Done(): + log.Debugf("%s: canceled 2", ts) + break loop + } + } + log.Debugf("%s: target runner finished", ts) + tr.mu.Lock() + ts.resCh = nil + tr.cond.Signal() + tr.mu.Unlock() + tr.targetsWg.Done() } -// Run implements the main logic of the TestRunner, i.e. the instantiation and -// connection of the TestSteps, routing blocks and pipeline runner. -func (tr *TestRunner) Run(ctx statectx.Context, test *test.Test, targets []*target.Target, jobID types.JobID, runID types.RunID) error { - - if len(test.TestStepsBundles) == 0 { - return fmt.Errorf("no steps to run for test") +// runStepIfNeeded starts the step runner goroutine if not already running. +func (tr *testRunner) runStepIfNeeded(ctx statectx.Context, ss *stepState) { + tr.mu.Lock() + defer tr.mu.Unlock() + if ss.stepRunning { + return + } + if ss.runErr != nil { + return } + ss.stepRunning = true + ss.readerRunning = true + go tr.stepRunner(ctx, ss) + go tr.stepReader(ctx, ss) +} - // rootLog is propagated to all the subsystems of the pipeline - rootLog := logging.GetLogger("pkg/runner") - fields := make(map[string]interface{}) - fields["jobid"] = jobID - fields["runid"] = runID - rootLog = logging.AddFields(rootLog, fields) +// emitStepEvent emits an error event if step resulted in an error. +func (ss *stepState) emitStepEvent(tgt *target.Target, err error) error { + if err == nil { + return nil + } + payload, jmErr := json.Marshal(err.Error()) + if jmErr != nil { + return fmt.Errorf("failed to marshal event: %w", err) + } + rm := json.RawMessage(payload) + errEv := testevent.Data{ + EventName: EventTestError, + Target: tgt, + Payload: &rm, + } + return ss.ev.Emit(errEv) +} - log := logging.AddField(rootLog, "phase", "run") - testPipeline := newPipeline(logging.AddField(rootLog, "entity", "test_pipeline"), test.TestStepsBundles, test, jobID, runID, tr.timeouts) - - log.Infof("setting up pipeline") - completedTargets := make(chan *target.Target, 1) - inCh := testPipeline.init(ctx) - - // inject targets in the step - terminateInjectionCtx, terminateInjection := context.WithCancel(context.Background()) - go func(ctx context.Context, inputChannel chan<- *target.Target) { - defer close(inputChannel) - log := logging.AddField(log, "step", "injection") - writer := newTargetWriter(log, tr.timeouts) - for _, t := range targets { - if err := writer.writeTimeout(ctx, inputChannel, t, tr.timeouts.MessageTimeout); err != nil { - log.Debugf("could not inject target %+v into first routing block: %+v", t, err) +// stepRunner runs a test pipeline's step (the Run() method). +func (tr *testRunner) stepRunner(ctx statectx.Context, ss *stepState) { + ss.log.Debugf("%s: step runner active", ss) + defer func() { + if r := recover(); r != nil { + tr.mu.Lock() + ss.stepRunning = false + ss.runErr = &cerrors.ErrTestStepPaniced{ + StepName: ss.sb.TestStepLabel, + StackTrace: string(debug.Stack()), } + tr.mu.Unlock() + tr.safeCloseOutCh(ss) } - }(terminateInjectionCtx, inCh) - - errCh := make(chan error, 1) - go func() { - log.Infof("running pipeline") - errCh <- testPipeline.run(completedTargets) }() - - defer terminateInjection() - // Receive targets from the completed channel controlled by the pipeline, while - // waiting for termination signals or fatal errors encountered while running - // the pipeline. - for { - select { - case err := <-errCh: - log.Debugf("test runner terminated, returning %v", err) - return err - case t := <-completedTargets: - log.Infof("test runner completed target: %v", t) + chans := test.TestStepChannels{In: ss.inCh, Out: ss.outCh, Err: ss.errCh} + runErr := ss.sb.TestStep.Run(ctx, chans, ss.sb.Parameters, ss.ev) + if err := ss.emitStepEvent(nil, runErr); err != nil { + if ss.runErr == nil { + ss.runErr = err } } + tr.mu.Lock() + ss.stepRunning = false + if runErr != nil { + ss.runErr = runErr + } + tr.mu.Unlock() + // Signal to the result processor that no more will be coming. + tr.safeCloseOutCh(ss) + ss.log.Debugf("%s: step runner finished", ss) } -// NewTestRunner initializes and returns a new TestRunner object. This test -// runner will use default timeout values -func NewTestRunner() TestRunner { - return TestRunner{ - timeouts: TestRunnerTimeouts{ - StepInjectTimeout: config.StepInjectTimeout, - MessageTimeout: config.TestRunnerMsgTimeout, - ShutdownTimeout: config.TestRunnerShutdownTimeout, - StepShutdownTimeout: config.TestRunnerStepShutdownTimeout, - }, +// reportTargetResult reports result of executing a step to the appropriate target runner. +func (tr *testRunner) reportTargetResult(ctx statectx.Context, ss *stepState, tgt *target.Target, res error) error { + resCh, err := func() (chan error, error) { + tr.mu.Lock() + defer tr.mu.Unlock() + ts := tr.targets[tgt.ID] + if ts == nil { + return nil, fmt.Errorf("%s: result for nonexistent target %s %v", ss, tgt, res) + } + if ss.tgtDone[tgt] { + return nil, &cerrors.ErrTestStepReturnedDuplicateResult{ + StepName: ss.sb.TestStepLabel, + Target: tgt.ID, + } + } + ss.tgtDone[tgt] = true + // Begin is also allowed here because it may happen that we get a result before target runner updates phase. + if ts.CurStep != ss.stepIndex || (ts.CurPhase != targetStepPhaseBegin && ts.CurPhase != targetStepPhaseRun) { + return nil, &cerrors.ErrTestStepReturnedUnexpectedResult{ + StepName: ss.sb.TestStepLabel, + Target: tgt.ID, + } + } + if ts.resCh == nil { + // This should not happen, must be an internal error. + return nil, fmt.Errorf("%s: target runner %s is not there, dropping result on the floor", ss, ts) + } + ss.log.Debugf("%s: result for %s: %v", ss, ts, res) + return ts.resCh, nil + }() + if err != nil { + return err + } + if res != nil { + if err := ss.emitStepEvent(tgt, res); err != nil { + return err + } + } + select { + case resCh <- res: + break + case <-ctx.Done(): + break } + return nil } -// NewTestRunnerWithTimeouts initializes and returns a new TestRunner object with -// custom timeouts -func NewTestRunnerWithTimeouts(timeouts TestRunnerTimeouts) TestRunner { - return TestRunner{timeouts: timeouts} +func (tr *testRunner) safeCloseOutCh(ss *stepState) { + defer func() { + if r := recover(); r != nil { + tr.mu.Lock() + ss.runErr = &cerrors.ErrTestStepClosedChannels{StepName: ss.sb.TestStepLabel} + tr.mu.Unlock() + } + }() + close(ss.outCh) } -// State is a structure that models the current state of the test runner -type State struct { - completedSteps map[string]error - completedRouting map[string]error - completedTargets map[*target.Target]error +// safeCloseErrCh closes error channel safely, even if it has already been closed. +func (tr *testRunner) safeCloseErrCh(ss *stepState) { + defer func() { + if r := recover(); r != nil { + tr.mu.Lock() + ss.runErr = &cerrors.ErrTestStepClosedChannels{StepName: ss.sb.TestStepLabel} + tr.mu.Unlock() + } + }() + close(ss.errCh) } -// NewState initializes a State object. -func NewState() *State { - r := State{} - r.completedSteps = make(map[string]error) - r.completedRouting = make(map[string]error) - r.completedTargets = make(map[*target.Target]error) - return &r +// stepReader receives results from the step's output channel and forwards them to the appropriate target runners. +func (tr *testRunner) stepReader(ctx statectx.Context, ss *stepState) { + ss.log.Debugf("%s: step reader active", ss) + var err error + outCh := ss.outCh +loop: + for { + select { + case tgt, ok := <-outCh: + if !ok { + ss.log.Debugf("%s: out chan closed", ss) + // At this point we may still have an error to report, + // wait until error channel is emptied too. + outCh = make(chan *target.Target) + tr.safeCloseErrCh(ss) + break + } + if err = tr.reportTargetResult(ctx, ss, tgt, nil); err != nil { + break loop + } + case res, ok := <-ss.errCh: + if !ok { + ss.log.Debugf("%s: err chan closed", ss) + break loop + } + if err = tr.reportTargetResult(ctx, ss, res.Target, res.Err); err != nil { + break loop + } + case <-ctx.Done(): + ss.log.Debugf("%s: canceled 3, draining", ss) + for { + select { + case <-outCh: + break loop + case <-ss.errCh: + break loop + case <-time.After(tr.shutdownTimeout): + tr.mu.Lock() + if ss.runErr == nil { + ss.runErr = &cerrors.ErrTestStepsNeverReturned{} + } + tr.mu.Unlock() + } + } + } + } + tr.mu.Lock() + defer tr.mu.Unlock() + if ss.runErr == nil && err != nil { + ss.runErr = err + } + if ss.stepRunning && ss.runErr == nil { + // This means that plugin closed its channels before leaving. + ss.runErr = &cerrors.ErrTestStepClosedChannels{StepName: ss.sb.TestStepLabel} + } + ss.readerRunning = false + ss.log.Debugf("%s: step reader finished, %t %t %v", ss, ss.stepRunning, ss.readerRunning, ss.runErr) + tr.cond.Signal() } -// CompletedTargets returns a map that associates each target with its returning error. -// If the target succeeded, the error will be nil -func (r *State) CompletedTargets() map[*target.Target]error { - return r.completedTargets +// checkStepRunnersLocked checks if any step runner has encountered an error. +func (tr *testRunner) checkStepRunners() error { + for _, ss := range tr.steps { + if ss.runErr != nil { + return ss.runErr + } + } + return nil } -// CompletedRouting returns a map that associates each routing block with its returning error. -// If the routing block succeeded, the error will be nil -func (r *State) CompletedRouting() map[string]error { - return r.completedRouting +// runMonitor monitors progress of targets through the pipeline +// and closes input channels of the steps to indicate that no more are expected. +// It also monitors steps for critical errors and cancels the whole run. +// Note: input channels remain open when cancellation is requested, +// plugins are expected to handle it explicitly. +func (tr *testRunner) runMonitor() error { + tr.log.Debugf("monitor: active") + tr.mu.Lock() + defer tr.mu.Unlock() + // First, compute the starting step of the pipeline (it may be non-zero + // if the pipleine was resumed). + minStep := len(tr.steps) + for _, ts := range tr.targets { + if ts.CurStep < minStep { + minStep = ts.CurStep + } + } + if minStep < len(tr.steps) { + tr.log.Debugf("monitor: starting at step %s", tr.steps[minStep]) + } + + // Run the main loop. + pass := 1 + var runErr error +loop: + for step := minStep; step < len(tr.steps); pass++ { + ss := tr.steps[step] + tr.log.Debugf("monitor pass %d: current step %s", pass, ss) + // Check if all the targets have either made it past the injection phase or terminated. + ok := true + for _, ts := range tr.targets { + tr.log.Debugf("monitor pass %d: %s: %s", pass, ss, ts) + if ts.resCh == nil { // Not running anymore + continue + } + if ok && (ts.CurStep < step || ts.CurPhase < targetStepPhaseRun) { + tr.log.Debugf("monitor pass %d: %s: not all targets injected yet (%s)", pass, ss, ts) + ok = false + break + } + } + if runErr = tr.checkStepRunners(); runErr != nil { + break loop + } + if !ok { + // Wait for notification: as progress is being made, we get notified. + tr.cond.Wait() + continue + } + // All targets ok, close the step's input channel. + tr.log.Debugf("monitor pass %d: %s: no more targets, closing input channel", pass, ss) + close(ss.inCh) + step++ + } + tr.log.Debugf("monitor: finished, %v", runErr) + return runErr } -// CompletedSteps returns a map that associates each step with its returning error. -// If the step succeeded, the error will be nil -func (r *State) CompletedSteps() map[string]error { - return r.completedSteps +func NewTestRunnerWithTimeouts(stepInjectTimeout, shutdownTimeout time.Duration) TestRunner { + tr := &testRunner{ + stepInjectTimeout: stepInjectTimeout, + shutdownTimeout: shutdownTimeout, + } + tr.cond = sync.NewCond(&tr.mu) + return tr } -// SetRouting sets the error associated with a routing block -func (r *State) SetRouting(testStepLabel string, err error) { - r.completedRouting[testStepLabel] = err +func NewTestRunner() TestRunner { + return NewTestRunnerWithTimeouts(config.StepInjectTimeout, config.TestRunnerShutdownTimeout) } -// SetTarget sets the error associated with a target -func (r *State) SetTarget(target *target.Target, err error) { - r.completedTargets[target] = err +func (tph targetStepPhase) String() string { + switch tph { + case targetStepPhaseInit: + return "init" + case targetStepPhaseBegin: + return "begin" + case targetStepPhaseRun: + return "run" + case targetStepPhaseEnd: + return "end" + } + return fmt.Sprintf("???(%d)", tph) } -// SetStep sets the error associated with a step -func (r *State) SetStep(testStepLabel string, err error) { - r.completedSteps[testStepLabel] = err +func (ss *stepState) String() string { + return fmt.Sprintf("[#%d %s]", ss.stepIndex, ss.sb.TestStepLabel) } -// IncompleteSteps returns a slice of step names for which the result hasn't been set yet -func (r *State) IncompleteSteps(bundles []test.TestStepBundle) []string { - var incompleteSteps []string - for _, bundle := range bundles { - if _, ok := r.completedSteps[bundle.TestStepLabel]; !ok { - incompleteSteps = append(incompleteSteps, bundle.TestStepLabel) +func (ts *targetState) String() string { + var resText string + if ts.res != nil { + resStr := fmt.Sprintf("%s", ts.res) + if len(resStr) > 20 { + resStr = resStr[:20] + "..." } + resText = fmt.Sprintf("%q", resStr) + } else { + resText = "" } - return incompleteSteps + finished := ts.resCh == nil + return fmt.Sprintf("[%s %d %s %t %s]", + ts.tgt, ts.CurStep, ts.CurPhase, finished, resText) } diff --git a/pkg/runner/test_runner_pipeline.go b/pkg/runner/test_runner_pipeline.go deleted file mode 100644 index bfdaa435..00000000 --- a/pkg/runner/test_runner_pipeline.go +++ /dev/null @@ -1,542 +0,0 @@ -// Copyright (c) Facebook, Inc. and its affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - -package runner - -import ( - "context" - "encoding/json" - "fmt" - "runtime/debug" - "sync/atomic" - "time" - - "github.com/facebookincubator/contest/pkg/cerrors" - "github.com/facebookincubator/contest/pkg/event/testevent" - "github.com/facebookincubator/contest/pkg/logging" - "github.com/facebookincubator/contest/pkg/statectx" - "github.com/facebookincubator/contest/pkg/storage" - "github.com/facebookincubator/contest/pkg/target" - "github.com/facebookincubator/contest/pkg/test" - "github.com/facebookincubator/contest/pkg/types" - "github.com/sirupsen/logrus" -) - -// pipeline represents a sequence of steps through which targets flow. A pipeline could implement -// either a test sequence or a cleanup sequence -type pipeline struct { - log *logrus.Entry - - bundles []test.TestStepBundle - - jobID types.JobID - runID types.RunID - - state *State - test *test.Test - - timeouts TestRunnerTimeouts - - // ctrlChannels represents a set of result and completion channels for this pipeline, - // used to collect the results of routing blocks, steps and targets completing - // the pipeline. It's available only after having initialized the pipeline - ctrlChannels *pipelineCtrlCh - - // numIngress represents the total number of targets that have been seen entering - // the pipeline. This number is set by the first routing block as soon as the injection - // terminates - numIngress uint64 -} - -// runStep runs synchronously a TestStep and peforms sanity checks on the status -// of the input/output channels on the defer control path. When the TestStep returns, -// the associated output channels are closed. This signals to the routing subsytem -// that this step of the pipeline will not be producing any more targets. The API -// dictates that the TestStep can only return if its input channel has been closed, -// which indicates that no more targets will be submitted to that step. If the -// TestStep does not comply with this rule, an error is sent downstream to the pipeline -// runner, which will in turn shutdown the whole pipeline and flag the TestStep -// as misbehaving. All attempts to send downstream a result after a TestStep complete -// are protected by a timeout and fail open. This because it's not guaranteed that -// the TestRunner will still be listening on the result channels. If the TestStep hangs -// indefinitely and does not respond to cancellation signals, the TestRunner will -// flag it as misbehaving and return. If the TestStep returns once the TestRunner -// has completed, it will timeout trying to write on the result channel. -func (p *pipeline) runStep(ctx statectx.Context, jobID types.JobID, runID types.RunID, bundle test.TestStepBundle, stepCh stepCh, resultCh chan<- stepResult, ev testevent.EmitterFetcher) { - - stepLabel := bundle.TestStepLabel - log := logging.AddField(p.log, "step", stepLabel) - log = logging.AddField(log, "phase", "runStep") - - log.Debugf("initializing step") - timeout := p.timeouts.MessageTimeout - defer func() { - if r := recover(); r != nil { - err := fmt.Errorf("step %s paniced (%v): %s", stepLabel, r, debug.Stack()) - select { - case resultCh <- stepResult{jobID: jobID, runID: runID, bundle: bundle, err: err}: - case <-time.After(p.timeouts.MessageTimeout): - log.Warningf("sending error back from test step runner timed out after %v: %v", timeout, err) - return - } - return - } - }() - - // We should not run a test step if there're no targets in stepCh.stepIn - // First we check if there's at least one incoming target, and only - // then we call `bundle.TestStep.Run()`. - // - // ITS: https://github.com/facebookincubator/contest/issues/101 - stepIn, onFirstTargetChan, onNoTargetsChan := waitForFirstTarget(ctx.PausedOrDoneCtx(), stepCh.stepIn) - - haveTargets := false - select { - case <-onFirstTargetChan: - log.Debugf("first target received, will start step") - haveTargets = true - case <-ctx.Done(): - log.Debugf("cancelled") - case <-ctx.Paused(): - log.Debugf("paused") - case <-onNoTargetsChan: - log.Debugf("no targets") - } - - var err error - if haveTargets { - // Run the TestStep and defer to the return path the handling of panic - // conditions. If multiple error conditions occur, send downstream only - // the first error encountered. - channels := test.TestStepChannels{ - In: stepIn, - Out: stepCh.stepOut, - Err: stepCh.stepErr, - } - err = bundle.TestStep.Run(ctx, channels, bundle.Parameters, ev) - } - - log.Debugf("step %s returned", bundle.TestStepLabel) - - // Check if we are shutting down. If so, do not perform sanity checks on the - // channels but return immediately as the TestStep itself probably returned - // because it honored the termination signal. - - cancellationAsserted := ctx.PausedOrDoneCtx().Err() == statectx.ErrCanceled - pauseAsserted := ctx.PausedOrDoneCtx().Err() == statectx.ErrPaused - - if cancellationAsserted && err == nil { - err = fmt.Errorf("test step cancelled") - } - - if cancellationAsserted || pauseAsserted { - select { - case resultCh <- stepResult{jobID: jobID, runID: runID, bundle: bundle, err: err}: - case <-time.After(timeout): - log.Warningf("sending error back from TestStep runner timed out after %v: %v", timeout, err) - } - return - } - - // If the TestStep has crashed with an error, return immediately the result to the TestRunner - // which in turn will issue a cancellation signal to the pipeline - if err != nil { - select { - case resultCh <- stepResult{jobID: jobID, runID: runID, bundle: bundle, err: err}: - case <-time.After(timeout): - log.Warningf("sending error back from test step runner (%s) timed out after %v: %v", stepLabel, timeout, err) - } - return - } - - // Perform sanity checks on the status of the channels. The TestStep API - // mandates that output and error channels shall not be closed by the - // TestStep itself. If the TestStep does not comply with the API, it is - // flagged as misbehaving. - select { - case _, ok := <-stepCh.stepIn: - if !ok { - break - } - // stepCh.stepIn is not closed, but the TestStep returned, which is a violation - // of the API. Record the error if no other error condition has been seen. - err = fmt.Errorf("step %s returned, but input channel is not closed (api violation; case 0)", stepLabel) - - default: - // stepCh.stepIn is not closed, and a read operation would block. The TestStep - // does not comply with the API (see above). - err = fmt.Errorf("step %s returned, but input channel is not closed (api violation; case 1)", stepLabel) - } - - select { - case _, ok := <-stepCh.stepOut: - if !ok { - // stepOutCh has been closed. This is a violation of the API. Record the error - // if no other error condition has been seen. - if err == nil { - err = &cerrors.ErrTestStepClosedChannels{StepName: stepLabel} - } - } - default: - // stepCh.stepOut is open. Flag it for closure to signal to the routing subsystem that - // no more Targets will go through from this channel. - close(stepCh.stepOut) - } - - select { - case _, ok := <-stepCh.stepErr: - if !ok { - // stepErrCh has been closed. This is a violation of the API. Record the error - // if no other error condition has been seen. - if err == nil { - err = &cerrors.ErrTestStepClosedChannels{StepName: stepLabel} - } - } - default: - // stepCh.stepErr is open. Flag it for closure to signal to the routing subsystem that - // no more Targets will go through from this channel. - close(stepCh.stepErr) - } - - select { - case resultCh <- stepResult{jobID: jobID, runID: runID, bundle: bundle, err: err}: - case <-time.After(timeout): - log.Warningf("sending error back from step runner (%s) timed out after %v: %v", stepLabel, timeout, err) - return - } -} - -// waitTargets reads results coming from results channels until all Targets -// have completed or an error occurs. If all Targets complete successfully, it checks -// whether TestSteps and routing blocks have completed as well. If not, returns an -// error. Termination is signalled via terminate channel. -func (p *pipeline) waitTargets(ctx context.Context, completedCh chan<- *target.Target) error { - - log := logging.AddField(p.log, "phase", "waitTargets") - - var ( - err error - completedTarget *target.Target - completedTargetError error - ) - - outChannel := p.ctrlChannels.targetOut - - writer := newTargetWriter(log, p.timeouts) - for { - select { - case t, ok := <-outChannel: - if !ok { - log.Debugf("pipeline output channel was closed, no more targets will come through") - outChannel = nil - break - } - completedTarget = t - case <-ctx.Done(): - // When termination is signaled just stop wait. It is up - // to the caller to decide how to further handle pipeline termination. - log.Debugf("termination requested") - return nil - case res := <-p.ctrlChannels.routingResultCh: - err = res.err - p.state.SetRouting(res.bundle.TestStepLabel, res.err) - case res := <-p.ctrlChannels.stepResultCh: - err = res.err - if err != nil { - payload, jmErr := json.Marshal(err.Error()) - if jmErr != nil { - log.Warningf("failed to marshal error string to JSON: %v", jmErr) - continue - } - rm := json.RawMessage(payload) - header := testevent.Header{ - JobID: res.jobID, - RunID: res.runID, - TestName: p.test.Name, - TestStepLabel: res.bundle.TestStepLabel, - } - ev := storage.NewTestEventEmitterFetcher(header) - // this event is not associated to any target, e.g. a plugin has returned an error. - errEv := testevent.Data{EventName: EventTestError, Target: nil, Payload: &rm} - // emit test event containing the completion error - if err := ev.Emit(errEv); err != nil { - log.Warningf("could not emit completion error event %v", errEv) - } - } - p.state.SetStep(res.bundle.TestStepLabel, res.err) - - case targetErr := <-p.ctrlChannels.targetErr: - completedTarget = targetErr.Target - completedTargetError = targetErr.Err - } - - if err != nil { - return err - } - - if outChannel == nil { - log.Debugf("no more targets to wait, output channel is closed") - break - } - - if completedTarget != nil { - p.state.SetTarget(completedTarget, completedTargetError) - log.Debugf("writing target %+v on the completed channel", completedTarget) - if err := writer.writeTimeout(ctx, completedCh, completedTarget, p.timeouts.MessageTimeout); err != nil { - log.Panicf("could not write completed target: %v", err) - } - completedTarget = nil - completedTargetError = nil - } - - numIngress := atomic.LoadUint64(&p.numIngress) - numCompleted := uint64(len(p.state.CompletedTargets())) - log.Debugf("targets completed: %d, expected: %d", numCompleted, numIngress) - if numIngress != 0 && numCompleted == numIngress { - log.Debugf("no more targets to wait, all targets (%d) completed", numCompleted) - break - } - } - // The test run completed, we have collected all Targets. TestSteps might have already - // closed `ch.out`, in which case the pipeline terminated correctly (channels are closed - // in a "domino" sequence, so seeing the last channel closed indicates that the - // sequence of close operations has completed). If `ch.out` is still open, - // there are still TestSteps that might have not returned. Wait for all - // TestSteps to complete or `StepShutdownTimeout` to occur. - log.Infof("waiting for all steps to complete") - return p.waitSteps() -} - -// waitTermination reads results coming from result channels waiting -// for the pipeline to completely shutdown before `ShutdownTimeout` occurs. A -// "complete shutdown" means that all TestSteps and routing blocks have sent -// downstream their results. -func (p *pipeline) waitTermination() error { - - if len(p.bundles) == 0 { - return fmt.Errorf("no bundles specified for waitTermination") - } - - log := logging.AddField(p.log, "phase", "waitTermination") - log.Printf("waiting for pipeline to terminate") - - for { - - log.Debugf("steps completed: %d", len(p.state.CompletedSteps())) - log.Debugf("routing completed: %d", len(p.state.CompletedRouting())) - stepsCompleted := len(p.state.CompletedSteps()) == len(p.bundles) - routingCompleted := len(p.state.CompletedRouting()) == len(p.bundles) - if stepsCompleted && routingCompleted { - return nil - } - - select { - case <-time.After(p.timeouts.ShutdownTimeout): - incompleteSteps := p.state.IncompleteSteps(p.bundles) - if len(incompleteSteps) > 0 { - return &cerrors.ErrTestStepsNeverReturned{StepNames: incompleteSteps} - } - return fmt.Errorf("pipeline did not return but all test steps completed") - case res := <-p.ctrlChannels.routingResultCh: - p.state.SetRouting(res.bundle.TestStepLabel, res.err) - case res := <-p.ctrlChannels.stepResultCh: - p.state.SetStep(res.bundle.TestStepLabel, res.err) - } - } -} - -// waitSteps reads results coming from result channels until `StepShutdownTimeout` -// occurs or an error is encountered. It then checks whether TestSteps and routing -// blocks have all returned correctly. If not, it returns an error. -func (p *pipeline) waitSteps() error { - - if len(p.bundles) == 0 { - return fmt.Errorf("no bundles specified for waitSteps") - } - - log := logging.AddField(p.log, "phase", "waitSteps") - - var err error - - log.Debugf("waiting for test steps to terminate") - for { - select { - case <-time.After(p.timeouts.StepShutdownTimeout): - log.Warningf("timed out waiting for steps to complete after %v", p.timeouts.StepShutdownTimeout) - incompleteSteps := p.state.IncompleteSteps(p.bundles) - if len(incompleteSteps) > 0 { - err = &cerrors.ErrTestStepsNeverReturned{StepNames: incompleteSteps} - break - } - if len(p.state.CompletedRouting()) != len(p.bundles) { - err = fmt.Errorf("not all routing completed: %d!=%d", len(p.state.CompletedRouting()), len(p.bundles)) - } - case res := <-p.ctrlChannels.routingResultCh: - log.Debugf("received routing block result for %s", res.bundle.TestStepLabel) - p.state.SetRouting(res.bundle.TestStepLabel, res.err) - err = res.err - case res := <-p.ctrlChannels.stepResultCh: - log.Debugf("received step result for %s", res.bundle.TestStepLabel) - p.state.SetStep(res.bundle.TestStepLabel, res.err) - err = res.err - } - if err != nil { - return err - } - log.Debugf("steps completed: %d, expected: %d", len(p.state.CompletedSteps()), len(p.bundles)) - log.Debugf("routing completed: %d, expected: %d", len(p.state.CompletedRouting()), len(p.bundles)) - stepsCompleted := len(p.state.CompletedSteps()) == len(p.bundles) - routingCompleted := len(p.state.CompletedRouting()) == len(p.bundles) - if stepsCompleted && routingCompleted { - break - } - } - - return nil -} - -// init initializes the pipeline by connecting steps and routing blocks. The result of pipeline -// initialization is a set of control/result channels assigned to the pipeline object. The pipeline -// input channel is returned. -func (p *pipeline) init(ctx statectx.Context) (routeInFirst chan *target.Target) { - p.log.Debugf("starting") - - if p.ctrlChannels != nil { - p.log.Panicf("pipeline is already initialized, control channel are already configured") - } - - var ( - routeOut chan *target.Target - routeIn chan *target.Target - ) - - pipelineCtx, pause, cancel := statectx.WithParent(ctx) - - // result channels used to communicate result information from the routing blocks - // and step executors - routingResultCh := make(chan routeResult) - stepResultCh := make(chan stepResult) - targetErrCh := make(chan cerrors.TargetError) - - routeIn = make(chan *target.Target) - for position, testStepBundle := range p.bundles { - - // Input and output channels for the TestStep - stepInCh := make(chan *target.Target) - stepOutCh := make(chan *target.Target) - stepErrCh := make(chan cerrors.TargetError) - - routeOut = make(chan *target.Target) - - // First step of the pipeline, keep track of the routeIn channel as this is - // going to be used to injects targets into the pipeline from outside. Also - // add an intermediate goroutine which keeps track of how many targets have - // been injected into the pipeline - if position == 0 { - routeInFirst = make(chan *target.Target) - routeInStep := routeIn - go func() { - defer close(routeInStep) - numIngress := uint64(0) - for t := range routeInFirst { - routeInStep <- t - numIngress++ - } - atomic.StoreUint64(&p.numIngress, numIngress) - }() - } - - stepChannels := stepCh{stepIn: stepInCh, stepErr: stepErrCh, stepOut: stepOutCh} - routingChannels := routingCh{ - routeIn: routeIn, - routeOut: routeOut, - stepIn: stepInCh, - stepErr: stepErrCh, - stepOut: stepOutCh, - targetErr: targetErrCh, - } - - // Build the Header that the the TestStep will be using for emitting events - Header := testevent.Header{ - JobID: p.jobID, - RunID: p.runID, - TestName: p.test.Name, - TestStepLabel: testStepBundle.TestStepLabel, - } - ev := storage.NewTestEventEmitterFetcherWithAllowedEvents(Header, &testStepBundle.AllowedEvents) - - router := newStepRouter(p.log, testStepBundle, routingChannels, ev, p.timeouts) - go router.route(pipelineCtx.PausedOrDoneCtx(), routingResultCh) - go p.runStep(pipelineCtx, p.jobID, p.runID, testStepBundle, stepChannels, stepResultCh, ev) - // The input of the next routing block is the output of the current routing block - routeIn = routeOut - } - - p.ctrlChannels = &pipelineCtrlCh{ - routingResultCh: routingResultCh, - stepResultCh: stepResultCh, - targetErr: targetErrCh, - targetOut: routeOut, - - ctx: pipelineCtx, - cancel: cancel, - pause: pause, - } - - return -} - -// run is a blocking method which executes the pipeline until successful or failed termination -func (p *pipeline) run(completedTargetsCh chan<- *target.Target) error { - p.log.Debugf("run") - if p.ctrlChannels == nil { - p.log.Panicf("pipeline is not initialized, control channels are not available") - } - ctx := p.ctrlChannels.ctx - - // Wait for the pipeline to complete. If an error occurrs, cancel all TestSteps - // and routing blocks and wait again for completion until shutdown timeout occurrs. - p.log.Infof("waiting for pipeline to complete") - completionError := p.waitTargets(ctx.PausedOrDoneCtx(), completedTargetsCh) - - pauseAsserted := ctx.PausedOrDoneCtx().Err() == statectx.ErrPaused - cancellationAsserted := ctx.PausedOrDoneCtx().Err() == statectx.ErrCanceled - - if completionError != nil { - p.log.Warningf("test failed to complete: %v. Forcing cancellation.", completionError) - p.ctrlChannels.cancel() // terminate routing and and propagate cancellation to the steps - } else if cancellationAsserted { - p.log.Infof("cancellation was asserted") - } else if pauseAsserted { - p.log.Warningf("received pause request") - } - - // If either cancellation or pause have been asserted, we need to wait for the - // pipeline to terminate - if cancellationAsserted || pauseAsserted || completionError != nil { - signal := "cancellation" - if pauseAsserted { - signal = "pause" - } - terminationError := p.waitTermination() - if terminationError != nil { - p.log.Infof("test did not terminate correctly after %s signal: %v", signal, terminationError) - } else { - p.log.Infof("test terminated correctly after %s signal", signal) - } - if completionError != nil { - return completionError - } - return terminationError - } - p.log.Infof("completed") - return nil -} - -func newPipeline(log *logrus.Entry, bundles []test.TestStepBundle, test *test.Test, jobID types.JobID, runID types.RunID, timeouts TestRunnerTimeouts) *pipeline { - p := pipeline{log: log, bundles: bundles, jobID: jobID, runID: runID, test: test, timeouts: timeouts} - p.state = NewState() - return &p -} diff --git a/pkg/runner/test_runner_route.go b/pkg/runner/test_runner_route.go deleted file mode 100644 index 4f5a880d..00000000 --- a/pkg/runner/test_runner_route.go +++ /dev/null @@ -1,302 +0,0 @@ -// Copyright (c) Facebook, Inc. and its affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - -package runner - -import ( - "container/list" - "context" - "encoding/json" - "fmt" - "log" - "sync" - "time" - - "github.com/facebookincubator/contest/pkg/event/testevent" - "github.com/facebookincubator/contest/pkg/logging" - "github.com/facebookincubator/contest/pkg/target" - "github.com/facebookincubator/contest/pkg/test" - "github.com/sirupsen/logrus" -) - -// router implements the routing logic that injects targets into a test step and consumes -// targets in output from another test step -type stepRouter struct { - log *logrus.Entry - - routingChannels routingCh - bundle test.TestStepBundle - ev testevent.EmitterFetcher - - timeouts TestRunnerTimeouts -} - -// routeIn is responsible for accepting a target from the previous routing block -// and injecting it into the associated test step. Returns the numer of targets -// injected into the test step or an error upon failure -func (r *stepRouter) routeIn(ctx context.Context) (int, error) { - - stepLabel := r.bundle.TestStepLabel - log := logging.AddField(r.log, "step", stepLabel) - log = logging.AddField(log, "phase", "routeIn") - - var ( - err error - injectionWg sync.WaitGroup - routeInProgress bool - ) - - // terminateTargetWriter is a control channel used to signal termination to - // the writer object which injects a target into the test step - terminateTargetWriterCtx, terminateTargetWriter := context.WithCancel(context.Background()) - defer terminateTargetWriter() // avoids possible goroutine deadlock in context.WithCancel implementation - - // `targets` is used to buffer targets coming from the previous routing blocks, - // queueing them for injection into the TestStep. The list is accessed - // synchronously by a single goroutine. - targets := list.New() - - // `ingressTarget` is used to keep track of ingress times of a target into a test step - ingressTarget := make(map[string]time.Time) - - // Channel that the injection goroutine uses to communicate back to `routeIn` the results - // of asynchronous injection - injectResultCh := make(chan injectionResult) - - // injectionChannels are used to inject targets into test step and return results to `routeIn` - injectionChannels := injectionCh{stepIn: r.routingChannels.stepIn, resultCh: injectResultCh} - - log.Debugf("initializing routeIn for %s", stepLabel) - targetWriter := newTargetWriter(log, r.timeouts) - - for { - select { - case <-ctx.Done(): - err = fmt.Errorf("termination requested for routing into %s", stepLabel) - case injectionResult := <-injectResultCh: - log.Debugf("received injection result for %v", injectionResult.target) - routeInProgress = false - if injectionResult.err != nil { - err = fmt.Errorf("routing failed while injecting target %+v into %s", injectionResult.target, stepLabel) - targetInErrEv := testevent.Data{EventName: target.EventTargetInErr, Target: injectionResult.target} - if err := r.ev.Emit(targetInErrEv); err != nil { - log.Warningf("could not emit %v event for target %+v: %v", targetInErrEv, *injectionResult.target, err) - } - } else { - targetInEv := testevent.Data{EventName: target.EventTargetIn, Target: injectionResult.target} - if err := r.ev.Emit(targetInEv); err != nil { - log.Warningf("could not emit %v event for Target: %+v", targetInEv, *injectionResult.target) - } - } - case t, chanIsOpen := <-r.routingChannels.routeIn: - if !chanIsOpen { - log.Debugf("routing input channel closed") - r.routingChannels.routeIn = nil - } else { - log.Debugf("received target %v in input", t) - targets.PushFront(t) - } - } - - if err != nil { - break - } - - if routeInProgress { - continue - } - - // no targets currently being injected in the test step - if targets.Len() == 0 { - if r.routingChannels.routeIn == nil { - log.Debugf("input channel is closed and no more targets are available, closing step input channel") - close(r.routingChannels.stepIn) - break - } - continue - } - - t := targets.Back().Value.(*target.Target) - ingressTarget[t.ID] = time.Now() - targets.Remove(targets.Back()) - log.Debugf("writing target %v into test step", t) - routeInProgress = true - injectionWg.Add(1) - go func() { - defer injectionWg.Done() - targetWriter.writeTargetWithResult(terminateTargetWriterCtx, t, injectionChannels) - }() - } - // Signal termination to the injection routines regardless of the result of the - // routing. If the routing completed successfully, this is a no-op. If there is an - // injection goroutine running, wait for it to terminate, as we might have gotten - // here after a cancellation signal. - terminateTargetWriter() - injectionWg.Wait() - - if err != nil { - log.Debugf("routeIn failed: %v", err) - return 0, err - } - return len(ingressTarget), nil -} - -func (r *stepRouter) emitOutEvent(t *target.Target, err error) error { - - log := logging.AddField(r.log, "step", r.bundle.TestStepLabel) - log = logging.AddField(log, "phase", "emitOutEvent") - - if err != nil { - targetErrPayload := target.ErrPayload{Error: err.Error()} - payloadEncoded, err := json.Marshal(targetErrPayload) - if err != nil { - log.Warningf("could not encode target error ('%s'): %v", targetErrPayload, err) - } - rawPayload := json.RawMessage(payloadEncoded) - targetErrEv := testevent.Data{EventName: target.EventTargetErr, Target: t, Payload: &rawPayload} - if err := r.ev.Emit(targetErrEv); err != nil { - return err - } - } else { - targetOutEv := testevent.Data{EventName: target.EventTargetOut, Target: t} - if err := r.ev.Emit(targetOutEv); err != nil { - log.Warningf("could not emit %v event for target: %v", targetOutEv, *t) - } - } - return nil -} - -// routeOut is responsible for accepting a target from the associated test step -// and forward it to the next routing block. Returns the number of targets -// received from the test step or an error upon failure -func (r *stepRouter) routeOut(ctx context.Context) (int, error) { - - stepLabel := r.bundle.TestStepLabel - log := logging.AddField(r.log, "step", stepLabel) - log = logging.AddField(log, "phase", "routeOut") - - targetWriter := newTargetWriter(log, r.timeouts) - - var err error - - log.Debugf("initializing routeOut for %s", stepLabel) - // `egressTarget` is used to keep track of egress times of a target from a test step - egressTarget := make(map[string]time.Time) - - for { - select { - case <-ctx.Done(): - err = fmt.Errorf("termination requested for routing into %s", r.bundle.TestStepLabel) - case t, chanIsOpen := <-r.routingChannels.stepOut: - if !chanIsOpen { - log.Debugf("step output closed") - r.routingChannels.stepOut = nil - break - } - - if _, targetPresent := egressTarget[t.ID]; targetPresent { - err = fmt.Errorf("step %s returned target %+v multiple times", r.bundle.TestStepLabel, t) - break - } - // Emit an event signaling that the target has left the TestStep - if err := r.emitOutEvent(t, nil); err != nil { - log.Warningf("could not emit out event for target %v: %v", *t, err) - } - // Register egress time and forward target to the next routing block - egressTarget[t.ID] = time.Now() - if err := targetWriter.writeTimeout(ctx, r.routingChannels.routeOut, t, r.timeouts.MessageTimeout); err != nil { - log.Panicf("could not forward target to the test runner: %+v", err) - } - case targetError, chanIsOpen := <-r.routingChannels.stepErr: - if !chanIsOpen { - log.Debugf("step error closed") - r.routingChannels.stepErr = nil - break - } - - if _, targetPresent := egressTarget[targetError.Target.ID]; targetPresent { - err = fmt.Errorf("step %s returned target %+v multiple times", r.bundle.TestStepLabel, targetError.Target) - } else { - if err := r.emitOutEvent(targetError.Target, targetError.Err); err != nil { - log.Warningf("could not emit err event for target: %v", *targetError.Target) - } - egressTarget[targetError.Target.ID] = time.Now() - if err := targetWriter.writeTargetError(ctx, r.routingChannels.targetErr, targetError, r.timeouts.MessageTimeout); err != nil { - log.Panicf("could not forward target (%+v) to the test runner: %v", targetError.Target, err) - } - } - } - if err != nil { - break - } - if r.routingChannels.stepErr == nil && r.routingChannels.stepOut == nil { - log.Debugf("output and error channel from step are closed, routeOut should terminate") - close(r.routingChannels.routeOut) - break - } - } - - if err != nil { - log.Debugf("routeOut failed: %v", err) - return 0, err - } - return len(egressTarget), nil - -} - -// route implements the routing logic from the previous routing block to the test step -// and from the test step to the next routing block -func (r *stepRouter) route(ctx context.Context, resultCh chan<- routeResult) { - - var ( - inTargets, outTargets int - errRouteIn, errRouteOut error - ) - - terminateInternalCtx, terminateInternal := context.WithCancel(ctx) - defer terminateInternal() // avoids possible goroutine deadlock in context.WithCancel implementation - - wg := sync.WaitGroup{} - wg.Add(2) - go func() { - defer wg.Done() - inTargets, errRouteIn = r.routeIn(terminateInternalCtx) - if errRouteIn != nil { - terminateInternal() - } - }() - - go func() { - defer wg.Done() - outTargets, errRouteOut = r.routeOut(terminateInternalCtx) - if errRouteOut != nil { - terminateInternal() - } - }() - wg.Wait() - - routingErr := errRouteIn - if routingErr == nil { - routingErr = errRouteOut - } - if routingErr == nil && inTargets != outTargets { - routingErr = fmt.Errorf("step %s completed but did not return all injected Targets (%d!=%d)", r.bundle.TestStepLabel, inTargets, outTargets) - } - - // Send the result to the test runner, which is expected to be listening - // within `MessageTimeout`. If that's not the case, we hit an unrecovrable - // condition. - select { - case resultCh <- routeResult{bundle: r.bundle, err: routingErr}: - case <-time.After(r.timeouts.MessageTimeout): - log.Panicf("could not send routing block result") - } -} - -func newStepRouter(log *logrus.Entry, bundle test.TestStepBundle, routingChannels routingCh, ev testevent.EmitterFetcher, timeouts TestRunnerTimeouts) *stepRouter { - routerLogger := logging.AddField(log, "step", bundle.TestStepLabel) - r := stepRouter{log: routerLogger, bundle: bundle, routingChannels: routingChannels, ev: ev, timeouts: timeouts} - return &r -} diff --git a/pkg/runner/test_runner_route_test.go b/pkg/runner/test_runner_route_test.go deleted file mode 100644 index d34d18ae..00000000 --- a/pkg/runner/test_runner_route_test.go +++ /dev/null @@ -1,400 +0,0 @@ -// Copyright (c) Facebook, Inc. and its affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - -package runner - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/facebookincubator/contest/pkg/cerrors" - "github.com/facebookincubator/contest/pkg/event" - "github.com/facebookincubator/contest/pkg/event/testevent" - "github.com/facebookincubator/contest/pkg/logging" - "github.com/facebookincubator/contest/pkg/pluginregistry" - "github.com/facebookincubator/contest/pkg/storage" - "github.com/facebookincubator/contest/pkg/target" - "github.com/facebookincubator/contest/pkg/test" - "github.com/facebookincubator/contest/plugins/storage/memory" - "github.com/facebookincubator/contest/plugins/teststeps/example" - - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" -) - -var testSteps = map[string]test.TestStepFactory{ - example.Name: example.New, -} - -var testStepsEvents = map[string][]event.Name{ - example.Name: example.Events, -} - -type TestRunnerSuite struct { - suite.Suite - routingChannels routingCh - router *stepRouter - - // the following channels are the same channels wrapped within `routingChannels`, - // but used in the opposite direction compared to the routing block. When the - // routing block reads results, the corresponding channel of the test suite has - // write direction, when the routing block writes results, the corresponding channel - // of the test suite has read direction - routeInCh chan<- *target.Target - routeOutCh <-chan *target.Target - - stepInCh <-chan *target.Target - stepOutCh chan<- *target.Target - stepErrCh chan<- cerrors.TargetError - - targetErrCh <-chan cerrors.TargetError -} - -func (suite *TestRunnerSuite) SetupTest() { - - log := logging.GetLogger("TestRunnerSuite") - - pluginRegistry := pluginregistry.NewPluginRegistry() - // Setup the PluginRegistry by registering TestSteps - for name, tsfactory := range testSteps { - if _, ok := testStepsEvents[name]; !ok { - suite.T().Errorf("test step %s does not define any associated event", name) - } - err := pluginRegistry.RegisterTestStep(name, tsfactory, testStepsEvents[name]) - require.NoError(suite.T(), err) - } - ts1, err := pluginRegistry.NewTestStep("Example") - require.NoError(suite.T(), err) - - params := make(test.TestStepParameters) - bundle := test.TestStepBundle{TestStep: ts1, TestStepLabel: "FirstStage", Parameters: params} - - routeInCh := make(chan *target.Target) - routeOutCh := make(chan *target.Target) - stepInCh := make(chan *target.Target) - stepOutCh := make(chan *target.Target) - stepErrCh := make(chan cerrors.TargetError) - targetErrCh := make(chan cerrors.TargetError) - - suite.routeInCh = routeInCh - suite.routeOutCh = routeOutCh - suite.stepOutCh = stepOutCh - suite.stepInCh = stepInCh - suite.stepErrCh = stepErrCh - suite.targetErrCh = targetErrCh - - suite.routingChannels = routingCh{ - routeIn: routeInCh, - routeOut: routeOutCh, - stepIn: stepInCh, - stepErr: stepErrCh, - stepOut: stepOutCh, - targetErr: targetErrCh, - } - - s, err := memory.New() - require.NoError(suite.T(), err) - err = storage.SetStorage(s) - require.NoError(suite.T(), err) - - header := testevent.Header{ - JobID: 1, - RunID: 1, - TestName: "TestRunnerSuite", - TestStepLabel: "TestRunnerSuite", - } - ev := storage.NewTestEventEmitterFetcher(header) - - timeouts := TestRunnerTimeouts{ - StepInjectTimeout: 30 * time.Second, - MessageTimeout: 25 * time.Second, - ShutdownTimeout: 5 * time.Second, - StepShutdownTimeout: 5 * time.Second, - } - - suite.router = newStepRouter(log, bundle, suite.routingChannels, ev, timeouts) -} - -func (suite *TestRunnerSuite) TestRouteInRoutesAllTargets() { - - // test that all targets are routed to step input channel without delay, in order - targets := []*target.Target{ - {ID: "001", FQDN: "host001.facebook.com"}, - {ID: "002", FQDN: "host002.facebook.com"}, - {ID: "003", FQDN: "host003.facebook.com"}, - } - - routeInResult := make(chan error) - stepInResult := make(chan error) - - go func() { - // start routing - _, _ = suite.router.routeIn(context.Background()) - }() - - // inject targets - go func() { - defer close(suite.routeInCh) - for _, target := range targets { - select { - case <-time.After(2 * time.Second): - routeInResult <- fmt.Errorf("target should be accepted by routeIn block within timeout") - return - case suite.routeInCh <- target: - } - } - routeInResult <- nil - }() - - // mimic the test step consuming targets in input - go func() { - numTargets := 0 - for { - select { - case t, ok := <-suite.stepInCh: - if !ok { - if numTargets != len(targets) { - stepInResult <- fmt.Errorf("not all targets received by teste step") - } else { - stepInResult <- nil - } - return - } - if numTargets+1 > len(targets) { - stepInResult <- fmt.Errorf("more targets returned than injected") - return - } - if t != targets[numTargets] { - stepInResult <- fmt.Errorf("targets returned in wrong order") - return - } - numTargets++ - case <-time.After(2 * time.Second): - stepInResult <- fmt.Errorf("expected target on channel within timeout") - } - } - }() - - for { - select { - case <-time.After(5 * time.Second): - suite.T().Errorf("test should return within timeout") - case err := <-stepInResult: - stepInResult = nil - if err != nil { - suite.T().Errorf("step in returned error: %v", err) - } - case err := <-routeInResult: - routeInResult = nil - if err != nil { - suite.T().Errorf("route in returned error: %v", err) - } - } - if stepInResult == nil && routeInResult == nil { - return - } - } -} - -func (suite *TestRunnerSuite) TestRouteOutRoutesAllSuccessfulTargets() { - - // test that all targets are routed in output from a test step are received by - // the routing logic and forwarded to the next routing block - targets := []*target.Target{ - {ID: "001", FQDN: "host001.facebook.com"}, - {ID: "002", FQDN: "host002.facebook.com"}, - {ID: "003", FQDN: "host003.facebook.com"}, - } - - go func() { - // start routing - _, _ = suite.router.routeOut(context.Background()) - }() - - stepResult := make(chan error) - routeResult := make(chan error) - - // mimic the test step returning targets on the output channel. This test exercise - // the path where all targets complete successfully - go func() { - // it's expected that the step closes both channels - defer close(suite.stepOutCh) - defer close(suite.stepErrCh) - for _, target := range targets { - select { - case <-time.After(2 * time.Second): - stepResult <- fmt.Errorf("target should be accepted by routing block within timeout") - return - case suite.stepOutCh <- target: - } - } - stepResult <- nil - }() - - // mimic the next routing block and the pipeline reading from the routeOut channel and the targetErr - // channel. We don't expect any target coming through targetErr, we expect all targets to be successful - go func() { - numTargets := 0 - for { - select { - case _, ok := <-suite.targetErrCh: - if !ok { - suite.targetErrCh = nil - } else { - routeResult <- fmt.Errorf("no targets expected on the error channel") - return - } - case t, ok := <-suite.routeOutCh: - if !ok { - if numTargets != len(targets) { - routeResult <- fmt.Errorf("not all targets have been returned") - } else { - routeResult <- nil - } - return - } - if numTargets+1 > len(targets) { - routeResult <- fmt.Errorf("more targets returned than injected") - return - } - if t.ID != targets[numTargets].ID { - routeResult <- fmt.Errorf("targets returned in wrong order") - return - } - numTargets++ - case <-time.After(2 * time.Second): - routeResult <- fmt.Errorf("expected target on channel within timeout") - return - } - } - }() - - for { - select { - case <-time.After(5 * time.Second): - suite.T().Errorf("test should return within timeout") - case err := <-stepResult: - stepResult = nil - if err != nil { - suite.T().Errorf("step output returned error: %v", err) - } - case err := <-routeResult: - routeResult = nil - if err != nil { - suite.T().Errorf("router returned error: %v", err) - } - } - if stepResult == nil && routeResult == nil { - return - } - } -} - -func (suite *TestRunnerSuite) TestRouteOutRoutesAllFailedTargets() { - - // test that all targets are routed in output from a test step are received by - // the routing logic and forwarded to the next routing block - targets := []*target.Target{ - {ID: "001", FQDN: "host001.facebook.com"}, - {ID: "002", FQDN: "host002.facebook.com"}, - {ID: "003", FQDN: "host003.facebook.com"}, - } - - go func() { - // start routing - _, _ = suite.router.routeOut(context.Background()) - }() - - stepResult := make(chan error) - routeResult := make(chan error) - - // mimic the test step returning targets on the output channel. This test exercise - // the path where all targets complete successfully - go func() { - // it's expected that the step closes both channels - defer close(suite.stepOutCh) - defer close(suite.stepErrCh) - for _, target := range targets { - targetErr := cerrors.TargetError{Target: target, Err: fmt.Errorf("test error")} - select { - case <-time.After(2 * time.Second): - stepResult <- fmt.Errorf("target should be accepted by routing block within timeout") - return - case suite.stepErrCh <- targetErr: - } - } - stepResult <- nil - }() - - // mimic the next routing block and the pipeline reading from the routeOut channel and the targetErr - // channel. We don't expect any target coming through targetErr, we expect all targets to be successful - go func() { - numTargets := 0 - for { - select { - case targetErr, ok := <-suite.targetErrCh: - if !ok { - routeResult <- fmt.Errorf("target error channel should not be closed by routing block") - return - } - if targetErr.Err == nil { - routeResult <- fmt.Errorf("expected error associated to the target") - return - } - if numTargets+1 > len(targets) { - routeResult <- fmt.Errorf("more targets returned than injected") - return - } - if targetErr.Target.ID != targets[numTargets].ID { - routeResult <- fmt.Errorf("targets returned in wrong order") - return - } - - numTargets++ - if numTargets == len(targets) { - routeResult <- nil - return - } - case _, ok := <-suite.routeOutCh: - if !ok { - suite.routeOutCh = nil - } else { - routeResult <- fmt.Errorf("no targets expected on the output channel") - return - } - case <-time.After(2 * time.Second): - routeResult <- fmt.Errorf("expected target on channel within timeout") - return - } - } - }() - - for { - select { - case <-time.After(5 * time.Second): - suite.T().Errorf("test should return within timeout") - case err := <-stepResult: - stepResult = nil - if err != nil { - suite.T().Errorf("step output routine returned error: %v", err) - } - case err := <-routeResult: - routeResult = nil - if err != nil { - suite.T().Errorf("router returned error: %v", err) - } - } - if stepResult == nil && routeResult == nil { - return - } - } -} - -func TestTestRunnerSuite(t *testing.T) { - TestRunnerSuite := &TestRunnerSuite{} - suite.Run(t, TestRunnerSuite) -} diff --git a/pkg/runner/test_runner_test.go b/pkg/runner/test_runner_test.go new file mode 100644 index 00000000..70c590cf --- /dev/null +++ b/pkg/runner/test_runner_test.go @@ -0,0 +1,548 @@ +// Copyright (c) Facebook, Inc. and its affiliates. +// +// This source code is licensed under the MIT license found in the +// LICENSE file in the root directory of this source tree. + +package runner + +import ( + "flag" + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/facebookincubator/contest/pkg/cerrors" + "github.com/facebookincubator/contest/pkg/event" + "github.com/facebookincubator/contest/pkg/event/testevent" + "github.com/facebookincubator/contest/pkg/logging" + "github.com/facebookincubator/contest/pkg/pluginregistry" + "github.com/facebookincubator/contest/pkg/statectx" + "github.com/facebookincubator/contest/pkg/storage" + "github.com/facebookincubator/contest/pkg/target" + "github.com/facebookincubator/contest/pkg/test" + "github.com/facebookincubator/contest/pkg/types" + "github.com/facebookincubator/contest/plugins/storage/memory" + "github.com/facebookincubator/contest/plugins/teststeps/example" + "github.com/facebookincubator/contest/tests/plugins/teststeps/badtargets" + "github.com/facebookincubator/contest/tests/plugins/teststeps/channels" + "github.com/facebookincubator/contest/tests/plugins/teststeps/hanging" + "github.com/facebookincubator/contest/tests/plugins/teststeps/noreturn" + "github.com/facebookincubator/contest/tests/plugins/teststeps/panicstep" +) + +const testName = "SimpleTest" + +var ( + evs storage.ResettableStorage + pluginRegistry *pluginregistry.PluginRegistry +) + +func TestMain(m *testing.M) { + flag.Parse() + logging.Debug() + if ms, err := memory.New(); err == nil { + evs = ms + if err := storage.SetStorage(ms); err != nil { + panic(err.Error()) + } + } else { + panic(fmt.Sprintf("could not initialize in-memory storage layer: %v", err)) + } + pluginRegistry = pluginregistry.NewPluginRegistry() + for _, e := range []struct { + name string + factory test.TestStepFactory + events []event.Name + }{ + {badtargets.Name, badtargets.New, badtargets.Events}, + {channels.Name, channels.New, channels.Events}, + {example.Name, example.New, example.Events}, + {hanging.Name, hanging.New, hanging.Events}, + {noreturn.Name, noreturn.New, noreturn.Events}, + {panicstep.Name, panicstep.New, panicstep.Events}, + } { + if err := pluginRegistry.RegisterTestStep(e.name, e.factory, e.events); err != nil { + panic(fmt.Sprintf("could not register TestStep: %v", err)) + } + } + os.Exit(m.Run()) +} + +func eventToStringNoTime(ev testevent.Event) string { + // Omit the timestamp to make output stable. + return fmt.Sprintf("{%s%s}", ev.Header, ev.Data) +} + +func resetEventStorage() { + if err := evs.Reset(); err != nil { + panic(err.Error()) + } +} + +func tgt(id string) *target.Target { + return &target.Target{ID: id} +} + +func getEvents(targetID, stepLabel *string) string { + q, _ := testevent.BuildQuery(testevent.QueryTestName(testName)) + results, _ := evs.GetTestEvents(q) + var resultsForTarget []string + for _, r := range results { + if targetID != nil { + if r.Data.Target == nil { + continue + } + if *targetID != "" && r.Data.Target.ID != *targetID { + continue + } + } + if stepLabel != nil { + if *stepLabel != "" && r.Header.TestStepLabel != *stepLabel { + continue + } + if targetID == nil && r.Data.Target != nil { + continue + } + } + resultsForTarget = append(resultsForTarget, eventToStringNoTime(r)) + } + return "\n" + strings.Join(resultsForTarget, "\n") + "\n" +} + +func getStepEvents(stepLabel string) string { + return getEvents(nil, &stepLabel) +} + +func getTargetEvents(targetID string) string { + return getEvents(&targetID, nil) +} + +func newStep(label, name string, params *test.TestStepParameters) test.TestStepBundle { + td := test.TestStepDescriptor{ + Name: name, + Label: label, + } + if params != nil { + td.Parameters = *params + } + sb, err := pluginRegistry.NewTestStepBundle(td, nil) + if err != nil { + panic(fmt.Sprintf("failed to create test step bundle: %v", err)) + } + return *sb +} + +func newExampleStep(label string, failPct int, failTargets string, delayTargets string) test.TestStepBundle { + return newStep(label, example.Name, &test.TestStepParameters{ + example.FailPctParam: []test.Param{*test.NewParam(fmt.Sprintf("%d", failPct))}, + example.FailTargetsParam: []test.Param{*test.NewParam(failTargets)}, + example.DelayTargetsParam: []test.Param{*test.NewParam(delayTargets)}, + }) +} + +type runRes struct { + res []byte + err error +} + +func runWithTimeout(t *testing.T, tr TestRunner, ctx statectx.Context, resumeState []byte, runID types.RunID, timeout time.Duration, targets []*target.Target, bundles []test.TestStepBundle) ([]byte, error) { + newCtx, _, cancel := statectx.WithParent(ctx) + test := &test.Test{ + Name: testName, + TestStepsBundles: bundles, + } + resCh := make(chan runRes) + go func() { + res, err := tr.Run(newCtx, test, targets, 1, runID, resumeState) + resCh <- runRes{res: res, err: err} + }() + var res runRes + select { + case res = <-resCh: + case <-time.After(timeout): + cancel() + assert.FailNow(t, "TestRunner should not time out") + } + return res.res, res.err +} + +// Simple case: one target, one step, success. +func Test1Step1Success(t *testing.T) { + resetEventStorage() + tr := NewTestRunner() + _, err := runWithTimeout(t, tr, nil, nil, 1, 2*time.Second, + []*target.Target{tgt("T1")}, + []test.TestStepBundle{ + newExampleStep("Step 1", 0, "", ""), + }, + ) + require.NoError(t, err) + require.Equal(t, ` +{[1 1 SimpleTest Step 1][(*Target)(nil) ExampleStepRunningEvent]} +{[1 1 SimpleTest Step 1][(*Target)(nil) ExampleStepFinishedEvent]} +`, getStepEvents("")) + require.Equal(t, ` +{[1 1 SimpleTest Step 1][Target{ID: "T1"} ExampleStartedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "T1"} ExampleFinishedEvent]} +`, getTargetEvents("T1")) +} + +// Simple case: one target, one step, failure. +func Test1Step1Fail(t *testing.T) { + resetEventStorage() + tr := NewTestRunner() + _, err := runWithTimeout(t, tr, nil, nil, 1, 2*time.Second, + []*target.Target{tgt("T1")}, + []test.TestStepBundle{ + newExampleStep("Step 1", 100, "", ""), + }, + ) + require.NoError(t, err) + require.Equal(t, ` +{[1 1 SimpleTest Step 1][(*Target)(nil) ExampleStepRunningEvent]} +{[1 1 SimpleTest Step 1][(*Target)(nil) ExampleStepFinishedEvent]} +`, getStepEvents("Step 1")) + require.Equal(t, ` +{[1 1 SimpleTest Step 1][Target{ID: "T1"} ExampleStartedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "T1"} ExampleFailedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "T1"} TestError &"\"target failed\""]} +`, getTargetEvents("T1")) +} + +// One step pipeline with two targets - one fails, one succeeds. +func Test1Step1Success1Fail(t *testing.T) { + resetEventStorage() + tr := NewTestRunner() + _, err := runWithTimeout(t, tr, nil, nil, 1, 2*time.Second, + []*target.Target{tgt("T1"), tgt("T2")}, + []test.TestStepBundle{ + newExampleStep("Step 1", 0, "T1", "T2=100"), + }, + ) + require.NoError(t, err) + require.Equal(t, ` +{[1 1 SimpleTest Step 1][(*Target)(nil) ExampleStepRunningEvent]} +{[1 1 SimpleTest Step 1][(*Target)(nil) ExampleStepFinishedEvent]} +`, getStepEvents("")) + require.Equal(t, ` +{[1 1 SimpleTest Step 1][Target{ID: "T1"} ExampleStartedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "T1"} ExampleFailedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "T1"} TestError &"\"target failed\""]} +`, getTargetEvents("T1")) + require.Equal(t, ` +{[1 1 SimpleTest Step 1][Target{ID: "T2"} ExampleStartedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "T2"} ExampleFinishedEvent]} +`, getTargetEvents("T2")) +} + +// Three-step pipeline, two targets: T1 fails at step 1, T2 fails at step 2, +// step 3 is not reached and not even run. +func Test3StepsNotReachedStepNotRun(t *testing.T) { + resetEventStorage() + tr := NewTestRunner() + _, err := runWithTimeout(t, tr, nil, nil, 1, 2*time.Second, + []*target.Target{tgt("T1"), tgt("T2")}, + []test.TestStepBundle{ + newExampleStep("Step 1", 0, "T1", ""), + newExampleStep("Step 2", 0, "T2", ""), + newExampleStep("Step 3", 0, "", ""), + }, + ) + require.NoError(t, err) + require.Equal(t, ` +{[1 1 SimpleTest Step 1][(*Target)(nil) ExampleStepRunningEvent]} +{[1 1 SimpleTest Step 1][(*Target)(nil) ExampleStepFinishedEvent]} +`, getStepEvents("Step 1")) + require.Equal(t, ` +{[1 1 SimpleTest Step 2][(*Target)(nil) ExampleStepRunningEvent]} +{[1 1 SimpleTest Step 2][(*Target)(nil) ExampleStepFinishedEvent]} +`, getStepEvents("Step 2")) + require.Equal(t, "\n\n", getStepEvents("Step 3")) + require.Equal(t, ` +{[1 1 SimpleTest Step 1][Target{ID: "T1"} ExampleStartedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "T1"} ExampleFailedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "T1"} TestError &"\"target failed\""]} +`, getTargetEvents("T1")) + require.Equal(t, ` +{[1 1 SimpleTest Step 1][Target{ID: "T2"} ExampleStartedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "T2"} ExampleFinishedEvent]} +{[1 1 SimpleTest Step 2][Target{ID: "T2"} ExampleStartedEvent]} +{[1 1 SimpleTest Step 2][Target{ID: "T2"} ExampleFailedEvent]} +{[1 1 SimpleTest Step 2][Target{ID: "T2"} TestError &"\"target failed\""]} +`, getTargetEvents("T2")) +} + +// A misbehaving step that fails to shut down properly after processing targets +// and does not return. +func TestNoReturnStepWithCorrectTargetForwarding(t *testing.T) { + resetEventStorage() + tr := NewTestRunnerWithTimeouts(100*time.Millisecond, 200*time.Millisecond) + ctx, _, cancel := statectx.New() + defer cancel() + _, err := runWithTimeout(t, tr, ctx, nil, 1, 2*time.Second, + []*target.Target{tgt("T1")}, + []test.TestStepBundle{ + newStep("Step 1", noreturn.Name, nil), + }, + ) + require.Error(t, err) + require.IsType(t, &cerrors.ErrTestStepsNeverReturned{}, err) +} + +// A misbehaving step that does not process any targets. +func TestNoReturnStepWithoutTargetForwarding(t *testing.T) { + resetEventStorage() + tr := NewTestRunnerWithTimeouts(100*time.Millisecond, 200*time.Millisecond) + ctx, _, cancel := statectx.New() + defer cancel() + _, err := runWithTimeout(t, tr, ctx, nil, 1, 2*time.Second, + []*target.Target{tgt("T1")}, + []test.TestStepBundle{ + newStep("Step 1", hanging.Name, nil), + }, + ) + require.Error(t, err) + require.IsType(t, &cerrors.ErrTestTargetInjectionTimedOut{}, err) +} + +// A misbehaving step that panics. +func TestStepPanics(t *testing.T) { + resetEventStorage() + tr := NewTestRunner() + _, err := runWithTimeout(t, tr, nil, nil, 1, 2*time.Second, + []*target.Target{tgt("T1")}, + []test.TestStepBundle{ + newStep("Step 1", panicstep.Name, nil), + }, + ) + require.Error(t, err) + require.IsType(t, &cerrors.ErrTestStepPaniced{}, err) + require.Equal(t, "\n\n", getTargetEvents("T1")) +} + +// A misbehaving step that closes its output channel. +func TestStepClosesChannels(t *testing.T) { + resetEventStorage() + tr := NewTestRunner() + _, err := runWithTimeout(t, tr, nil, nil, 1, 2*time.Second, + []*target.Target{tgt("T1")}, + []test.TestStepBundle{ + newStep("Step 1", channels.Name, nil), + }, + ) + require.Error(t, err) + require.IsType(t, &cerrors.ErrTestStepClosedChannels{}, err) + require.Equal(t, "\n\n", getTargetEvents("T1")) +} + +// A misbehaving step that yields a result for a target that does not exist. +func TestStepYieldsResultForNonexistentTarget(t *testing.T) { + resetEventStorage() + tr := NewTestRunner() + _, err := runWithTimeout(t, tr, nil, nil, 1, 2*time.Second, + []*target.Target{tgt("T1")}, + []test.TestStepBundle{ + newStep("Step 1", badtargets.Name, nil), + }, + ) + require.Error(t, err) +} + +// A misbehaving step that yields a result for a target that does not exist. +func TestStepYieldsDuplicateResult(t *testing.T) { + resetEventStorage() + tr := NewTestRunner() + _, err := runWithTimeout(t, tr, nil, nil, 1, 2*time.Second, + []*target.Target{tgt("TGood"), tgt("TDup")}, + []test.TestStepBundle{ + // TGood makes it past here unscathed and gets delayed in Step 2, + // TDup also emerges fine at first but is then returned again, and that's bad. + newStep("Step 1", badtargets.Name, nil), + newExampleStep("Step 2", 0, "", "T2=100"), + }, + ) + require.Error(t, err) + require.IsType(t, &cerrors.ErrTestStepReturnedDuplicateResult{}, err) +} + +// A misbehaving step that loses targets. +func TestStepLosesTargets(t *testing.T) { + resetEventStorage() + tr := NewTestRunner() + _, err := runWithTimeout(t, tr, nil, nil, 1, 2*time.Second, + []*target.Target{tgt("TGood"), tgt("TDrop")}, + []test.TestStepBundle{ + newStep("Step 1", badtargets.Name, nil), + }, + ) + require.Error(t, err) + require.IsType(t, &cerrors.ErrTestStepLostTargets{}, err) +} + +// A misbehaving step that yields a result for a target that does exist +// but is not currently waiting for it. +func TestStepYieldsResultForUnexpectedTarget(t *testing.T) { + resetEventStorage() + tr := NewTestRunner() + _, err := runWithTimeout(t, tr, nil, nil, 1, 2*time.Second, + []*target.Target{tgt("T1"), tgt("T1XXX")}, + []test.TestStepBundle{ + // T1XXX fails here. + newExampleStep("Step 1", 0, "T1XXX", ""), + // Yet, a result for it is returned here, which we did not expect. + newStep("Step 2", badtargets.Name, nil), + }, + ) + require.Error(t, err) + require.IsType(t, &cerrors.ErrTestStepReturnedUnexpectedResult{}, err) +} + +// Larger, randomized test - a number of steps, some targets failing, some succeeding. +func TestRandomizedMultiStep(t *testing.T) { + resetEventStorage() + tr := NewTestRunner() + var targets []*target.Target + for i := 1; i <= 100; i++ { + targets = append(targets, tgt(fmt.Sprintf("T%d", i))) + } + _, err := runWithTimeout(t, tr, nil, nil, 1, 2*time.Second, + targets, + []test.TestStepBundle{ + newExampleStep("Step 1", 0, "", "*=10"), // All targets pass the first step, with a slight delay + newExampleStep("Step 2", 25, "", ""), // 25% don't make it past the second step + newExampleStep("Step 3", 25, "", "*=10"), // Another 25% fail at the third step + }, + ) + require.NoError(t, err) + // Every target mush have started and finished the first step. + numFinished := 0 + for _, tgt := range targets { + s1n := "Step 1" + require.Equal(t, fmt.Sprintf(` +{[1 1 SimpleTest Step 1][Target{ID: "%s"} ExampleStartedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "%s"} ExampleFinishedEvent]} +`, tgt.ID, tgt.ID), + getEvents(&tgt.ID, &s1n)) + s3n := "Step 3" + if strings.Contains(getEvents(&tgt.ID, &s3n), "ExampleFinishedEvent") { + numFinished++ + } + } + // At least some must have finished. + require.Greater(t, numFinished, 0) +} + +// Test pausing/resuming a naive step that does not cooperate. +// In this case we drain input, wait for all targets to emerge and exit gracefully. +func TestPauseResumeSimple(t *testing.T) { + resetEventStorage() + log := logging.GetLogger("TestPauseResumeSimple") + var err error + var resumeState []byte + targets := []*target.Target{tgt("T1"), tgt("T2"), tgt("T3")} + steps := []test.TestStepBundle{ + newExampleStep("Step 1", 0, "T1", ""), + // T2 and T3 will be paused here, the step will be given time to finish. + newExampleStep("Step 2", 0, "", "T2=200,T3=200"), + newExampleStep("Step 3", 0, "", ""), + } + { + tr1 := NewTestRunner() + ctx1, pause, cancel := statectx.New() + defer cancel() + go func() { + time.Sleep(100 * time.Millisecond) + log.Infof("TestPauseResumeNaive: pausing") + pause() + }() + resumeState, err = runWithTimeout(t, tr1, ctx1, nil, 1, 2*time.Second, targets, steps) + require.Error(t, err) + require.IsType(t, statectx.ErrPaused, err) + require.NotNil(t, resumeState) + } + log.Debugf("Resume state: %s", string(resumeState)) + // Make sure that resume state is validated. + { + tr := NewTestRunner() + ctx, _, cancel := statectx.New() + defer cancel() + resumeState2, err := runWithTimeout( + t, tr, ctx, []byte("FOO"), 2, 2*time.Second, targets, steps) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid resume state") + require.Nil(t, resumeState2) + } + { + tr := NewTestRunner() + ctx, _, cancel := statectx.New() + defer cancel() + resumeState2 := strings.Replace(string(resumeState), `"version"`, `"Xversion"`, 1) + _, err := runWithTimeout( + t, tr, ctx, []byte(resumeState2), 3, 2*time.Second, targets, steps) + require.Error(t, err) + require.Contains(t, err.Error(), "incompatible resume state") + } + { + tr := NewTestRunner() + ctx, _, cancel := statectx.New() + defer cancel() + resumeState2 := strings.Replace(string(resumeState), `"job_id":1`, `"job_id":2`, 1) + _, err := runWithTimeout( + t, tr, ctx, []byte(resumeState2), 4, 2*time.Second, targets, steps) + require.Error(t, err) + require.Contains(t, err.Error(), "wrong resume state") + } + // Finally, resume and finish the job. + { + tr2 := NewTestRunner() + ctx2, _, cancel := statectx.New() + defer cancel() + _, err := runWithTimeout(t, tr2, ctx2, resumeState, 5, 2*time.Second, + // Pass exactly the same targets and pipeline to resume properly. + // Don't use the same pointers ot make sure there is no reliance on that. + []*target.Target{tgt("T1"), tgt("T2"), tgt("T3")}, + []test.TestStepBundle{ + newExampleStep("Step 1", 0, "T1", ""), + newExampleStep("Step 2", 0, "", "T2=200,T3=200"), + newExampleStep("Step 3", 0, "", ""), + }, + ) + require.NoError(t, err) + } + // Verify step events. + // Steps 1 and 2 are executed entirely within the first runner instance + // and never started in the second. + require.Equal(t, ` +{[1 1 SimpleTest Step 1][(*Target)(nil) ExampleStepRunningEvent]} +{[1 1 SimpleTest Step 1][(*Target)(nil) ExampleStepFinishedEvent]} +`, getStepEvents("Step 1")) + require.Equal(t, ` +{[1 1 SimpleTest Step 2][(*Target)(nil) ExampleStepRunningEvent]} +{[1 1 SimpleTest Step 2][(*Target)(nil) ExampleStepFinishedEvent]} +`, getStepEvents("Step 2")) + // Step 3 did not get to start in the first instance and ran in the second. + require.Equal(t, ` +{[1 5 SimpleTest Step 3][(*Target)(nil) ExampleStepRunningEvent]} +{[1 5 SimpleTest Step 3][(*Target)(nil) ExampleStepFinishedEvent]} +`, getStepEvents("Step 3")) + // T1 failed entirely within the first run. + require.Equal(t, ` +{[1 1 SimpleTest Step 1][Target{ID: "T1"} ExampleStartedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "T1"} ExampleFailedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "T1"} TestError &"\"target failed\""]} +`, getTargetEvents("T1")) + // T2 and T3 ran in both. + require.Equal(t, ` +{[1 1 SimpleTest Step 1][Target{ID: "T2"} ExampleStartedEvent]} +{[1 1 SimpleTest Step 1][Target{ID: "T2"} ExampleFinishedEvent]} +{[1 1 SimpleTest Step 2][Target{ID: "T2"} ExampleStartedEvent]} +{[1 1 SimpleTest Step 2][Target{ID: "T2"} ExampleFinishedEvent]} +{[1 5 SimpleTest Step 3][Target{ID: "T2"} ExampleStartedEvent]} +{[1 5 SimpleTest Step 3][Target{ID: "T2"} ExampleFinishedEvent]} +`, getTargetEvents("T2")) +} diff --git a/pkg/runner/wait_for_first_target.go b/pkg/runner/wait_for_first_target.go deleted file mode 100644 index 8d0602ff..00000000 --- a/pkg/runner/wait_for_first_target.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright (c) Facebook, Inc. and its affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - -package runner - -import ( - "context" - "github.com/facebookincubator/contest/pkg/target" -) - -// waitForFirstTarget copies targets from `in` to `out` and also -// signals on first target or if `in` is closed and there was no targets -// at all. -// -// To solve the initial problem there're two approaches: -// * Just wait for a target in `stepCh.stepIn` and then insert -// it back to the channel. -// * Create substitute `stepCh.stepIn` with a new channel and just -// copy all targets from original `stepCh.stepIn` to this new channel. -// And we may detect when we receive the first target in the original -// channel. -// -// The first approach is much simpler, but the second preserves -// the order of targets (which is handy and not misleading -// while reading logs). Here we implement the second one: -func waitForFirstTarget( - ctx context.Context, - in <-chan *target.Target, -) (out chan *target.Target, onFirstTarget, onNoTargets <-chan struct{}) { - onFirstTargetCh := make(chan struct{}) - onNoTargetsCh := make(chan struct{}) - onFirstTarget, onNoTargets = onFirstTargetCh, onNoTargetsCh - - out = make(chan *target.Target) - go func() { - select { - case t, ok := <-in: - if !ok { - close(out) - close(onNoTargetsCh) - return - } - close(onFirstTargetCh) - out <- t - case <-ctx.Done(): - return - } - - for t := range in { - out <- t - } - close(out) - }() - - return -} diff --git a/pkg/runner/wait_for_first_target_test.go b/pkg/runner/wait_for_first_target_test.go deleted file mode 100644 index c5439035..00000000 --- a/pkg/runner/wait_for_first_target_test.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright (c) Facebook, Inc. and its affiliates. -// -// This source code is licensed under the MIT license found in the -// LICENSE file in the root directory of this source tree. - -package runner - -import ( - "context" - "fmt" - "runtime" - "sync" - "testing" - - "github.com/facebookincubator/contest/pkg/target" - "github.com/stretchr/testify/require" -) - -func TestWaitForFirstTarget(t *testing.T) { - t.Run("100targets", func(t *testing.T) { - ch0 := make(chan *target.Target) - ch1, onFirstTargetChan, onNoTargetsChan := waitForFirstTarget(context.Background(), ch0) - - var wgBeforeFirstTarget, wgAfterSecondTarget sync.WaitGroup - wgBeforeFirstTarget.Add(1) - wgAfterSecondTarget.Add(1) - - go func() { - wgBeforeFirstTarget.Wait() - for i := 0; i < 100; i++ { - ch0 <- &target.Target{ - ID: fmt.Sprintf("%d", i), - } - if i == 1 { - // to avoid race conditions in this test we verify - // if there's a "onFirstTarget" signal after the second - // target - wgAfterSecondTarget.Done() - } - } - }() - - runtime.Gosched() - select { - case <-onFirstTargetChan: - t.Fatal("should not happen") - case <-onNoTargetsChan: - t.Fatal("should not happen") - default: - } - - wgBeforeFirstTarget.Done() - - var wgReader sync.WaitGroup - wgReader.Add(1) - go func() { - defer wgReader.Done() - // Checking quantity and order - for i := 0; i < 100; i++ { - _target := <-ch1 - require.Equal(t, &target.Target{ - ID: fmt.Sprintf("%d", i), - }, _target) - } - }() - - wgAfterSecondTarget.Wait() - - select { - case <-onFirstTargetChan: - case <-onNoTargetsChan: - t.Fatal("should not happen") - default: - t.Fatal("should not happen") // see wgAfterSecondTarget.Wait() above - } - - wgReader.Wait() - - runtime.Gosched() - require.Len(t, ch1, 0) - }) - - t.Run("no_target", func(t *testing.T) { - ch0 := make(chan *target.Target) - ch1, onFirstTargetChan, onNoTargetsChan := waitForFirstTarget(context.Background(), ch0) - - runtime.Gosched() - select { - case <-onFirstTargetChan: - t.Fatal("should not happen") - case <-onNoTargetsChan: - t.Fatal("should not happen") - default: - } - - close(ch0) - - select { - case <-onFirstTargetChan: - t.Fatal("should not happen") - case <-onNoTargetsChan: - } - - runtime.Gosched() - require.Len(t, ch1, 0) - _, isOpened := <-ch1 - require.False(t, isOpened) - }) - - t.Run("cancel", func(t *testing.T) { - cancelCh := make(chan struct{}) - ch0 := make(chan *target.Target) - ch1, onFirstTargetChan, onNoTargetsChan := waitForFirstTarget(context.Background(), ch0) - - runtime.Gosched() - select { - case <-onFirstTargetChan: - t.Fatal("should not happen") // see wgBeforeFirstTarget.Wait() above - case <-onNoTargetsChan: - t.Fatal("should not happen") - default: - } - - close(cancelCh) - - runtime.Gosched() - select { - case <-onFirstTargetChan: - t.Fatal("should not happen") - case <-onNoTargetsChan: - t.Fatal("should not happen") - default: - } - - require.Len(t, ch1, 0) - select { - case <-ch1: - t.Fatal("should not happen") - default: - } - }) -} diff --git a/pkg/statectx/context.go b/pkg/statectx/context.go index a22456cc..1dbcc36d 100644 --- a/pkg/statectx/context.go +++ b/pkg/statectx/context.go @@ -39,10 +39,10 @@ func Background() Context { } } -func New() (Context, func(), func()) { - cancelCtx, cancel := newCancelContext(context.Background()) - pauseCtx, pause := newCancelContext(context.Background()) - pauseOrDoneCtx, pauseOrDone := newCancelContext(context.Background()) +func newInternal(cctx, pctx, cpctx context.Context) (Context, func(), func()) { + cancelCtx, cancel := newCancelContext(cctx) + pauseCtx, pause := newCancelContext(pctx) + pauseOrDoneCtx, pauseOrDone := newCancelContext(cpctx) resCtx := &stateCtx{ cancelCtx: cancelCtx, @@ -59,24 +59,15 @@ func New() (Context, func(), func()) { return resCtx, wrap(pause, ErrPaused), wrap(cancel, ErrCanceled) } -func WithParent(ctx Context) (Context, func(), func()) { - cancelCtx, cancel := newCancelContext(ctx) - pauseCtx, pause := newCancelContext(ctx.PausedCtx()) - pauseOrDoneCtx, pauseOrDone := newCancelContext(ctx.PausedOrDoneCtx()) - - resCtx := &stateCtx{ - cancelCtx: cancelCtx, - pauseCtx: pauseCtx, - pauseOrDoneCtx: pauseOrDoneCtx, - } +func New() (Context, func(), func()) { + return newInternal(context.Background(), context.Background(), context.Background()) +} - wrap := func(action func(err error), err error) func() { - return func() { - pauseOrDone(err) - action(err) - } +func WithParent(ctx Context) (Context, func(), func()) { + if ctx == nil { + return New() } - return resCtx, wrap(pause, ErrPaused), wrap(cancel, ErrCanceled) + return newInternal(ctx, ctx.PausedCtx(), ctx.PausedOrDoneCtx()) } type stateCtx struct { diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index fe95b017..b56687bd 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -58,6 +58,7 @@ type TransactionalStorage interface { // ResettableStorage is implemented by storage engines that support reset operation type ResettableStorage interface { + Storage Reset() error } diff --git a/pkg/target/target.go b/pkg/target/target.go index 5593ff3f..7600c5a2 100644 --- a/pkg/target/target.go +++ b/pkg/target/target.go @@ -40,9 +40,10 @@ type ErrPayload struct { // FQDN, PrimaryIPv4, and PrimaryIPv6 are used by plugins to contact the target, set as many as possible for maximum plugin compatibility. // Plugins are generally expected to attempt contacting devices via FQDN, IPv4, and IPv6. Note there is no way to enforce this and more specialized plugins might only support a subset. type Target struct { - ID string - FQDN string - PrimaryIPv4, PrimaryIPv6 net.IP + ID string + FQDN string + PrimaryIPv4 net.IP + PrimaryIPv6 net.IP } func (t *Target) String() string { diff --git a/plugins/storage/memory/memory.go b/plugins/storage/memory/memory.go index e29aa0d6..a8a7f588 100644 --- a/plugins/storage/memory/memory.go +++ b/plugins/storage/memory/memory.go @@ -225,7 +225,7 @@ func (m *Memory) Version() (uint64, error) { } // New create a new Memory events storage backend -func New() (storage.Storage, error) { +func New() (storage.ResettableStorage, error) { m := Memory{lock: &sync.Mutex{}} m.jobRequests = make(map[types.JobID]*job.Request) m.jobReports = make(map[types.JobID]*job.JobReport) diff --git a/plugins/teststeps/example/example.go b/plugins/teststeps/example/example.go index a3c17142..c56b150f 100644 --- a/plugins/teststeps/example/example.go +++ b/plugins/teststeps/example/example.go @@ -8,14 +8,18 @@ package example import ( "fmt" "math/rand" + "strconv" "strings" + "time" "github.com/facebookincubator/contest/pkg/cerrors" "github.com/facebookincubator/contest/pkg/event" "github.com/facebookincubator/contest/pkg/event/testevent" "github.com/facebookincubator/contest/pkg/logging" "github.com/facebookincubator/contest/pkg/statectx" + "github.com/facebookincubator/contest/pkg/target" "github.com/facebookincubator/contest/pkg/test" + "github.com/facebookincubator/contest/plugins/teststeps" ) // Name is the name used to look this plugin up. @@ -23,13 +27,25 @@ var Name = "Example" var log = logging.GetLogger("teststeps/" + strings.ToLower(Name)) +// Params this step accepts. +const ( + // A comma-delimited list of target IDs to fail on. + FailTargetsParam = "FailTargets" + // Alternatively, fail this percentage of targets at random. + FailPctParam = "FailPct" + // A comma-delimited list of target IDs to delay and by how much, ID=delay_ms. + DelayTargetsParam = "DelayTargets" +) + // events that we may emit during the plugin's lifecycle. This is used in Events below. // Note that you don't normally need to emit start/finish/cancellation events as // these are emitted automatically by the framework. const ( - StartedEvent = event.Name("ExampleStartedEvent") - FinishedEvent = event.Name("ExampleFinishedEvent") - FailedEvent = event.Name("ExampleFailedEvent") + StartedEvent = event.Name("ExampleStartedEvent") + FinishedEvent = event.Name("ExampleFinishedEvent") + FailedEvent = event.Name("ExampleFailedEvent") + StepRunningEvent = event.Name("ExampleStepRunningEvent") + StepFinishedEvent = event.Name("ExampleStepFinishedEvent") ) // Events defines the events that a TestStep is allow to emit. Emitting an event @@ -37,10 +53,12 @@ const ( var Events = []event.Name{StartedEvent, FinishedEvent, FailedEvent} // Step is an example implementation of a TestStep which simply -// consumes Targets in input and pipes them to the output channel with intermediate -// buffering. It randomly decides if a Target has failed and forwards it on -// the err channel. +// consumes Targets in input and pipes them to the output or error channel +// with intermediate buffering. type Step struct { + failPct int64 + failTargets map[string]bool + delayTargets map[string]time.Duration } // Name returns the name of the Step @@ -48,50 +66,87 @@ func (ts Step) Name() string { return Name } -// Run executes the example step. -func (ts *Step) Run(ctx statectx.Context, ch test.TestStepChannels, _ test.TestStepParameters, ev testevent.Emitter) error { - for { +func (ts *Step) shouldFail(t *target.Target, params test.TestStepParameters) bool { + if ts.failTargets[t.ID] { + return true + } + if ts.failPct > 0 { + roll := rand.Int63n(101) + return (roll <= ts.failPct) + } + return false +} - r := rand.Intn(3) +// Run executes the example step. +func (ts *Step) Run(ctx statectx.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter) error { + f := func(ctx statectx.Context, target *target.Target) error { + log.Infof("Executing on target %s", target) + // NOTE: you may want more robust error handling here, possibly just + // logging the error, or a retry mechanism. Returning an error + // here means failing the entire job. + if err := ev.Emit(testevent.Data{EventName: StartedEvent, Target: target, Payload: nil}); err != nil { + return fmt.Errorf("failed to emit start event: %v", err) + } + delay := ts.delayTargets[target.ID] + if delay == 0 { + delay = ts.delayTargets["*"] + } select { - case target := <-ch.In: - if target == nil { - return nil - } - log.Infof("Executing on target %s", target) - // NOTE: you may want more robust error handling here, possibly just - // logging the error, or a retry mechanism. Returning an error - // here means failing the entire job. - if err := ev.Emit(testevent.Data{EventName: StartedEvent, Target: target, Payload: nil}); err != nil { - return fmt.Errorf("failed to emit start event: %v", err) + case <-time.After(delay): + case <-ctx.Done(): + return statectx.ErrCanceled + } + if ts.shouldFail(target, params) { + if err := ev.Emit(testevent.Data{EventName: FailedEvent, Target: target, Payload: nil}); err != nil { + return fmt.Errorf("failed to emit finished event: %v", err) } - if r == 1 { - select { - case <-ctx.PausedOrDone(): - return nil - case ch.Err <- cerrors.TargetError{Target: target, Err: fmt.Errorf("target failed")}: - if err := ev.Emit(testevent.Data{EventName: FinishedEvent, Target: target, Payload: nil}); err != nil { - return fmt.Errorf("failed to emit finished event: %v", err) - } - } - } else { - select { - case <-ctx.PausedOrDone(): - return nil - case ch.Out <- target: - if err := ev.Emit(testevent.Data{EventName: FailedEvent, Target: target, Payload: nil}); err != nil { - return fmt.Errorf("failed to emit failed event: %v", err) - } - } + return fmt.Errorf("target failed") + } else { + if err := ev.Emit(testevent.Data{EventName: FinishedEvent, Target: target, Payload: nil}); err != nil { + return fmt.Errorf("failed to emit failed event: %v", err) } - case <-ctx.PausedOrDone(): - return nil } + return nil + } + if err := ev.Emit(testevent.Data{EventName: StepRunningEvent}); err != nil { + return fmt.Errorf("failed to emit failed event: %v", err) + } + res := teststeps.ForEachTarget(Name, ctx, ch, f) + if err := ev.Emit(testevent.Data{EventName: StepFinishedEvent}); err != nil { + return fmt.Errorf("failed to emit failed event: %v", err) } + return res } // ValidateParameters validates the parameters associated to the TestStep -func (ts *Step) ValidateParameters(_ test.TestStepParameters) error { +func (ts *Step) ValidateParameters(params test.TestStepParameters) error { + targetsToFail := params.GetOne(FailTargetsParam).String() + if len(targetsToFail) > 0 { + for _, t := range strings.Split(targetsToFail, ",") { + ts.failTargets[t] = true + } + } + targetsToDelay := params.GetOne(DelayTargetsParam).String() + if len(targetsToDelay) > 0 { + for _, e := range strings.Split(targetsToDelay, ",") { + kv := strings.Split(e, "=") + if len(kv) != 2 { + continue + } + v, err := strconv.Atoi(kv[1]) + if err != nil { + return fmt.Errorf("invalid FailTargets: %w", err) + } + ts.delayTargets[kv[0]] = time.Duration(v) * time.Millisecond + } + } + if params.GetOne(FailPctParam).String() != "" { + if pct, err := params.GetInt(FailPctParam); err == nil { + ts.failPct = pct + } else { + return fmt.Errorf("invalid FailPct: %w", err) + } + } return nil } @@ -108,7 +163,10 @@ func (ts *Step) CanResume() bool { // New initializes and returns a new ExampleTestStep. func New() test.TestStep { - return &Step{} + return &Step{ + failTargets: make(map[string]bool), + delayTargets: make(map[string]time.Duration), + } } // Load returns the name, factory and events which are needed to register the step. diff --git a/plugins/teststeps/teststeps.go b/plugins/teststeps/teststeps.go index 7e763a8a..b37adf1f 100644 --- a/plugins/teststeps/teststeps.go +++ b/plugins/teststeps/teststeps.go @@ -35,15 +35,15 @@ func ForEachTarget(pluginName string, ctx statectx.Context, ch test.TestStepChan log.Errorf("%s: ForEachTarget: failed to apply test step function on target %s: %v", pluginName, t, err) select { case ch.Err <- cerrors.TargetError{Target: t, Err: err}: - case <-ctx.PausedOrDone(): - log.Debugf("%s: ForEachTarget: received cancellation/pause signal while reporting error", pluginName) + case <-ctx.Done(): + log.Debugf("%s: ForEachTarget: received cancellation signal while reporting error", pluginName) } } else { log.Debugf("%s: ForEachTarget: target %s completed successfully", pluginName, t) select { case ch.Out <- t: - case <-ctx.PausedOrDone(): - log.Debugf("%s: ForEachTarget: received cancellation/pause signal while reporting success", pluginName) + case <-ctx.Done(): + log.Debugf("%s: ForEachTarget: received pause signal while reporting success", pluginName) } } } @@ -69,9 +69,6 @@ func ForEachTarget(pluginName string, ctx statectx.Context, ch test.TestStepChan case <-ctx.Done(): log.Debugf("%s: ForEachTarget: incoming loop canceled", pluginName) return - case <-ctx.Paused(): - log.Debugf("%s: ForEachTarget: incoming loop paused", pluginName) - return } } }() diff --git a/tests/integ/jobmanager/common.go b/tests/integ/jobmanager/common.go index aabbdb25..1c7f4170 100644 --- a/tests/integ/jobmanager/common.go +++ b/tests/integ/jobmanager/common.go @@ -457,7 +457,6 @@ func (suite *TestJobManagerSuite) TestJobManagerJobCrash() { func (suite *TestJobManagerSuite) TestJobManagerJobCancellationFailure() { config.TestRunnerShutdownTimeout = 1 * time.Second - config.TestRunnerStepShutdownTimeout = 1 * time.Second go func() { suite.jm.Start(suite.sigs) diff --git a/tests/integ/test/testrunner_test.go b/tests/integ/test/testrunner_test.go index 5e258991..27e44469 100644 --- a/tests/integ/test/testrunner_test.go +++ b/tests/integ/test/testrunner_test.go @@ -13,7 +13,8 @@ import ( "testing" "time" - "github.com/facebookincubator/contest/pkg/cerrors" + "github.com/stretchr/testify/require" + "github.com/facebookincubator/contest/pkg/event" "github.com/facebookincubator/contest/pkg/logging" "github.com/facebookincubator/contest/pkg/pluginregistry" @@ -23,7 +24,6 @@ import ( "github.com/facebookincubator/contest/pkg/target" "github.com/facebookincubator/contest/pkg/test" "github.com/facebookincubator/contest/pkg/types" - "github.com/facebookincubator/contest/plugins/storage/memory" "github.com/facebookincubator/contest/plugins/teststeps/cmd" "github.com/facebookincubator/contest/plugins/teststeps/echo" @@ -34,8 +34,6 @@ import ( "github.com/facebookincubator/contest/tests/plugins/teststeps/hanging" "github.com/facebookincubator/contest/tests/plugins/teststeps/noreturn" "github.com/facebookincubator/contest/tests/plugins/teststeps/panicstep" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) var ( @@ -130,7 +128,7 @@ func TestSuccessfulCompletion(t *testing.T) { go func() { tr := runner.NewTestRunner() - err := tr.Run(stateCtx, &test.Test{TestStepsBundles: testSteps}, targets, jobID, runID) + _, err := tr.Run(stateCtx, &test.Test{TestStepsBundles: testSteps}, targets, jobID, runID, nil) errCh <- err }() select { @@ -141,180 +139,6 @@ func TestSuccessfulCompletion(t *testing.T) { } } -func TestPanicStep(t *testing.T) { - - jobID := types.JobID(1) - runID := types.RunID(1) - - ts1, err := pluginRegistry.NewTestStep("Panic") - require.NoError(t, err) - ts2, err := pluginRegistry.NewTestStep("Example") - require.NoError(t, err) - - params := make(test.TestStepParameters) - testSteps := []test.TestStepBundle{ - test.TestStepBundle{TestStep: ts1, TestStepLabel: "StageOne", Parameters: params}, - test.TestStepBundle{TestStep: ts2, TestStepLabel: "StageTwo", Parameters: params}, - } - - errCh := make(chan error, 1) - stateCtx, _, _ := statectx.New() - - go func() { - tr := runner.NewTestRunner() - err := tr.Run(stateCtx, &test.Test{TestStepsBundles: testSteps}, targets, jobID, runID) - errCh <- err - }() - select { - case err = <-errCh: - require.Error(t, err) - case <-time.After(successTimeout): - t.Errorf("test should return within timeout: %+v", successTimeout) - } -} - -func TestNoReturnStepWithCorrectTargetForwarding(t *testing.T) { - - jobID := types.JobID(1) - runID := types.RunID(1) - - ts1, err := pluginRegistry.NewTestStep("NoReturn") - require.NoError(t, err) - ts2, err := pluginRegistry.NewTestStep("Example") - require.NoError(t, err) - - params := make(test.TestStepParameters) - testSteps := []test.TestStepBundle{ - test.TestStepBundle{TestStep: ts1, Parameters: params, TestStepLabel: "NoReturn"}, - test.TestStepBundle{TestStep: ts2, Parameters: params, TestStepLabel: "Example"}, - } - - stateCtx, _, _ := statectx.New() - errCh := make(chan error, 1) - - timeouts := runner.TestRunnerTimeouts{ - StepInjectTimeout: 30 * time.Second, - MessageTimeout: 5 * time.Second, - ShutdownTimeout: 1 * time.Second, - StepShutdownTimeout: 1 * time.Second, - } - go func() { - tr := runner.NewTestRunnerWithTimeouts(timeouts) - err := tr.Run(stateCtx, &test.Test{TestStepsBundles: testSteps}, targets, jobID, runID) - errCh <- err - }() - select { - case err = <-errCh: - require.Error(t, err) - if _, ok := err.(*cerrors.ErrTestStepsNeverReturned); !ok { - errString := fmt.Sprintf("Error returned by TestRunner should be of type ErrTestStepsNeverReturned: %v", err) - assert.FailNow(t, errString) - } - assert.NotNil(t, err.(*cerrors.ErrTestStepsNeverReturned)) - case <-time.After(successTimeout): - t.Errorf("test should return within timeout: %+v", successTimeout) - } -} - -func TestNoReturnStepWithoutTargetForwarding(t *testing.T) { - - jobID := types.JobID(1) - runID := types.RunID(1) - - ts1, err := pluginRegistry.NewTestStep("Hanging") - require.NoError(t, err) - ts2, err := pluginRegistry.NewTestStep("Example") - require.NoError(t, err) - - params := make(test.TestStepParameters) - testSteps := []test.TestStepBundle{ - test.TestStepBundle{TestStep: ts1, TestStepLabel: "StageOne", Parameters: params}, - test.TestStepBundle{TestStep: ts2, TestStepLabel: "StageTwo", Parameters: params}, - } - - stateCtx, _, cancel := statectx.New() - errCh := make(chan error, 1) - - var ( - StepInjectTimeout = 30 * time.Second - MessageTimeout = 5 * time.Second - ShutdownTimeout = 1 * time.Second - StepShutdownTimeout = 1 * time.Second - ) - timeouts := runner.TestRunnerTimeouts{ - StepInjectTimeout: StepInjectTimeout, - MessageTimeout: MessageTimeout, - ShutdownTimeout: ShutdownTimeout, - StepShutdownTimeout: StepShutdownTimeout, - } - - go func() { - tr := runner.NewTestRunnerWithTimeouts(timeouts) - err := tr.Run(stateCtx, &test.Test{TestStepsBundles: testSteps}, targets, jobID, runID) - errCh <- err - }() - - testTimeout := 2 * time.Second - testShutdownTimeout := ShutdownTimeout + StepShutdownTimeout + 2 - select { - case err = <-errCh: - // The TestRunner should never return and should instead time out - assert.FailNow(t, "TestRunner should not return, received an error instead: %v", err) - case <-time.After(testTimeout): - // Assert cancellation signal and expect the TestRunner to return within - // testShutdownTimeout - cancel() - select { - case err = <-errCh: - // The test timed out, which is an error from the perspective of the JobManager - require.Error(t, err) - if _, ok := err.(*cerrors.ErrTestStepsNeverReturned); !ok { - errString := fmt.Sprintf("Error returned by TestRunner should be of type ErrTestStepsNeverReturned: %v", err) - assert.FailNow(t, errString) - } - case <-time.After(testShutdownTimeout): - assert.FailNow(t, "TestRunner should return after cancellation before timeout") - } - } -} - -func TestStepClosesChannels(t *testing.T) { - - jobID := types.JobID(1) - runID := types.RunID(1) - - ts1, err := pluginRegistry.NewTestStep("Channels") - require.NoError(t, err) - ts2, err := pluginRegistry.NewTestStep("Example") - require.NoError(t, err) - - params := make(test.TestStepParameters) - testSteps := []test.TestStepBundle{ - test.TestStepBundle{TestStep: ts1, TestStepLabel: "StageOne", Parameters: params}, - test.TestStepBundle{TestStep: ts2, TestStepLabel: "StageTwo", Parameters: params}, - } - - stateCtx, _, _ := statectx.New() - errCh := make(chan error, 1) - - go func() { - tr := runner.NewTestRunner() - err := tr.Run(stateCtx, &test.Test{TestStepsBundles: testSteps}, targets, jobID, runID) - errCh <- err - }() - - testTimeout := 2 * time.Second - select { - case err = <-errCh: - if _, ok := err.(*cerrors.ErrTestStepClosedChannels); !ok { - errString := fmt.Sprintf("Error returned by TestRunner should be of type ErrTestStepClosedChannels, got %T(%v)", err, err) - assert.FailNow(t, errString) - } - case <-time.After(testTimeout): - assert.FailNow(t, "TestRunner should not time out") - } -} - func TestCmdPlugin(t *testing.T) { jobID := types.JobID(1) @@ -340,7 +164,7 @@ func TestCmdPlugin(t *testing.T) { go func() { tr := runner.NewTestRunner() - err := tr.Run(stateCtx, &test.Test{TestStepsBundles: testSteps}, targets, jobID, runID) + _, err := tr.Run(stateCtx, &test.Test{TestStepsBundles: testSteps}, targets, jobID, runID, nil) errCh <- err }() @@ -355,37 +179,3 @@ func TestCmdPlugin(t *testing.T) { t.Errorf("test should return within timeout: %+v", successTimeout) } } - -func TestNoRunTestStepIfNoTargets(t *testing.T) { - - jobID := types.JobID(1) - runID := types.RunID(1) - - ts1, err := pluginRegistry.NewTestStep("Fail") - require.NoError(t, err) - ts2, err := pluginRegistry.NewTestStep("Crash") - require.NoError(t, err) - - params := make(test.TestStepParameters) - testSteps := []test.TestStepBundle{ - {TestStep: ts1, TestStepLabel: "StageOne", Parameters: params}, - {TestStep: ts2, TestStepLabel: "StageTwo", Parameters: params}, - } - - stateCtx, _, _ := statectx.New() - errCh := make(chan error, 1) - - go func() { - tr := runner.NewTestRunner() - err := tr.Run(stateCtx, &test.Test{TestStepsBundles: testSteps}, targets, jobID, runID) - errCh <- err - }() - - testTimeout := 2 * time.Second - select { - case err = <-errCh: - assert.Nil(t, err, "the Crash TestStep shouldn't be ran") - case <-time.After(testTimeout): - assert.FailNow(t, "TestRunner should not time out") - } -} diff --git a/tests/plugins/teststeps/badtargets/badtargets.go b/tests/plugins/teststeps/badtargets/badtargets.go new file mode 100644 index 00000000..72413ccc --- /dev/null +++ b/tests/plugins/teststeps/badtargets/badtargets.go @@ -0,0 +1,79 @@ +// Copyright (c) Facebook, Inc. and its affiliates. +// +// This source code is licensed under the MIT license found in the +// LICENSE file in the root directory of this source tree. + +package badtargets + +import ( + "github.com/facebookincubator/contest/pkg/cerrors" + "github.com/facebookincubator/contest/pkg/event" + "github.com/facebookincubator/contest/pkg/event/testevent" + "github.com/facebookincubator/contest/pkg/statectx" + "github.com/facebookincubator/contest/pkg/test" +) + +// Name is the name used to look this plugin up. +const Name = "BadTargets" + +// Events defines the events that a TestStep is allow to emit +var Events = []event.Name{} + +type badTargets struct { +} + +// Name returns the name of the Step +func (ts *badTargets) Name() string { + return Name +} + +// Run executes a step that messes up the flow of targets. +func (ts *badTargets) Run(ctx statectx.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter) error { + for { + select { + case target, ok := <-ch.In: + if !ok { + return nil + } + switch target.ID { + case "TDrop": + // ... crickets ... + case "TGood": + // We should not depend on pointer matching, so emit a copy. + target2 := *target + ch.Out <- &target2 + case "TDup": + ch.Out <- target + ch.Out <- target + default: + // Mangle the returned target name. + target2 := *target + target2.ID = target2.ID + "XXX" + ch.Out <- &target2 + } + case <-ctx.Done(): + return nil + } + } +} + +// ValidateParameters validates the parameters associated to the TestStep +func (ts *badTargets) ValidateParameters(params test.TestStepParameters) error { + return nil +} + +// Resume tries to resume a previously interrupted test step. ExampleTestStep +// cannot resume. +func (ts *badTargets) Resume(ctx statectx.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.EmitterFetcher) error { + return &cerrors.ErrResumeNotSupported{StepName: Name} +} + +// CanResume tells whether this step is able to resume. +func (ts *badTargets) CanResume() bool { + return false +} + +// New creates a new badTargets step +func New() test.TestStep { + return &badTargets{} +} diff --git a/tests/plugins/teststeps/channels/channels.go b/tests/plugins/teststeps/channels/channels.go index 0d2702e8..a86ad8cf 100644 --- a/tests/plugins/teststeps/channels/channels.go +++ b/tests/plugins/teststeps/channels/channels.go @@ -16,7 +16,7 @@ import ( // Name is the name used to look this plugin up. var Name = "Channels" -// Events defines the events that a TestStep is allow to emit +// Events defines the events that a TestStep is allowed to emit var Events = []event.Name{} type channels struct { @@ -27,12 +27,14 @@ func (ts *channels) Name() string { return Name } -// Run executes a step which does never return.s +// Run executes a step that runs fine but closes its output channels on exit. func (ts *channels) Run(ctx statectx.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter) error { for target := range ch.In { ch.Out <- target } + // This is bad, do not do this. close(ch.Out) + close(ch.Err) return nil } diff --git a/tests/plugins/teststeps/hanging/hanging.go b/tests/plugins/teststeps/hanging/hanging.go index 86503d32..583ea7c0 100644 --- a/tests/plugins/teststeps/hanging/hanging.go +++ b/tests/plugins/teststeps/hanging/hanging.go @@ -27,7 +27,7 @@ func (ts *hanging) Name() string { return Name } -// Run executes a step which does never return. +// Run executes a step that does not process any targets and never returns. func (ts *hanging) Run(ctx statectx.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter) error { channel := make(chan struct{}) <-channel diff --git a/tests/plugins/teststeps/noreturn/noreturn.go b/tests/plugins/teststeps/noreturn/noreturn.go index 9d4a5aac..645847e0 100644 --- a/tests/plugins/teststeps/noreturn/noreturn.go +++ b/tests/plugins/teststeps/noreturn/noreturn.go @@ -27,7 +27,7 @@ func (ts *noreturnStep) Name() string { return Name } -// Run executes a step which does never return. +// Run executes a step that never returns. func (ts *noreturnStep) Run(ctx statectx.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter) error { for target := range ch.In { ch.Out <- target