Skip to content
This repository has been archived by the owner on May 24, 2022. It is now read-only.

Commit

Permalink
New TestRunner implementation
Browse files Browse the repository at this point in the history
With inter-step state serialization

This is take 2, incorporating a fix for #228 and comments from #219
  • Loading branch information
rojer9-fb committed Mar 10, 2021
1 parent 44f52d3 commit 955be3d
Show file tree
Hide file tree
Showing 32 changed files with 1,907 additions and 2,015 deletions.
57 changes: 57 additions & 0 deletions pkg/cerrors/cerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ func (e *ErrTestStepsNeverReturned) Error() string {
return fmt.Sprintf("test step [%s] did not return", strings.Join(e.StepNames, ", "))
}

// ErrTestTargetInjectionTimedOut indicates that test step did not ingest a target
// within allotted time.
type ErrTestTargetInjectionTimedOut struct {
StepName string
}

// Error returns the error string associated with the error
func (e *ErrTestTargetInjectionTimedOut) Error() string {
return fmt.Sprintf("test step %v failed to ingest a target", e.StepName)
}

// ErrTestStepClosedChannels indicates that the test step returned after
// closing its output channels, which constitutes an API violation
type ErrTestStepClosedChannels struct {
Expand All @@ -53,3 +64,49 @@ type ErrTestStepClosedChannels struct {
func (e *ErrTestStepClosedChannels) Error() string {
return fmt.Sprintf("test step %v closed output channels (api violation)", e.StepName)
}

// ErrTestStepPaniced indicates that a test step's method panicked.
type ErrTestStepPaniced struct {
StepName string
StackTrace string
}

// Error returns the error string associated with the error
func (e *ErrTestStepPaniced) Error() string {
return fmt.Sprintf("test step %s paniced, trace: %q", e.StepName, e.StackTrace)
}

// ErrTestStepReturnedDuplicateResult indicates that a test step returned result
// twice for the same target.
type ErrTestStepReturnedDuplicateResult struct {
StepName string
Target string
}

// Error returns the error string associated with the error
func (e *ErrTestStepReturnedDuplicateResult) Error() string {
return fmt.Sprintf("test step %s returned duplicate result for %s", e.StepName, e.Target)
}

// ErrTestStepReturnedUnexpectedResult indicates that a test step returned result
// for a target that was not given to it.
type ErrTestStepReturnedUnexpectedResult struct {
StepName string
Target string
}

// Error returns the error string associated with the error
func (e *ErrTestStepReturnedUnexpectedResult) Error() string {
return fmt.Sprintf("test step %s returned unexpected result for %s", e.StepName, e.Target)
}

// ErrTestStepLostTargets indicates that targets have been lost during test run.
type ErrTestStepLostTargets struct {
StepName string
Targets []string
}

// Error returns the error string associated with the error
func (e *ErrTestStepLostTargets) Error() string {
return fmt.Sprintf("test step %s lost targets %v", e.StepName, e.Targets)
}
16 changes: 14 additions & 2 deletions pkg/event/testevent/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/facebookincubator/contest/pkg/types"
)

// Header models the header of a test event, which consists in metadatat hat defines the
// Header models the header of a test event, which consists in metadata that defines the
// emitter of the events. The Header is under ConTest control and cannot be manipulated
// by the TestStep
type Header struct {
Expand All @@ -28,8 +28,8 @@ type Header struct {

// Data models the data of a test event. It is populated by the TestStep
type Data struct {
EventName event.Name
Target *target.Target
EventName event.Name
Payload *json.RawMessage
}

Expand Down Expand Up @@ -152,3 +152,15 @@ type EmitterFetcher interface {
Emitter
Fetcher
}

func (h *Header) String() string {
return fmt.Sprintf("[%d %d %s %s]", h.JobID, h.RunID, h.TestName, h.TestStepLabel)
}

func (d *Data) String() string {
ps := ""
if d.Payload != nil {
ps = fmt.Sprintf(" %q", d.Payload) //nolint SA5009 - works fine
}
return fmt.Sprintf("[%s %s%s]", d.Target, d.EventName, ps)
}
3 changes: 1 addition & 2 deletions pkg/jobmanager/bundles.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func newBundlesFromSteps(descriptors []*test.TestStepDescriptor, registry *plugi
if err != nil {
return nil, err
}
// test step index is incremented by 1 so we can use 0 to signal an anomaly
tsb, err := registry.NewTestStepBundle(*descriptor, uint(idx)+1, tse)
tsb, err := registry.NewTestStepBundle(*descriptor, tse)
if err != nil {
return nil, fmt.Errorf("NewStepBundle for test step '%s' with index %d failed: %w", descriptor.Name, idx, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pluginregistry/bundles.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

// NewTestStepBundle creates a TestStepBundle from a TestStepDescriptor
func (r *PluginRegistry) NewTestStepBundle(testStepDescriptor test.TestStepDescriptor, stepIndex uint, allowedEvents map[event.Name]bool) (*test.TestStepBundle, error) {
func (r *PluginRegistry) NewTestStepBundle(testStepDescriptor test.TestStepDescriptor, allowedEvents map[event.Name]bool) (*test.TestStepBundle, error) {
testStep, err := r.NewTestStep(testStepDescriptor.Name)
if err != nil {
return nil, fmt.Errorf("could not get the desired TestStep (%s): %v", testStepDescriptor.Name, err)
Expand Down
9 changes: 8 additions & 1 deletion pkg/runner/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/facebookincubator/contest/pkg/event/testevent"
"github.com/facebookincubator/contest/pkg/job"
"github.com/facebookincubator/contest/pkg/logging"
"github.com/facebookincubator/contest/pkg/statectx"
"github.com/facebookincubator/contest/pkg/storage"
"github.com/facebookincubator/contest/pkg/target"
"github.com/facebookincubator/contest/pkg/types"
Expand Down Expand Up @@ -183,7 +184,13 @@ func (jr *JobRunner) Run(j *job.Job) ([][]*job.Report, []*job.Report, error) {
if runErr = jr.emitAcquiredTargets(testEventEmitter, targets); runErr == nil {
jobLog.Infof("Run #%d: running test #%d for job '%s' (job ID: %d) on %d targets", run+1, idx, j.Name, j.ID, len(targets))
testRunner := NewTestRunner()
runErr = testRunner.Run(j.StateCtx, t, targets, j.ID, types.RunID(run+1))
resumeState, err := testRunner.Run(j.StateCtx, t, targets, j.ID, types.RunID(run+1), nil)
if err == statectx.ErrPaused {
jobLog.Debugf("Runner paused, state: %s", string(resumeState))
// TODO(rojer): Persist the state.
} else {
runErr = err
}
}

// Job is done, release all the targets
Expand Down
Loading

0 comments on commit 955be3d

Please sign in to comment.