Skip to content
This repository has been archived by the owner on May 24, 2022. It is now read-only.

Commit

Permalink
New test runner implementation
Browse files Browse the repository at this point in the history
Simplified, added support for serializing state and resuming.
Not used yet but will be.
  • Loading branch information
rojer9-fb committed Dec 18, 2020
1 parent a316f42 commit c5ca675
Show file tree
Hide file tree
Showing 25 changed files with 1,517 additions and 1,976 deletions.
55 changes: 55 additions & 0 deletions pkg/cerrors/cerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ func (e *ErrTestStepsNeverReturned) Error() string {
return fmt.Sprintf("test step [%s] did not return", strings.Join(e.StepNames, ", "))
}

// ErrTestTargetInjectionTimedOut is the error reported when a test step does
// not ingest a target within the allotted time.
type ErrTestTargetInjectionTimedOut struct {
	// StepName is the name of the test step that failed to ingest the target.
	StepName string
}

// Error implements the error interface; the message names the offending step.
func (e *ErrTestTargetInjectionTimedOut) Error() string {
	const format = "test step %v failed to ingest a target"
	return fmt.Sprintf(format, e.StepName)
}

// ErrTestStepClosedChannels indicates that the test step returned after
// closing its output channels, which constitutes an API violation
type ErrTestStepClosedChannels struct {
Expand All @@ -53,3 +64,47 @@ type ErrTestStepClosedChannels struct {
func (e *ErrTestStepClosedChannels) Error() string {
	// The message names the step and flags the condition as an API violation.
	const format = "test step %v closed output channels (api violation)"
	return fmt.Sprintf(format, e.StepName)
}

// ErrTestStepPaniced is the error reported when a method of a test step
// panicked.
type ErrTestStepPaniced struct {
	// StepName is the name of the test step whose method panicked.
	StepName string
	// StackTrace is the trace text included (quoted) in the error message.
	StackTrace string
}

// Error implements the error interface. The stack trace is rendered with %q,
// so it appears as a single quoted, escaped string in the message.
func (e *ErrTestStepPaniced) Error() string {
	const format = "test step %s paniced, trace: %q"
	return fmt.Sprintf(format, e.StepName, e.StackTrace)
}

// ErrTestStepReturnedDuplicateResult is the error reported when a test step
// returned a duplicate result for a target.
type ErrTestStepReturnedDuplicateResult struct {
	// StepName is the name of the test step that returned the duplicate.
	StepName string
	// Target identifies the target the duplicate result was returned for.
	Target string
}

// Error implements the error interface; the message names both the step and
// the target.
func (e *ErrTestStepReturnedDuplicateResult) Error() string {
	const format = "test step %s returned duplicate result for %s"
	return fmt.Sprintf(format, e.StepName, e.Target)
}

// ErrTestStepReturnedUnexpectedResult is the error reported when a test step
// returned a result that was not expected for a target.
type ErrTestStepReturnedUnexpectedResult struct {
	// StepName is the name of the test step that returned the result.
	StepName string
	// Target identifies the target the unexpected result was returned for.
	Target string
}

// Error implements the error interface; the message names both the step and
// the target.
func (e *ErrTestStepReturnedUnexpectedResult) Error() string {
	const format = "test step %s returned unexpected result for %s"
	return fmt.Sprintf(format, e.StepName, e.Target)
}

// ErrTestStepLostTargets is the error reported when targets have been lost
// during a test run.
type ErrTestStepLostTargets struct {
	// StepName is the name of the test step that lost the target.
	StepName string
	// Target identifies the target that was lost.
	Target string
}

// Error implements the error interface; the message names both the step and
// the lost target.
func (e *ErrTestStepLostTargets) Error() string {
	const format = "test step %s lost target %s"
	return fmt.Sprintf(format, e.StepName, e.Target)
}
29 changes: 4 additions & 25 deletions pkg/config/timeouts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,13 @@ import "time"
var TargetManagerTimeout = 5 * time.Minute

// StepInjectTimeout represents the maximum time that TestRunner will wait for
// the first TestStep of the pipeline to accept a Target
// a TestStep to accept a Target
var StepInjectTimeout = 30 * time.Second

// TestRunnerMsgTimeout represents the maximum time that any component of the
// TestRunner will wait for the delivery of a message to any other subsystem
// of the TestRunner
var TestRunnerMsgTimeout = 5 * time.Second

// TestRunnerShutdownTimeout represents the maximum time that the TestRunner will
// wait for all the TestStep to complete after a cancellation signal has been
// delivered

// TestRunnerShutdownTimeout controls a block of the TestRunner which works as a
// watchdog, i.e. if there are multiple steps that need to return, the timeout is
// reset every time a step returns. The timeout should be handled so that it
// doesn't reset when a TestStep returns.
var TestRunnerShutdownTimeout = 30 * time.Second

// TestRunnerStepShutdownTimeout represents the maximum time that the TestRunner
// TestRunnerShutdownTimeout represents the maximum time that the TestRunner
// will wait for all TestSteps to complete after all Targets have reached the end
// of the pipeline. This timeout is only relevant if a cancellation signal is *not*
// delivered.

// TestRunnerStepShutdownTimeout controls a block of the TestRunner which worksas
// a watchdog, i.e. if there are multiple steps that need to return, the timeout
// is reset every time a step returns. The timeout should be handled so that it
// doesn't reset when a TestStep returns.
var TestRunnerStepShutdownTimeout = 5 * time.Second
// of the pipeline.
var TestRunnerShutdownTimeout = 30 * time.Second

// LockRefreshTimeout is the amount of time by which a target lock is extended
// periodically while a job is running.
Expand Down
16 changes: 14 additions & 2 deletions pkg/event/testevent/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/facebookincubator/contest/pkg/types"
)

// Header models the header of a test event, which consists in metadatat hat defines the
// Header models the header of a test event, which consists in metadata that defines the
// emitter of the events. The Header is under ConTest control and cannot be manipulated
// by the TestStep
type Header struct {
Expand All @@ -28,8 +28,8 @@ type Header struct {

// Data models the data of a test event. It is populated by the TestStep
type Data struct {
EventName event.Name
Target *target.Target
EventName event.Name
Payload *json.RawMessage
}

Expand Down Expand Up @@ -152,3 +152,15 @@ type EmitterFetcher interface {
Emitter
Fetcher
}

// String renders the header as "[JobID RunID TestName TestStepLabel]", with
// the two IDs formatted as decimal numbers.
func (h *Header) String() string {
	const layout = "[%d %d %s %s]"
	return fmt.Sprintf(layout, h.JobID, h.RunID, h.TestName, h.TestStepLabel)
}

// String renders the event data as "[Target EventName]". When a payload is
// present it is appended after a space, formatted with the %q verb.
func (d *Data) String() string {
	const (
		payloadLayout = " %q"
		dataLayout    = "[%s %s%s]"
	)
	// The payload suffix stays empty when the event carries no payload.
	var payloadSuffix string
	if d.Payload != nil {
		payloadSuffix = fmt.Sprintf(payloadLayout, d.Payload) //nolint SA5009 - works fine
	}
	return fmt.Sprintf(dataLayout, d.Target, d.EventName, payloadSuffix)
}
2 changes: 1 addition & 1 deletion pkg/jobmanager/jobmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func newPartialJobFromDescriptor(pr *pluginregistry.PluginRegistry, jd *job.JobD
}
// test step index is incremented by 1 so we can use 0 to signal an
// anomaly.
tsb, err := pr.NewTestStepBundle(*testStepDesc, uint(idx)+1, tse)
tsb, err := pr.NewTestStepBundle(*testStepDesc, tse)
if err != nil {
return nil, fmt.Errorf("NewTestStepBundle for test step '%s' with index %d failed: %w", testStepDesc.Name, idx, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pluginregistry/bundles.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

// NewTestStepBundle creates a TestStepBundle from a TestStepDescriptor
func (r *PluginRegistry) NewTestStepBundle(testStepDescriptor test.TestStepDescriptor, stepIndex uint, allowedEvents map[event.Name]bool) (*test.TestStepBundle, error) {
func (r *PluginRegistry) NewTestStepBundle(testStepDescriptor test.TestStepDescriptor, allowedEvents map[event.Name]bool) (*test.TestStepBundle, error) {
testStep, err := r.NewTestStep(testStepDescriptor.Name)
if err != nil {
return nil, fmt.Errorf("could not get the desired TestStep (%s): %v", testStepDescriptor.Name, err)
Expand Down
13 changes: 10 additions & 3 deletions pkg/runner/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/facebookincubator/contest/pkg/event/testevent"
"github.com/facebookincubator/contest/pkg/job"
"github.com/facebookincubator/contest/pkg/logging"
"github.com/facebookincubator/contest/pkg/statectx"
"github.com/facebookincubator/contest/pkg/storage"
"github.com/facebookincubator/contest/pkg/target"
"github.com/facebookincubator/contest/pkg/types"
Expand Down Expand Up @@ -157,9 +158,8 @@ func (jr *JobRunner) Run(j *job.Job) ([][]*job.Report, []*job.Report, error) {
if err := tl.Unlock(j.ID, targets); err != nil {
jobLog.Warningf("Failed to unlock targets (%v) for job ID %d: %v", targets, j.ID, err)
}
case <-j.StateCtx.Paused():
jobLog.Debugf("Received pause request, NOT releasing targets so the job can be resumed")
return
// Ignore the pause signal, continue to refresh targets.
case <-done:
if err := tl.Unlock(j.ID, targets); err != nil {
jobLog.Warningf("Failed to unlock %d target(s) (%v): %v", len(targets), targets, err)
Expand All @@ -183,7 +183,14 @@ func (jr *JobRunner) Run(j *job.Job) ([][]*job.Report, []*job.Report, error) {
if runErr = jr.emitAcquiredTargets(testEventEmitter, targets); runErr == nil {
jobLog.Infof("Run #%d: running test #%d for job '%s' (job ID: %d) on %d targets", run+1, idx, j.Name, j.ID, len(targets))
testRunner := NewTestRunner()
runErr = testRunner.Run(j.StateCtx, t, targets, j.ID, types.RunID(run+1))
var resumeState []byte
resumeState, err := testRunner.Run(j.StateCtx, t, targets, j.ID, types.RunID(run+1), resumeState)
if err == statectx.ErrPaused {
jobLog.Debugf("Runner paused, state: %s", string(resumeState))
// TODO(rojer): Persist the state.
} else {
runErr = err
}
}

// Job is done, release all the targets
Expand Down
Loading

0 comments on commit c5ca675

Please sign in to comment.