Skip to content
This repository has been archived by the owner on May 24, 2022. It is now read-only.

New test runner implementation #208

Merged
merged 5 commits into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions pkg/cerrors/cerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ func (e *ErrTestStepsNeverReturned) Error() string {
return fmt.Sprintf("test step [%s] did not return", strings.Join(e.StepNames, ", "))
}

// ErrTestTargetInjectionTimedOut indicates that test step did not ingest a target
// within allotted time.
type ErrTestTargetInjectionTimedOut struct {
StepName string
}

// Error returns the error string associated with the error
func (e *ErrTestTargetInjectionTimedOut) Error() string {
return fmt.Sprintf("test step %v failed to ingest a target", e.StepName)
}

// ErrTestStepClosedChannels indicates that the test step returned after
// closing its output channels, which constitutes an API violation
type ErrTestStepClosedChannels struct {
Expand All @@ -53,3 +64,49 @@ type ErrTestStepClosedChannels struct {
func (e *ErrTestStepClosedChannels) Error() string {
return fmt.Sprintf("test step %v closed output channels (api violation)", e.StepName)
}

// ErrTestStepPaniced indicates that a test step's method panicked.
type ErrTestStepPaniced struct {
rojer marked this conversation as resolved.
Show resolved Hide resolved
StepName string
StackTrace string
}

// Error returns the error string associated with the error
func (e *ErrTestStepPaniced) Error() string {
return fmt.Sprintf("test step %s paniced, trace: %q", e.StepName, e.StackTrace)
}

// ErrTestStepReturnedDuplicateResult indicates that a test step returned result
// twice for the same target.
type ErrTestStepReturnedDuplicateResult struct {
StepName string
Target string
}

// Error returns the error string associated with the error
func (e *ErrTestStepReturnedDuplicateResult) Error() string {
return fmt.Sprintf("test step %s returned duplicate result for %s", e.StepName, e.Target)
}

// ErrTestStepReturnedUnexpectedResult indicates that a test step returned result
// for a target that was not given to it.
type ErrTestStepReturnedUnexpectedResult struct {
StepName string
Target string
}

// Error returns the error string associated with the error
func (e *ErrTestStepReturnedUnexpectedResult) Error() string {
return fmt.Sprintf("test step %s returned unexpected result for %s", e.StepName, e.Target)
}

// ErrTestStepLostTargets indicates that targets have been lost during test run.
type ErrTestStepLostTargets struct {
StepName string
Target string
}

// Error returns the error string associated with the error
func (e *ErrTestStepLostTargets) Error() string {
return fmt.Sprintf("test step %s lost target %s", e.StepName, e.Target)
}
29 changes: 4 additions & 25 deletions pkg/config/timeouts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,13 @@ import "time"
var TargetManagerTimeout = 5 * time.Minute

// StepInjectTimeout represents the maximum time that TestRunner will wait for
// the first TestStep of the pipeline to accept a Target
// a TestStep to accept a Target
var StepInjectTimeout = 30 * time.Second

// TestRunnerMsgTimeout represents the maximum time that any component of the
// TestRunner will wait for the delivery of a message to any other subsystem
// of the TestRunner
var TestRunnerMsgTimeout = 5 * time.Second

// TestRunnerShutdownTimeout represents the maximum time that the TestRunner will
// wait for all the TestStep to complete after a cancellation signal has been
// delivered

// TestRunnerShutdownTimeout controls a block of the TestRunner which works as a
// watchdog, i.e. if there are multiple steps that need to return, the timeout is
// reset every time a step returns. The timeout should be handled so that it
// doesn't reset when a TestStep returns.
var TestRunnerShutdownTimeout = 30 * time.Second

// TestRunnerStepShutdownTimeout represents the maximum time that the TestRunner
// TestRunnerShutdownTimeout represents the maximum time that the TestRunner
// will wait for all TestSteps to complete after all Targets have reached the end
// of the pipeline. This timeout is only relevant if a cancellation signal is *not*
// delivered.

// TestRunnerStepShutdownTimeout controls a block of the TestRunner which worksas
// a watchdog, i.e. if there are multiple steps that need to return, the timeout
// is reset every time a step returns. The timeout should be handled so that it
// doesn't reset when a TestStep returns.
var TestRunnerStepShutdownTimeout = 5 * time.Second
// of the pipeline.
var TestRunnerShutdownTimeout = 30 * time.Second

// LockRefreshTimeout is the amount of time by which a target lock is extended
// periodically while a job is running.
Expand Down
16 changes: 14 additions & 2 deletions pkg/event/testevent/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/facebookincubator/contest/pkg/types"
)

// Header models the header of a test event, which consists in metadatat hat defines the
// Header models the header of a test event, which consists in metadata that defines the
// emitter of the events. The Header is under ConTest control and cannot be manipulated
// by the TestStep
type Header struct {
Expand All @@ -28,8 +28,8 @@ type Header struct {

// Data models the data of a test event. It is populated by the TestStep
type Data struct {
EventName event.Name
Target *target.Target
EventName event.Name
Payload *json.RawMessage
}

Expand Down Expand Up @@ -152,3 +152,15 @@ type EmitterFetcher interface {
Emitter
Fetcher
}

func (h *Header) String() string {
return fmt.Sprintf("[%d %d %s %s]", h.JobID, h.RunID, h.TestName, h.TestStepLabel)
}

func (d *Data) String() string {
ps := ""
if d.Payload != nil {
ps = fmt.Sprintf(" %q", d.Payload) //nolint SA5009 - works fine
}
return fmt.Sprintf("[%s %s%s]", d.Target, d.EventName, ps)
}
4 changes: 1 addition & 3 deletions pkg/jobmanager/jobmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ func newPartialJobFromDescriptor(pr *pluginregistry.PluginRegistry, jd *job.JobD
if err != nil {
return nil, err
}
// test step index is incremented by 1 so we can use 0 to signal an
// anomaly.
tsb, err := pr.NewTestStepBundle(*testStepDesc, uint(idx)+1, tse)
tsb, err := pr.NewTestStepBundle(*testStepDesc, tse)
if err != nil {
return nil, fmt.Errorf("NewTestStepBundle for test step '%s' with index %d failed: %w", testStepDesc.Name, idx, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pluginregistry/bundles.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

// NewTestStepBundle creates a TestStepBundle from a TestStepDescriptor
func (r *PluginRegistry) NewTestStepBundle(testStepDescriptor test.TestStepDescriptor, stepIndex uint, allowedEvents map[event.Name]bool) (*test.TestStepBundle, error) {
func (r *PluginRegistry) NewTestStepBundle(testStepDescriptor test.TestStepDescriptor, allowedEvents map[event.Name]bool) (*test.TestStepBundle, error) {
testStep, err := r.NewTestStep(testStepDescriptor.Name)
if err != nil {
return nil, fmt.Errorf("could not get the desired TestStep (%s): %v", testStepDescriptor.Name, err)
Expand Down
13 changes: 10 additions & 3 deletions pkg/runner/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/facebookincubator/contest/pkg/event/testevent"
"github.com/facebookincubator/contest/pkg/job"
"github.com/facebookincubator/contest/pkg/logging"
"github.com/facebookincubator/contest/pkg/statectx"
"github.com/facebookincubator/contest/pkg/storage"
"github.com/facebookincubator/contest/pkg/target"
"github.com/facebookincubator/contest/pkg/types"
Expand Down Expand Up @@ -157,9 +158,8 @@ func (jr *JobRunner) Run(j *job.Job) ([][]*job.Report, []*job.Report, error) {
if err := tl.Unlock(j.ID, targets); err != nil {
jobLog.Warningf("Failed to unlock targets (%v) for job ID %d: %v", targets, j.ID, err)
}
case <-j.StateCtx.Paused():
jobLog.Debugf("Received pause request, NOT releasing targets so the job can be resumed")
return
// Ignore the pause signal, continue to refresh targets.
tfg13 marked this conversation as resolved.
Show resolved Hide resolved
case <-done:
if err := tl.Unlock(j.ID, targets); err != nil {
jobLog.Warningf("Failed to unlock %d target(s) (%v): %v", len(targets), targets, err)
Expand All @@ -183,7 +183,14 @@ func (jr *JobRunner) Run(j *job.Job) ([][]*job.Report, []*job.Report, error) {
if runErr = jr.emitAcquiredTargets(testEventEmitter, targets); runErr == nil {
jobLog.Infof("Run #%d: running test #%d for job '%s' (job ID: %d) on %d targets", run+1, idx, j.Name, j.ID, len(targets))
testRunner := NewTestRunner()
runErr = testRunner.Run(j.StateCtx, t, targets, j.ID, types.RunID(run+1))
var resumeState []byte
resumeState, err := testRunner.Run(j.StateCtx, t, targets, j.ID, types.RunID(run+1), resumeState)
if err == statectx.ErrPaused {
jobLog.Errorf("Runner paused, state: %s", string(resumeState))
// TODO(rojer): Persist the state.
rojer marked this conversation as resolved.
Show resolved Hide resolved
} else {
runErr = err
}
}

// Job is done, release all the targets
Expand Down
Loading