Skip to content

Commit

Permalink
New TestRunner implementation
Browse files Browse the repository at this point in the history
With inter-step state serialization

This is take 2, incorporating a fix for facebookarchive#228 and comments from facebookarchive#219
  • Loading branch information
rojer9-fb authored and xaionaro committed Mar 13, 2021
1 parent 499c2c9 commit 5e608d3
Show file tree
Hide file tree
Showing 40 changed files with 1,997 additions and 2,050 deletions.
2 changes: 1 addition & 1 deletion pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type API struct {

// New returns an initialized instance of an API struct with the specified
// server ID generation function.
func New(ctx xcontext.Context, opts ...Option) (*API, error) {
func New(opts ...Option) (*API, error) {
cfg := getConfig(opts...)
serverID, err := obtainServerID(cfg.ServerIDFunc)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var ctx = logrusctx.NewContext(logger.LevelDebug)
func TestOptions(t *testing.T) {
eventTimeout := 3141592654 * time.Second
serverID := "myUnitTestServerID"
api, err := New(ctx,
api, err := New(
OptionEventTimeout(eventTimeout),
OptionServerID(serverID),
)
Expand All @@ -40,7 +40,7 @@ func (d dummyEventMsg) Requestor() EventRequestor {

func TestEventTimeout(t *testing.T) {
t.Run("timeout", func(t *testing.T) {
apiInstance, err := New(ctx, OptionServerID("unit-test"), OptionEventTimeout(time.Nanosecond))
apiInstance, err := New(OptionServerID("unit-test"), OptionEventTimeout(time.Nanosecond))
require.NoError(t, err)
t.Run("Status", func(t *testing.T) {
startTime := time.Now()
Expand All @@ -63,7 +63,7 @@ func TestEventTimeout(t *testing.T) {
})

t.Run("noTimeout", func(t *testing.T) {
apiInstance, err := New(ctx, OptionServerID("unit-test"))
apiInstance, err := New(OptionServerID("unit-test"))
require.NoError(t, err)

respExpected := &EventResponse{
Expand Down
57 changes: 57 additions & 0 deletions pkg/cerrors/cerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ func (e *ErrTestStepsNeverReturned) Error() string {
return fmt.Sprintf("test step [%s] did not return", strings.Join(e.StepNames, ", "))
}

// ErrTestTargetInjectionTimedOut indicates that test step did not ingest a target
// within allotted time.
type ErrTestTargetInjectionTimedOut struct {
StepName string
}

// Error returns the error string associated with the error
func (e *ErrTestTargetInjectionTimedOut) Error() string {
return fmt.Sprintf("test step %v failed to ingest a target", e.StepName)
}

// ErrTestStepClosedChannels indicates that the test step returned after
// closing its output channels, which constitutes an API violation
type ErrTestStepClosedChannels struct {
Expand All @@ -53,3 +64,49 @@ type ErrTestStepClosedChannels struct {
func (e *ErrTestStepClosedChannels) Error() string {
return fmt.Sprintf("test step %v closed output channels (api violation)", e.StepName)
}

// ErrTestStepPaniced indicates that a test step's method panicked.
type ErrTestStepPaniced struct {
StepName string
StackTrace string
}

// Error returns the error string associated with the error
func (e *ErrTestStepPaniced) Error() string {
return fmt.Sprintf("test step %s paniced, trace: %q", e.StepName, e.StackTrace)
}

// ErrTestStepReturnedDuplicateResult indicates that a test step returned result
// twice for the same target.
type ErrTestStepReturnedDuplicateResult struct {
StepName string
Target string
}

// Error returns the error string associated with the error
func (e *ErrTestStepReturnedDuplicateResult) Error() string {
return fmt.Sprintf("test step %s returned duplicate result for %s", e.StepName, e.Target)
}

// ErrTestStepReturnedUnexpectedResult indicates that a test step returned result
// for a target that was not given to it.
type ErrTestStepReturnedUnexpectedResult struct {
StepName string
Target string
}

// Error returns the error string associated with the error
func (e *ErrTestStepReturnedUnexpectedResult) Error() string {
return fmt.Sprintf("test step %s returned unexpected result for %s", e.StepName, e.Target)
}

// ErrTestStepLostTargets indicates that targets have been lost during test run.
type ErrTestStepLostTargets struct {
StepName string
Targets []string
}

// Error returns the error string associated with the error
func (e *ErrTestStepLostTargets) Error() string {
return fmt.Sprintf("test step %s lost targets %v", e.StepName, e.Targets)
}
16 changes: 14 additions & 2 deletions pkg/event/testevent/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/facebookincubator/contest/pkg/xcontext"
)

// Header models the header of a test event, which consists in metadatat hat defines the
// Header models the header of a test event, which consists in metadata that defines the
// emitter of the events. The Header is under ConTest control and cannot be manipulated
// by the TestStep
type Header struct {
Expand All @@ -29,8 +29,8 @@ type Header struct {

// Data models the data of a test event. It is populated by the TestStep
type Data struct {
EventName event.Name
Target *target.Target
EventName event.Name
Payload *json.RawMessage
}

Expand Down Expand Up @@ -153,3 +153,15 @@ type EmitterFetcher interface {
Emitter
Fetcher
}

func (h *Header) String() string {
return fmt.Sprintf("[%d %d %s %s]", h.JobID, h.RunID, h.TestName, h.TestStepLabel)
}

func (d *Data) String() string {
ps := ""
if d.Payload != nil {
ps = fmt.Sprintf(" %q", d.Payload) //nolint SA5009 - works fine
}
return fmt.Sprintf("[%s %s%s]", d.Target, d.EventName, ps)
}
3 changes: 1 addition & 2 deletions pkg/jobmanager/bundles.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ func newBundlesFromSteps(ctx xcontext.Context, descriptors []*test.TestStepDescr
if err != nil {
return nil, err
}
// test step index is incremented by 1 so we can use 0 to signal an anomaly
tsb, err := registry.NewTestStepBundle(ctx, *descriptor, uint(idx)+1, tse)
tsb, err := registry.NewTestStepBundle(ctx, *descriptor, tse)
if err != nil {
return nil, fmt.Errorf("NewStepBundle for test step '%s' with index %d failed: %w", descriptor.Name, idx, err)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/jobmanager/jobmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,15 @@ func (jm *JobManager) handleEvent(ev *api.Event) {
// signals, propagating the signals downwards to all jobs.
func (jm *JobManager) Start(ctx xcontext.Context, sigs chan os.Signal) error {
log := ctx.Logger()
log.Debugf("Starting JobManager")

a, err := api.New(ctx, jm.config.apiOptions...)
a, err := api.New(jm.config.apiOptions...)
if err != nil {
return fmt.Errorf("Cannot start JobManager: %w", err)
}

apiCtx, apiCancel := xcontext.WithCancel(ctx)
apiCtx, apiPause := xcontext.WithNotify(apiCtx, xcontext.Paused)
errCh := make(chan error, 1)
go func() {
lErr := jm.apiListener.Serve(apiCtx, a)
Expand Down Expand Up @@ -176,7 +178,7 @@ loop:
apiCancel()
} else {
log.Debugf("Interrupted by signal '%s': pause jobs and exit", sig)
apiCancel()
apiPause()
jm.PauseJobs(ctx)
}
select {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pluginregistry/bundles.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

// NewTestStepBundle creates a TestStepBundle from a TestStepDescriptor
func (r *PluginRegistry) NewTestStepBundle(ctx xcontext.Context, testStepDescriptor test.TestStepDescriptor, stepIndex uint, allowedEvents map[event.Name]bool) (*test.TestStepBundle, error) {
func (r *PluginRegistry) NewTestStepBundle(ctx xcontext.Context, testStepDescriptor test.TestStepDescriptor, allowedEvents map[event.Name]bool) (*test.TestStepBundle, error) {
testStep, err := r.NewTestStep(testStepDescriptor.Name)
if err != nil {
return nil, fmt.Errorf("could not get the desired TestStep (%s): %v", testStepDescriptor.Name, err)
Expand Down
12 changes: 9 additions & 3 deletions pkg/runner/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (jr *JobRunner) Run(j *job.Job) ([][]*job.Report, []*job.Report, error) {
targets = <-targetsCh
if err != nil {
err = fmt.Errorf("run #%d: cannot fetch targets for test '%s': %v", run+1, t.Name, err)
j.StateCtx.Logger().Errorf("%v", err.Error())
j.StateCtx.Logger().Errorf(err.Error())
return nil, nil, err
}
// Associate the targets with the job for later retrievel
Expand Down Expand Up @@ -181,7 +181,13 @@ func (jr *JobRunner) Run(j *job.Job) ([][]*job.Report, []*job.Report, error) {
if runErr = jr.emitAcquiredTargets(j.StateCtx, testEventEmitter, targets); runErr == nil {
j.StateCtx.Logger().Infof("Run #%d: running test #%d for job '%s' (job ID: %d) on %d targets", run+1, idx, j.Name, j.ID, len(targets))
testRunner := NewTestRunner()
runErr = testRunner.Run(j.StateCtx, t, targets, j.ID, types.RunID(run+1))
resumeState, err := testRunner.Run(j.StateCtx, t, targets, j.ID, types.RunID(run+1), nil)
if err == xcontext.Paused {
j.StateCtx.Logger().Debugf("Runner paused, state: %s", string(resumeState))
// TODO(rojer): Persist the state.
} else {
runErr = err
}
}

// Job is done, release all the targets
Expand Down Expand Up @@ -232,7 +238,7 @@ func (jr *JobRunner) Run(j *job.Job) ([][]*job.Report, []*job.Report, error) {
j.StateCtx.Logger().Warnf("Run reporter failed while calculating run results, proceeding anyway: %v", err)
} else {
if success {
j.StateCtx.Logger().Warnf("Run #%d of job %d considered successful according to %s", run+1, j.ID, bundle.Reporter.Name())
j.StateCtx.Logger().Debugf("Run #%d of job %d considered successful according to %s", run+1, j.ID, bundle.Reporter.Name())
} else {
j.StateCtx.Logger().Errorf("Run #%d of job %d considered failed according to %s", run+1, j.ID, bundle.Reporter.Name())
}
Expand Down
Loading

0 comments on commit 5e608d3

Please sign in to comment.