From d6c11a2d30252ffa88919e71cf87b06127c3b298 Mon Sep 17 00:00:00 2001 From: "Deomid \"rojer\" Ryabkov" Date: Tue, 26 Jan 2021 12:56:45 +0000 Subject: [PATCH] Rename ts -> tgs --- pkg/runner/test_runner.go | 142 +++++++++++++++++++------------------- 1 file changed, 71 insertions(+), 71 deletions(-) diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index ecc1c362..836ab9c6 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -186,20 +186,20 @@ func (tr *TestRunner) run( // Initialize remaining fields of the target structures, // build the map and kick off target processing. for _, tgt := range targets { - ts := tr.targets[tgt.ID] - if ts == nil { - ts = &targetState{ + tgs := tr.targets[tgt.ID] + if tgs == nil { + tgs = &targetState{ CurPhase: targetStepPhaseInit, } } - ts.tgt = tgt - ts.resCh = make(chan error) + tgs.tgt = tgt + tgs.resCh = make(chan error) tr.mu.Lock() - tr.targets[tgt.ID] = ts + tr.targets[tgt.ID] = tgs tr.mu.Unlock() tr.targetsWg.Add(1) go func() { - tr.targetHandler(targetCtx, stepCtx, ts) + tr.targetHandler(targetCtx, stepCtx, tgs) tr.targetsWg.Done() }() } @@ -230,11 +230,11 @@ func (tr *TestRunner) run( resumeOk := (runErr == nil) var inFlightTargets []*targetState for i, tgt := range targets { - ts := tr.targets[tgt.ID] - stepErr := tr.steps[ts.CurStep].runErr - tr.log.Debugf(" %d %s %v", i, ts, stepErr) - if ts.CurPhase == targetStepPhaseRun { - inFlightTargets = append(inFlightTargets, ts) + tgs := tr.targets[tgt.ID] + stepErr := tr.steps[tgs.CurStep].runErr + tr.log.Debugf(" %d %s %v", i, tgs, stepErr) + if tgs.CurPhase == targetStepPhaseRun { + inFlightTargets = append(inFlightTargets, tgs) } if stepErr != nil && stepErr != statectx.ErrPaused { resumeOk = false @@ -280,15 +280,15 @@ func (tr *TestRunner) run( // We are not pausing and yet some targets were left in flight. // Use first target as a sample. if len(inFlightTargets) > 0 { - ts0 := inFlightTargets[0] + tgs0 := inFlightTargets[0] var lostTargets []string - for _, ts := range inFlightTargets { - if ts.CurStep == ts0.CurStep { - lostTargets = append(lostTargets, ts.tgt.ID) + for _, tgs := range inFlightTargets { + if tgs.CurStep == tgs0.CurStep { + lostTargets = append(lostTargets, tgs.tgt.ID) } } runErr = &cerrors.ErrTestStepLostTargets{ - StepName: tr.steps[ts0.CurStep].sb.TestStepLabel, + StepName: tr.steps[tgs0.CurStep].sb.TestStepLabel, Targets: lostTargets, } } @@ -360,7 +360,7 @@ func (tr *TestRunner) waitStepRunners(ctx statectx.Context) error { } } } - // Emit step error events. + // Emit step error eventgs. for _, ss := range tr.steps { if ss.runErr != nil && ss.runErr != statectx.ErrPaused && ss.runErr != statectx.ErrCanceled { if err := ss.emitEvent(EventTestError, nil, ss.runErr.Error()); err != nil { @@ -371,22 +371,22 @@ func (tr *TestRunner) waitStepRunners(ctx statectx.Context) error { return err } -func (tr *TestRunner) injectTarget(ctx statectx.Context, ts *targetState, ss *stepState, log *logrus.Entry) error { - log.Debugf("%s: injecting into %s", ts, ss) +func (tr *TestRunner) injectTarget(ctx statectx.Context, tgs *targetState, ss *stepState, log *logrus.Entry) error { + log.Debugf("%s: injecting into %s", tgs, ss) select { - case ss.inCh <- ts.tgt: + case ss.inCh <- tgs.tgt: // Injected successfully. - err := ss.ev.Emit(testevent.Data{EventName: target.EventTargetIn, Target: ts.tgt}) + err := ss.ev.Emit(testevent.Data{EventName: target.EventTargetIn, Target: tgs.tgt}) tr.mu.Lock() defer tr.mu.Unlock() - ts.CurPhase = targetStepPhaseRun + tgs.CurPhase = targetStepPhaseRun if err != nil { return fmt.Errorf("failed to report target injection: %w", err) } tr.cond.Signal() case <-time.After(tr.stepInjectTimeout): ss.log.Errorf("timed out while injecting a target") - if err := ss.ev.Emit(testevent.Data{EventName: target.EventTargetInErr, Target: ts.tgt}); err != nil { + if err := ss.ev.Emit(testevent.Data{EventName: target.EventTargetInErr, Target: tgs.tgt}); err != nil { ss.log.Errorf("failed to emit event: %s", err) } return &cerrors.ErrTestTargetInjectionTimedOut{StepName: ss.sb.TestStepLabel} @@ -396,26 +396,26 @@ func (tr *TestRunner) injectTarget(ctx statectx.Context, ts *targetState, ss *st return nil } -func (tr *TestRunner) awaitTargetResult(ctx statectx.Context, ts *targetState, ss *stepState, log *logrus.Entry) error { +func (tr *TestRunner) awaitTargetResult(ctx statectx.Context, tgs *targetState, ss *stepState, log *logrus.Entry) error { select { - case res, ok := <-ts.resCh: + case res, ok := <-tgs.resCh: if !ok { - log.Debugf("%s: result channel closed", ts) + log.Debugf("%s: result channel closed", tgs) return statectx.ErrCanceled } - log.Debugf("%s: result for %s recd", ts, ss) + log.Debugf("%s: result for %s recd", tgs, ss) var err error if res == nil { - err = ss.emitEvent(target.EventTargetOut, ts.tgt, nil) + err = ss.emitEvent(target.EventTargetOut, tgs.tgt, nil) } else { - err = ss.emitEvent(target.EventTargetErr, ts.tgt, target.ErrPayload{Error: res.Error()}) + err = ss.emitEvent(target.EventTargetErr, tgs.tgt, target.ErrPayload{Error: res.Error()}) } if err != nil { ss.log.Errorf("failed to emit event: %s", err) } tr.mu.Lock() - ts.res = res - ts.CurPhase = targetStepPhaseEnd + tgs.res = res + tgs.CurPhase = targetStepPhaseEnd tr.mu.Unlock() tr.cond.Signal() return err @@ -425,48 +425,48 @@ func (tr *TestRunner) awaitTargetResult(ctx statectx.Context, ts *targetState, s // and collect all the results they produce. If that doesn't happen, // step runner will close resCh on its way out and unlock us. case <-ctx.Done(): - log.Debugf("%s: canceled 2", ts) + log.Debugf("%s: canceled 2", tgs) return statectx.ErrCanceled } } // targetHandler takes a single target through each step of the pipeline in sequence. // It injects the target, waits for the result, then moves on to the next step. -func (tr *TestRunner) targetHandler(ctx, stepCtx statectx.Context, ts *targetState) { - log := logging.AddField(tr.log, "target", ts.tgt.ID) - log.Debugf("%s: target runner active", ts) +func (tr *TestRunner) targetHandler(ctx, stepCtx statectx.Context, tgs *targetState) { + log := logging.AddField(tr.log, "target", tgs.tgt.ID) + log.Debugf("%s: target runner active", tgs) // NB: CurStep may be non-zero on entry if resumed loop: - for i := ts.CurStep; i < len(tr.steps); { + for i := tgs.CurStep; i < len(tr.steps); { // Early check for pause or cancelation. select { case <-ctx.Paused(): - log.Debugf("%s: paused 0", ts) + log.Debugf("%s: paused 0", tgs) break loop case <-ctx.Done(): - log.Debugf("%s: canceled 0", ts) + log.Debugf("%s: canceled 0", tgs) break loop default: } tr.mu.Lock() ss := tr.steps[i] - if ts.CurPhase == targetStepPhaseEnd { + if tgs.CurPhase == targetStepPhaseEnd { // This target already terminated. // Can happen if resumed from terminal state. tr.mu.Unlock() break loop } - ts.CurStep = i - ts.CurPhase = targetStepPhaseBegin + tgs.CurStep = i + tgs.CurPhase = targetStepPhaseBegin tr.mu.Unlock() // Make sure we have a step runner active. If not, start one. tr.runStepIfNeeded(stepCtx, ss) // Inject the target. - err := tr.injectTarget(ctx, ts, ss, log) + err := tr.injectTarget(ctx, tgs, ss, log) // Await result. It will be communicated to us by the step runner - // and returned in ts.res. + // and returned in tgs.res. if err == nil { - err = tr.awaitTargetResult(ctx, ts, ss, log) + err = tr.awaitTargetResult(ctx, tgs, ss, log) } if err != nil { ss.log.Errorf("%s", err) @@ -475,25 +475,25 @@ loop: ss.runErr = err tr.mu.Unlock() } else { - log.Debugf("%s: canceled 1", ts) + log.Debugf("%s: canceled 1", tgs) } break } tr.mu.Lock() - if ts.res != nil { + if tgs.res != nil { tr.mu.Unlock() break } i++ if i < len(tr.steps) { - ts.CurStep = i - ts.CurPhase = targetStepPhaseInit + tgs.CurStep = i + tgs.CurPhase = targetStepPhaseInit } tr.mu.Unlock() } - log.Debugf("%s: target runner finished", ts) + log.Debugf("%s: target runner finished", tgs) tr.mu.Lock() - ts.resCh = nil + tgs.resCh = nil tr.cond.Signal() tr.mu.Unlock() } @@ -566,8 +566,8 @@ func (tr *TestRunner) reportTargetResult(ctx statectx.Context, ss *stepState, tg resCh, err := func() (chan error, error) { tr.mu.Lock() defer tr.mu.Unlock() - ts := tr.targets[tgt.ID] - if ts == nil { + tgs := tr.targets[tgt.ID] + if tgs == nil { return nil, fmt.Errorf("%s: result for nonexistent target %s %v", ss, tgt, res) } if ss.tgtDone[tgt] { @@ -578,18 +578,18 @@ func (tr *TestRunner) reportTargetResult(ctx statectx.Context, ss *stepState, tg } ss.tgtDone[tgt] = true // Begin is also allowed here because it may happen that we get a result before target runner updates phase. - if ts.CurStep != ss.stepIndex || (ts.CurPhase != targetStepPhaseBegin && ts.CurPhase != targetStepPhaseRun) { + if tgs.CurStep != ss.stepIndex || (tgs.CurPhase != targetStepPhaseBegin && tgs.CurPhase != targetStepPhaseRun) { return nil, &cerrors.ErrTestStepReturnedUnexpectedResult{ StepName: ss.sb.TestStepLabel, Target: tgt.ID, } } - if ts.resCh == nil { + if tgs.resCh == nil { // This should not happen, must be an internal error. - return nil, fmt.Errorf("%s: target runner %s is not there, dropping result on the floor", ss, ts) + return nil, fmt.Errorf("%s: target runner %s is not there, dropping result on the floor", ss, tgs) } - ss.log.Debugf("%s: result for %s: %v", ss, ts, res) - return ts.resCh, nil + ss.log.Debugf("%s: result for %s: %v", ss, tgs, res) + return tgs.resCh, nil }() if err != nil { return err @@ -716,9 +716,9 @@ func (tr *TestRunner) runMonitor() error { // First, compute the starting step of the pipeline (it may be non-zero // if the pipeline was resumed). minStep := len(tr.steps) - for _, ts := range tr.targets { - if ts.CurStep < minStep { - minStep = ts.CurStep + for _, tgs := range tr.targets { + if tgs.CurStep < minStep { + minStep = tgs.CurStep } } if minStep < len(tr.steps) { @@ -734,13 +734,13 @@ loop: tr.log.Debugf("monitor pass %d: current step %s", pass, ss) // Check if all the targets have either made it past the injection phase or terminated. ok := true - for _, ts := range tr.targets { - tr.log.Debugf("monitor pass %d: %s: %s", pass, ss, ts) - if ts.resCh == nil { // Not running anymore + for _, tgs := range tr.targets { + tr.log.Debugf("monitor pass %d: %s: %s", pass, ss, tgs) + if tgs.resCh == nil { // Not running anymore continue } - if ts.CurStep < step || ts.CurPhase < targetStepPhaseRun { - tr.log.Debugf("monitor pass %d: %s: not all targets injected yet (%s)", pass, ss, ts) + if tgs.CurStep < step || tgs.CurPhase < targetStepPhaseRun { + tr.log.Debugf("monitor pass %d: %s: not all targets injected yet (%s)", pass, ss, tgs) ok = false break } @@ -795,10 +795,10 @@ func (ss *stepState) String() string { return fmt.Sprintf("[#%d %s]", ss.stepIndex, ss.sb.TestStepLabel) } -func (ts *targetState) String() string { +func (tgs *targetState) String() string { var resText string - if ts.res != nil { - resStr := fmt.Sprintf("%s", ts.res) + if tgs.res != nil { + resStr := fmt.Sprintf("%s", tgs.res) if len(resStr) > 20 { resStr = resStr[:20] + "..." } @@ -806,7 +806,7 @@ func (ts *targetState) String() string { } else { resText = "" } - finished := ts.resCh == nil + finished := tgs.resCh == nil return fmt.Sprintf("[%s %d %s %t %s]", - ts.tgt, ts.CurStep, ts.CurPhase, finished, resText) + tgs.tgt, tgs.CurStep, tgs.CurPhase, finished, resText) }