Skip to content
This repository has been archived by the owner on May 24, 2022. It is now read-only.

Commit

Permalink
make sure correct events are emitted
Browse files Browse the repository at this point in the history
  • Loading branch information
rojer9-fb committed Jan 18, 2021
1 parent f40a7c8 commit 5c93032
Show file tree
Hide file tree
Showing 4 changed files with 323 additions and 158 deletions.
76 changes: 49 additions & 27 deletions pkg/runner/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/facebookincubator/contest/pkg/cerrors"
"github.com/facebookincubator/contest/pkg/config"
"github.com/facebookincubator/contest/pkg/event"
"github.com/facebookincubator/contest/pkg/event/testevent"
"github.com/facebookincubator/contest/pkg/logging"
"github.com/facebookincubator/contest/pkg/statectx"
Expand Down Expand Up @@ -329,32 +330,41 @@ func (tr *testRunner) waitStepRunners(ctx statectx.Context) error {
tr.cond.Wait()
}
}()
var err error
select {
case <-swch:
tr.log.Debugf("step runners finished")
tr.mu.Lock()
defer tr.mu.Unlock()
return tr.checkStepRunners()
err = tr.checkStepRunners()
case <-time.After(tr.shutdownTimeout):
tr.log.Errorf("step runners failed to shut down correctly")
tr.mu.Lock()
defer tr.mu.Unlock()
// If there is a step with an error set, use that.
err := tr.checkStepRunners()
err = tr.checkStepRunners()
// If there isn't, enumerate ones that were still running at the time.
nrerr := &cerrors.ErrTestStepsNeverReturned{}
if err == nil {
err = nrerr
}
tr.mu.Lock()
defer tr.mu.Unlock()
for _, ss := range tr.steps {
if ss.stepRunning {
nrerr.StepNames = append(nrerr.StepNames, ss.sb.TestStepLabel)
// We cannot make the step itself return but we can at least release the reader.
tr.safeCloseOutCh(ss)
}
}
return err
}
// Emit step error events.
for _, ss := range tr.steps {
if ss.runErr != nil && ss.runErr != statectx.ErrPaused && ss.runErr != statectx.ErrCanceled {
if err := ss.emitEvent(EventTestError, nil, ss.runErr.Error()); err != nil {
tr.log.Errorf("failed to emit event: %s", err)
}
}
}
return err
}

// targetRunner runs one target through all the steps of the pipeline.
Expand Down Expand Up @@ -393,16 +403,28 @@ loop:
select {
case ss.inCh <- ts.tgt:
// Injected successfully.
err := ss.ev.Emit(testevent.Data{EventName: target.EventTargetIn, Target: ts.tgt})
tr.mu.Lock()
ts.CurPhase = targetStepPhaseRun
ss.numInjected++
if err != nil {
ss.runErr = fmt.Errorf("failed to report target injection: %w", err)
ss.log.Errorf("%s", ss.runErr)
}
tr.mu.Unlock()
tr.cond.Signal()
if err != nil {
break loop
}
case <-time.After(tr.stepInjectTimeout):
tr.mu.Lock()
ss.log.Errorf("timed out while injecting a target")
ss.runErr = &cerrors.ErrTestTargetInjectionTimedOut{StepName: ss.sb.TestStepLabel}
tr.mu.Unlock()
err := ss.ev.Emit(testevent.Data{EventName: target.EventTargetInErr, Target: ts.tgt})
if err != nil {
ss.log.Errorf("failed to emit event: %s", err)
}
break loop
case <-ctx.Done():
log.Debugf("%s: canceled 1", ts)
Expand All @@ -416,6 +438,15 @@ loop:
break loop
}
log.Debugf("%s: result for %s recd", ts, ss)
var err error
if res == nil {
err = ss.emitEvent(target.EventTargetOut, ts.tgt, nil)
} else {
err = ss.emitEvent(target.EventTargetErr, ts.tgt, target.ErrPayload{Error: res.Error()})
}
if err != nil {
ss.log.Errorf("failed to emit event: %s", err)
}
tr.mu.Lock()
ts.CurPhase = targetStepPhaseEnd
ts.res = res
Expand Down Expand Up @@ -464,20 +495,21 @@ func (tr *testRunner) runStepIfNeeded(ctx statectx.Context, ss *stepState) {
go tr.stepReader(ctx, ss)
}

// emitStepEvent emits an error event if step resulted in an error.
func (ss *stepState) emitStepEvent(tgt *target.Target, err error) error {
if err == nil {
return nil
}
payload, jmErr := json.Marshal(err.Error())
if jmErr != nil {
return fmt.Errorf("failed to marshal event: %w", err)
// emitEvent emits the specified event with the specified JSON payload (if any).
func (ss *stepState) emitEvent(name event.Name, tgt *target.Target, payload interface{}) error {
var payloadJSON *json.RawMessage
if payload != nil {
payloadBytes, jmErr := json.Marshal(payload)
if jmErr != nil {
return fmt.Errorf("failed to marshal event: %w", jmErr)
}
pj := json.RawMessage(payloadBytes)
payloadJSON = &pj
}
rm := json.RawMessage(payload)
errEv := testevent.Data{
EventName: EventTestError,
EventName: name,
Target: tgt,
Payload: &rm,
Payload: payloadJSON,
}
return ss.ev.Emit(errEv)
}
Expand All @@ -491,19 +523,14 @@ func (tr *testRunner) stepRunner(ctx statectx.Context, ss *stepState) {
ss.stepRunning = false
ss.runErr = &cerrors.ErrTestStepPaniced{
StepName: ss.sb.TestStepLabel,
StackTrace: string(debug.Stack()),
StackTrace: fmt.Sprintf("%s / %s", r, debug.Stack()),
}
tr.mu.Unlock()
tr.safeCloseOutCh(ss)
}
}()
chans := test.TestStepChannels{In: ss.inCh, Out: ss.outCh, Err: ss.errCh}
runErr := ss.sb.TestStep.Run(ctx, chans, ss.sb.Parameters, ss.ev)
if err := ss.emitStepEvent(nil, runErr); err != nil {
if ss.runErr == nil {
ss.runErr = err
}
}
tr.mu.Lock()
ss.stepRunning = false
if runErr != nil {
Expand Down Expand Up @@ -548,11 +575,6 @@ func (tr *testRunner) reportTargetResult(ctx statectx.Context, ss *stepState, tg
if err != nil {
return err
}
if res != nil {
if err := ss.emitStepEvent(tgt, res); err != nil {
return err
}
}
select {
case resCh <- res:
break
Expand Down
Loading

0 comments on commit 5c93032

Please sign in to comment.