Skip to content
This repository has been archived by the owner on May 24, 2022. It is now read-only.

Commit

Permalink
Merge pull request #208 from facebookincubator/new_runner
Browse files Browse the repository at this point in the history
New test runner implementation
  • Loading branch information
rojer authored Jan 18, 2021
2 parents 40dcae5 + 86b113d commit 11388b8
Show file tree
Hide file tree
Showing 28 changed files with 1,906 additions and 1,968 deletions.
57 changes: 57 additions & 0 deletions pkg/cerrors/cerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ func (e *ErrTestStepsNeverReturned) Error() string {
return fmt.Sprintf("test step [%s] did not return", strings.Join(e.StepNames, ", "))
}

// ErrTestTargetInjectionTimedOut indicates that test step did not ingest a target
// within allotted time.
type ErrTestTargetInjectionTimedOut struct {
StepName string
}

// Error returns the error string associated with the error
func (e *ErrTestTargetInjectionTimedOut) Error() string {
return fmt.Sprintf("test step %v failed to ingest a target", e.StepName)
}

// ErrTestStepClosedChannels indicates that the test step returned after
// closing its output channels, which constitutes an API violation
type ErrTestStepClosedChannels struct {
Expand All @@ -53,3 +64,49 @@ type ErrTestStepClosedChannels struct {
func (e *ErrTestStepClosedChannels) Error() string {
return fmt.Sprintf("test step %v closed output channels (api violation)", e.StepName)
}

// ErrTestStepPaniced indicates that a test step's method panicked.
type ErrTestStepPaniced struct {
StepName string
StackTrace string
}

// Error returns the error string associated with the error
func (e *ErrTestStepPaniced) Error() string {
return fmt.Sprintf("test step %s paniced, trace: %q", e.StepName, e.StackTrace)
}

// ErrTestStepReturnedDuplicateResult indicates that a test step returned result
// twice for the same target.
type ErrTestStepReturnedDuplicateResult struct {
StepName string
Target string
}

// Error returns the error string associated with the error
func (e *ErrTestStepReturnedDuplicateResult) Error() string {
return fmt.Sprintf("test step %s returned duplicate result for %s", e.StepName, e.Target)
}

// ErrTestStepReturnedUnexpectedResult indicates that a test step returned result
// for a target that was not given to it.
type ErrTestStepReturnedUnexpectedResult struct {
StepName string
Target string
}

// Error returns the error string associated with the error
func (e *ErrTestStepReturnedUnexpectedResult) Error() string {
return fmt.Sprintf("test step %s returned unexpected result for %s", e.StepName, e.Target)
}

// ErrTestStepLostTargets indicates that targets have been lost during test run.
type ErrTestStepLostTargets struct {
StepName string
Target string
}

// Error returns the error string associated with the error
func (e *ErrTestStepLostTargets) Error() string {
return fmt.Sprintf("test step %s lost target %s", e.StepName, e.Target)
}
29 changes: 4 additions & 25 deletions pkg/config/timeouts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,13 @@ import "time"
var TargetManagerTimeout = 5 * time.Minute

// StepInjectTimeout represents the maximum time that TestRunner will wait for
// the first TestStep of the pipeline to accept a Target
// a TestStep to accept a Target
var StepInjectTimeout = 30 * time.Second

// TestRunnerMsgTimeout represents the maximum time that any component of the
// TestRunner will wait for the delivery of a message to any other subsystem
// of the TestRunner
var TestRunnerMsgTimeout = 5 * time.Second

// TestRunnerShutdownTimeout represents the maximum time that the TestRunner will
// wait for all the TestStep to complete after a cancellation signal has been
// delivered

// TestRunnerShutdownTimeout controls a block of the TestRunner which works as a
// watchdog, i.e. if there are multiple steps that need to return, the timeout is
// reset every time a step returns. The timeout should be handled so that it
// doesn't reset when a TestStep returns.
var TestRunnerShutdownTimeout = 30 * time.Second

// TestRunnerStepShutdownTimeout represents the maximum time that the TestRunner
// TestRunnerShutdownTimeout represents the maximum time that the TestRunner
// will wait for all TestSteps to complete after all Targets have reached the end
// of the pipeline. This timeout is only relevant if a cancellation signal is *not*
// delivered.

// TestRunnerStepShutdownTimeout controls a block of the TestRunner which worksas
// a watchdog, i.e. if there are multiple steps that need to return, the timeout
// is reset every time a step returns. The timeout should be handled so that it
// doesn't reset when a TestStep returns.
var TestRunnerStepShutdownTimeout = 5 * time.Second
// of the pipeline.
var TestRunnerShutdownTimeout = 30 * time.Second

// LockRefreshTimeout is the amount of time by which a target lock is extended
// periodically while a job is running.
Expand Down
16 changes: 14 additions & 2 deletions pkg/event/testevent/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/facebookincubator/contest/pkg/types"
)

// Header models the header of a test event, which consists in metadatat hat defines the
// Header models the header of a test event, which consists in metadata that defines the
// emitter of the events. The Header is under ConTest control and cannot be manipulated
// by the TestStep
type Header struct {
Expand All @@ -28,8 +28,8 @@ type Header struct {

// Data models the data of a test event. It is populated by the TestStep
type Data struct {
EventName event.Name
Target *target.Target
EventName event.Name
Payload *json.RawMessage
}

Expand Down Expand Up @@ -152,3 +152,15 @@ type EmitterFetcher interface {
Emitter
Fetcher
}

func (h *Header) String() string {
return fmt.Sprintf("[%d %d %s %s]", h.JobID, h.RunID, h.TestName, h.TestStepLabel)
}

func (d *Data) String() string {
ps := ""
if d.Payload != nil {
ps = fmt.Sprintf(" %q", d.Payload) //nolint SA5009 - works fine
}
return fmt.Sprintf("[%s %s%s]", d.Target, d.EventName, ps)
}
4 changes: 1 addition & 3 deletions pkg/jobmanager/jobmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ func newPartialJobFromDescriptor(pr *pluginregistry.PluginRegistry, jd *job.JobD
if err != nil {
return nil, err
}
// test step index is incremented by 1 so we can use 0 to signal an
// anomaly.
tsb, err := pr.NewTestStepBundle(*testStepDesc, uint(idx)+1, tse)
tsb, err := pr.NewTestStepBundle(*testStepDesc, tse)
if err != nil {
return nil, fmt.Errorf("NewTestStepBundle for test step '%s' with index %d failed: %w", testStepDesc.Name, idx, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pluginregistry/bundles.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

// NewTestStepBundle creates a TestStepBundle from a TestStepDescriptor
func (r *PluginRegistry) NewTestStepBundle(testStepDescriptor test.TestStepDescriptor, stepIndex uint, allowedEvents map[event.Name]bool) (*test.TestStepBundle, error) {
func (r *PluginRegistry) NewTestStepBundle(testStepDescriptor test.TestStepDescriptor, allowedEvents map[event.Name]bool) (*test.TestStepBundle, error) {
testStep, err := r.NewTestStep(testStepDescriptor.Name)
if err != nil {
return nil, fmt.Errorf("could not get the desired TestStep (%s): %v", testStepDescriptor.Name, err)
Expand Down
13 changes: 10 additions & 3 deletions pkg/runner/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/facebookincubator/contest/pkg/event/testevent"
"github.com/facebookincubator/contest/pkg/job"
"github.com/facebookincubator/contest/pkg/logging"
"github.com/facebookincubator/contest/pkg/statectx"
"github.com/facebookincubator/contest/pkg/storage"
"github.com/facebookincubator/contest/pkg/target"
"github.com/facebookincubator/contest/pkg/types"
Expand Down Expand Up @@ -157,9 +158,8 @@ func (jr *JobRunner) Run(j *job.Job) ([][]*job.Report, []*job.Report, error) {
if err := tl.Unlock(j.ID, targets); err != nil {
jobLog.Warningf("Failed to unlock targets (%v) for job ID %d: %v", targets, j.ID, err)
}
case <-j.StateCtx.Paused():
jobLog.Debugf("Received pause request, NOT releasing targets so the job can be resumed")
return
// Ignore the pause signal, continue to refresh targets.
case <-done:
if err := tl.Unlock(j.ID, targets); err != nil {
jobLog.Warningf("Failed to unlock %d target(s) (%v): %v", len(targets), targets, err)
Expand All @@ -183,7 +183,14 @@ func (jr *JobRunner) Run(j *job.Job) ([][]*job.Report, []*job.Report, error) {
if runErr = jr.emitAcquiredTargets(testEventEmitter, targets); runErr == nil {
jobLog.Infof("Run #%d: running test #%d for job '%s' (job ID: %d) on %d targets", run+1, idx, j.Name, j.ID, len(targets))
testRunner := NewTestRunner()
runErr = testRunner.Run(j.StateCtx, t, targets, j.ID, types.RunID(run+1))
var resumeState []byte
resumeState, err := testRunner.Run(j.StateCtx, t, targets, j.ID, types.RunID(run+1), resumeState)
if err == statectx.ErrPaused {
jobLog.Errorf("Runner paused, state: %s", string(resumeState))
// TODO(rojer): Persist the state.
} else {
runErr = err
}
}

// Job is done, release all the targets
Expand Down
Loading

0 comments on commit 11388b8

Please sign in to comment.