Skip to content
This repository has been archived by the owner on May 24, 2022. It is now read-only.

Commit

Permalink
Rename ts -> tgs
Browse files Browse the repository at this point in the history
  • Loading branch information
rojer9-fb committed Jan 26, 2021
1 parent 7d45d6c commit d6c11a2
Showing 1 changed file with 71 additions and 71 deletions.
142 changes: 71 additions & 71 deletions pkg/runner/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,20 +186,20 @@ func (tr *TestRunner) run(
// Initialize remaining fields of the target structures,
// build the map and kick off target processing.
for _, tgt := range targets {
ts := tr.targets[tgt.ID]
if ts == nil {
ts = &targetState{
tgs := tr.targets[tgt.ID]
if tgs == nil {
tgs = &targetState{
CurPhase: targetStepPhaseInit,
}
}
ts.tgt = tgt
ts.resCh = make(chan error)
tgs.tgt = tgt
tgs.resCh = make(chan error)
tr.mu.Lock()
tr.targets[tgt.ID] = ts
tr.targets[tgt.ID] = tgs
tr.mu.Unlock()
tr.targetsWg.Add(1)
go func() {
tr.targetHandler(targetCtx, stepCtx, ts)
tr.targetHandler(targetCtx, stepCtx, tgs)
tr.targetsWg.Done()
}()
}
Expand Down Expand Up @@ -230,11 +230,11 @@ func (tr *TestRunner) run(
resumeOk := (runErr == nil)
var inFlightTargets []*targetState
for i, tgt := range targets {
ts := tr.targets[tgt.ID]
stepErr := tr.steps[ts.CurStep].runErr
tr.log.Debugf(" %d %s %v", i, ts, stepErr)
if ts.CurPhase == targetStepPhaseRun {
inFlightTargets = append(inFlightTargets, ts)
tgs := tr.targets[tgt.ID]
stepErr := tr.steps[tgs.CurStep].runErr
tr.log.Debugf(" %d %s %v", i, tgs, stepErr)
if tgs.CurPhase == targetStepPhaseRun {
inFlightTargets = append(inFlightTargets, tgs)
}
if stepErr != nil && stepErr != statectx.ErrPaused {
resumeOk = false
Expand Down Expand Up @@ -280,15 +280,15 @@ func (tr *TestRunner) run(
// We are not pausing and yet some targets were left in flight.
// Use first target as a sample.
if len(inFlightTargets) > 0 {
ts0 := inFlightTargets[0]
tgs0 := inFlightTargets[0]
var lostTargets []string
for _, ts := range inFlightTargets {
if ts.CurStep == ts0.CurStep {
lostTargets = append(lostTargets, ts.tgt.ID)
for _, tgs := range inFlightTargets {
if tgs.CurStep == tgs0.CurStep {
lostTargets = append(lostTargets, tgs.tgt.ID)
}
}
runErr = &cerrors.ErrTestStepLostTargets{
StepName: tr.steps[ts0.CurStep].sb.TestStepLabel,
StepName: tr.steps[tgs0.CurStep].sb.TestStepLabel,
Targets: lostTargets,
}
}
Expand Down Expand Up @@ -360,7 +360,7 @@ func (tr *TestRunner) waitStepRunners(ctx statectx.Context) error {
}
}
}
// Emit step error events.
// Emit step error eventgs.
for _, ss := range tr.steps {
if ss.runErr != nil && ss.runErr != statectx.ErrPaused && ss.runErr != statectx.ErrCanceled {
if err := ss.emitEvent(EventTestError, nil, ss.runErr.Error()); err != nil {
Expand All @@ -371,22 +371,22 @@ func (tr *TestRunner) waitStepRunners(ctx statectx.Context) error {
return err
}

func (tr *TestRunner) injectTarget(ctx statectx.Context, ts *targetState, ss *stepState, log *logrus.Entry) error {
log.Debugf("%s: injecting into %s", ts, ss)
func (tr *TestRunner) injectTarget(ctx statectx.Context, tgs *targetState, ss *stepState, log *logrus.Entry) error {
log.Debugf("%s: injecting into %s", tgs, ss)
select {
case ss.inCh <- ts.tgt:
case ss.inCh <- tgs.tgt:
// Injected successfully.
err := ss.ev.Emit(testevent.Data{EventName: target.EventTargetIn, Target: ts.tgt})
err := ss.ev.Emit(testevent.Data{EventName: target.EventTargetIn, Target: tgs.tgt})
tr.mu.Lock()
defer tr.mu.Unlock()
ts.CurPhase = targetStepPhaseRun
tgs.CurPhase = targetStepPhaseRun
if err != nil {
return fmt.Errorf("failed to report target injection: %w", err)
}
tr.cond.Signal()
case <-time.After(tr.stepInjectTimeout):
ss.log.Errorf("timed out while injecting a target")
if err := ss.ev.Emit(testevent.Data{EventName: target.EventTargetInErr, Target: ts.tgt}); err != nil {
if err := ss.ev.Emit(testevent.Data{EventName: target.EventTargetInErr, Target: tgs.tgt}); err != nil {
ss.log.Errorf("failed to emit event: %s", err)
}
return &cerrors.ErrTestTargetInjectionTimedOut{StepName: ss.sb.TestStepLabel}
Expand All @@ -396,26 +396,26 @@ func (tr *TestRunner) injectTarget(ctx statectx.Context, ts *targetState, ss *st
return nil
}

func (tr *TestRunner) awaitTargetResult(ctx statectx.Context, ts *targetState, ss *stepState, log *logrus.Entry) error {
func (tr *TestRunner) awaitTargetResult(ctx statectx.Context, tgs *targetState, ss *stepState, log *logrus.Entry) error {
select {
case res, ok := <-ts.resCh:
case res, ok := <-tgs.resCh:
if !ok {
log.Debugf("%s: result channel closed", ts)
log.Debugf("%s: result channel closed", tgs)
return statectx.ErrCanceled
}
log.Debugf("%s: result for %s recd", ts, ss)
log.Debugf("%s: result for %s recd", tgs, ss)
var err error
if res == nil {
err = ss.emitEvent(target.EventTargetOut, ts.tgt, nil)
err = ss.emitEvent(target.EventTargetOut, tgs.tgt, nil)
} else {
err = ss.emitEvent(target.EventTargetErr, ts.tgt, target.ErrPayload{Error: res.Error()})
err = ss.emitEvent(target.EventTargetErr, tgs.tgt, target.ErrPayload{Error: res.Error()})
}
if err != nil {
ss.log.Errorf("failed to emit event: %s", err)
}
tr.mu.Lock()
ts.res = res
ts.CurPhase = targetStepPhaseEnd
tgs.res = res
tgs.CurPhase = targetStepPhaseEnd
tr.mu.Unlock()
tr.cond.Signal()
return err
Expand All @@ -425,48 +425,48 @@ func (tr *TestRunner) awaitTargetResult(ctx statectx.Context, ts *targetState, s
// and collect all the results they produce. If that doesn't happen,
// step runner will close resCh on its way out and unlock us.
case <-ctx.Done():
log.Debugf("%s: canceled 2", ts)
log.Debugf("%s: canceled 2", tgs)
return statectx.ErrCanceled
}
}

// targetHandler takes a single target through each step of the pipeline in sequence.
// It injects the target, waits for the result, then moves on to the next step.
func (tr *TestRunner) targetHandler(ctx, stepCtx statectx.Context, ts *targetState) {
log := logging.AddField(tr.log, "target", ts.tgt.ID)
log.Debugf("%s: target runner active", ts)
func (tr *TestRunner) targetHandler(ctx, stepCtx statectx.Context, tgs *targetState) {
log := logging.AddField(tr.log, "target", tgs.tgt.ID)
log.Debugf("%s: target runner active", tgs)
// NB: CurStep may be non-zero on entry if resumed
loop:
for i := ts.CurStep; i < len(tr.steps); {
for i := tgs.CurStep; i < len(tr.steps); {
// Early check for pause or cancelation.
select {
case <-ctx.Paused():
log.Debugf("%s: paused 0", ts)
log.Debugf("%s: paused 0", tgs)
break loop
case <-ctx.Done():
log.Debugf("%s: canceled 0", ts)
log.Debugf("%s: canceled 0", tgs)
break loop
default:
}
tr.mu.Lock()
ss := tr.steps[i]
if ts.CurPhase == targetStepPhaseEnd {
if tgs.CurPhase == targetStepPhaseEnd {
// This target already terminated.
// Can happen if resumed from terminal state.
tr.mu.Unlock()
break loop
}
ts.CurStep = i
ts.CurPhase = targetStepPhaseBegin
tgs.CurStep = i
tgs.CurPhase = targetStepPhaseBegin
tr.mu.Unlock()
// Make sure we have a step runner active. If not, start one.
tr.runStepIfNeeded(stepCtx, ss)
// Inject the target.
err := tr.injectTarget(ctx, ts, ss, log)
err := tr.injectTarget(ctx, tgs, ss, log)
// Await result. It will be communicated to us by the step runner
// and returned in ts.res.
// and returned in tgs.res.
if err == nil {
err = tr.awaitTargetResult(ctx, ts, ss, log)
err = tr.awaitTargetResult(ctx, tgs, ss, log)
}
if err != nil {
ss.log.Errorf("%s", err)
Expand All @@ -475,25 +475,25 @@ loop:
ss.runErr = err
tr.mu.Unlock()
} else {
log.Debugf("%s: canceled 1", ts)
log.Debugf("%s: canceled 1", tgs)
}
break
}
tr.mu.Lock()
if ts.res != nil {
if tgs.res != nil {
tr.mu.Unlock()
break
}
i++
if i < len(tr.steps) {
ts.CurStep = i
ts.CurPhase = targetStepPhaseInit
tgs.CurStep = i
tgs.CurPhase = targetStepPhaseInit
}
tr.mu.Unlock()
}
log.Debugf("%s: target runner finished", ts)
log.Debugf("%s: target runner finished", tgs)
tr.mu.Lock()
ts.resCh = nil
tgs.resCh = nil
tr.cond.Signal()
tr.mu.Unlock()
}
Expand Down Expand Up @@ -566,8 +566,8 @@ func (tr *TestRunner) reportTargetResult(ctx statectx.Context, ss *stepState, tg
resCh, err := func() (chan error, error) {
tr.mu.Lock()
defer tr.mu.Unlock()
ts := tr.targets[tgt.ID]
if ts == nil {
tgs := tr.targets[tgt.ID]
if tgs == nil {
return nil, fmt.Errorf("%s: result for nonexistent target %s %v", ss, tgt, res)
}
if ss.tgtDone[tgt] {
Expand All @@ -578,18 +578,18 @@ func (tr *TestRunner) reportTargetResult(ctx statectx.Context, ss *stepState, tg
}
ss.tgtDone[tgt] = true
// Begin is also allowed here because it may happen that we get a result before target runner updates phase.
if ts.CurStep != ss.stepIndex || (ts.CurPhase != targetStepPhaseBegin && ts.CurPhase != targetStepPhaseRun) {
if tgs.CurStep != ss.stepIndex || (tgs.CurPhase != targetStepPhaseBegin && tgs.CurPhase != targetStepPhaseRun) {
return nil, &cerrors.ErrTestStepReturnedUnexpectedResult{
StepName: ss.sb.TestStepLabel,
Target: tgt.ID,
}
}
if ts.resCh == nil {
if tgs.resCh == nil {
// This should not happen, must be an internal error.
return nil, fmt.Errorf("%s: target runner %s is not there, dropping result on the floor", ss, ts)
return nil, fmt.Errorf("%s: target runner %s is not there, dropping result on the floor", ss, tgs)
}
ss.log.Debugf("%s: result for %s: %v", ss, ts, res)
return ts.resCh, nil
ss.log.Debugf("%s: result for %s: %v", ss, tgs, res)
return tgs.resCh, nil
}()
if err != nil {
return err
Expand Down Expand Up @@ -716,9 +716,9 @@ func (tr *TestRunner) runMonitor() error {
// First, compute the starting step of the pipeline (it may be non-zero
// if the pipeline was resumed).
minStep := len(tr.steps)
for _, ts := range tr.targets {
if ts.CurStep < minStep {
minStep = ts.CurStep
for _, tgs := range tr.targets {
if tgs.CurStep < minStep {
minStep = tgs.CurStep
}
}
if minStep < len(tr.steps) {
Expand All @@ -734,13 +734,13 @@ loop:
tr.log.Debugf("monitor pass %d: current step %s", pass, ss)
// Check if all the targets have either made it past the injection phase or terminated.
ok := true
for _, ts := range tr.targets {
tr.log.Debugf("monitor pass %d: %s: %s", pass, ss, ts)
if ts.resCh == nil { // Not running anymore
for _, tgs := range tr.targets {
tr.log.Debugf("monitor pass %d: %s: %s", pass, ss, tgs)
if tgs.resCh == nil { // Not running anymore
continue
}
if ts.CurStep < step || ts.CurPhase < targetStepPhaseRun {
tr.log.Debugf("monitor pass %d: %s: not all targets injected yet (%s)", pass, ss, ts)
if tgs.CurStep < step || tgs.CurPhase < targetStepPhaseRun {
tr.log.Debugf("monitor pass %d: %s: not all targets injected yet (%s)", pass, ss, tgs)
ok = false
break
}
Expand Down Expand Up @@ -795,18 +795,18 @@ func (ss *stepState) String() string {
return fmt.Sprintf("[#%d %s]", ss.stepIndex, ss.sb.TestStepLabel)
}

func (ts *targetState) String() string {
func (tgs *targetState) String() string {
var resText string
if ts.res != nil {
resStr := fmt.Sprintf("%s", ts.res)
if tgs.res != nil {
resStr := fmt.Sprintf("%s", tgs.res)
if len(resStr) > 20 {
resStr = resStr[:20] + "..."
}
resText = fmt.Sprintf("%q", resStr)
} else {
resText = "<nil>"
}
finished := ts.resCh == nil
finished := tgs.resCh == nil
return fmt.Sprintf("[%s %d %s %t %s]",
ts.tgt, ts.CurStep, ts.CurPhase, finished, resText)
tgs.tgt, tgs.CurStep, tgs.CurPhase, finished, resText)
}

0 comments on commit d6c11a2

Please sign in to comment.