From 78f54793880f4494e6d5748d7c27cb8633164148 Mon Sep 17 00:00:00 2001 From: Priti Desai Date: Tue, 15 Sep 2020 22:38:47 -0700 Subject: [PATCH] refactoring pipelinerunstate Introducing PipelineRunFacts as a combination of PipelineRunState, DAG Graph, and finally Graph. This simplifies function definitions without having to pass graphs to PipelineRunState attributes. --- pkg/reconciler/pipelinerun/pipelinerun.go | 47 ++++--- .../resources/pipelinerunresolution.go | 38 +++--- .../resources/pipelinerunresolution_test.go | 9 +- .../pipelinerun/resources/pipelinerunstate.go | 129 +++++++++++------- .../resources/pipelinerunstate_test.go | 99 ++++++++------ 5 files changed, 189 insertions(+), 133 deletions(-) diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index 36a0c542514..5879cd7bac2 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -428,7 +428,15 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err return controller.NewPermanentError(err) } - for _, rprt := range pipelineRunState { + // Build PipelineRunFacts with a list of resolved pipeline tasks, + // dag tasks graph and final tasks graph + pipelineRunFacts := &resources.PipelineRunFacts{ + State: pipelineRunState, + TasksGraph: d, + FinalTasksGraph: dfinally, + } + + for _, rprt := range pipelineRunFacts.State { err := taskrun.ValidateResolvedTaskResources(rprt.PipelineTask.Params, rprt.ResolvedTaskResources) if err != nil { logger.Errorf("Failed to validate pipelinerun %q with error %v", pr.Name, err) @@ -437,7 +445,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err } } - if pipelineRunState.IsBeforeFirstTaskRun() { + if pipelineRunFacts.State.IsBeforeFirstTaskRun() { if pr.HasVolumeClaimTemplate() { // create workspace PVC from template if err = c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(pr.Spec.Workspaces, pr.GetOwnerReference(), pr.Namespace); err != nil { @@ -467,11 +475,11 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err return controller.NewPermanentError(err) } - if err := c.runNextSchedulableTask(ctx, pr, d, dfinally, pipelineRunState, as); err != nil { + if err := c.runNextSchedulableTask(ctx, pr, pipelineRunFacts, as); err != nil { return err } - after := pipelineRunState.GetPipelineConditionStatus(pr, logger, d, dfinally) + after := pipelineRunFacts.GetPipelineConditionStatus(pr, logger) switch after.Status { case corev1.ConditionTrue: pr.Status.MarkSucceeded(after.Reason, after.Message) @@ -482,8 +490,8 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err } // Read the condition the way it was set by the Mark* helpers after = pr.Status.GetCondition(apis.ConditionSucceeded) - pr.Status.TaskRuns = pipelineRunState.GetTaskRunsStatus(pr) - pr.Status.SkippedTasks = pipelineRunState.GetSkippedTasks(pr, d) + pr.Status.TaskRuns = pipelineRunFacts.State.GetTaskRunsStatus(pr) + pr.Status.SkippedTasks = pipelineRunFacts.GetSkippedTasks() logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after) return nil } @@ -491,28 +499,19 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err // runNextSchedulableTask gets the next schedulable Tasks from the dag based on the current // pipeline run state, and starts them // after all DAG tasks are done, it's responsible for scheduling final tasks and start executing them -func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, d *dag.Graph, dfinally *dag.Graph, pipelineRunState resources.PipelineRunState, as artifacts.ArtifactStorageInterface) error { +func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, pipelineRunFacts *resources.PipelineRunFacts, as artifacts.ArtifactStorageInterface) error { logger := logging.FromContext(ctx) recorder := controller.GetEventRecorder(ctx) - var nextRprts []*resources.ResolvedPipelineRunTask - - // when pipeline run is stopping, do not schedule any new task and only - // wait for all running tasks to complete and report their status - if !pipelineRunState.IsStopping(d) { - // candidateTasks is initialized to DAG root nodes to start pipeline execution - // candidateTasks is derived based on successfully finished tasks and/or skipped tasks - candidateTasks, err := dag.GetSchedulable(d, pipelineRunState.SuccessfulOrSkippedDAGTasks(d)...) - if err != nil { - logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err) - return controller.NewPermanentError(err) - } - // nextRprts holds a list of pipeline tasks which should be executed next - nextRprts = pipelineRunState.GetNextTasks(candidateTasks) + // nextRprts holds a list of pipeline tasks which should be executed next + nextRprts, err := pipelineRunFacts.DAGExecutionQueue() + if err != nil { + logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err) + return controller.NewPermanentError(err) } - resolvedResultRefs, err := resources.ResolveResultRefs(pipelineRunState, nextRprts) + resolvedResultRefs, err := resources.ResolveResultRefs(pipelineRunFacts.State, nextRprts) if err != nil { logger.Infof("Failed to resolve all task params for %q with error %v", pr.Name, err) pr.Status.MarkFailed(ReasonFailedValidation, err.Error()) @@ -522,10 +521,10 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip resources.ApplyTaskResults(nextRprts, resolvedResultRefs) // GetFinalTasks only returns tasks when a DAG is complete - nextRprts = append(nextRprts, pipelineRunState.GetFinalTasks(d, dfinally)...) + nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...) for _, rprt := range nextRprts { - if rprt == nil || rprt.Skip(pipelineRunState, d) { + if rprt == nil || rprt.Skip(pipelineRunFacts) { continue } if rprt.ResolvedConditionChecks == nil || rprt.ResolvedConditionChecks.IsSuccess() { diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go index ab80f7bea16..7c23ea4e0d7 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go @@ -30,7 +30,6 @@ import ( "github.com/tektoncd/pipeline/pkg/contexts" "github.com/tektoncd/pipeline/pkg/list" "github.com/tektoncd/pipeline/pkg/names" - "github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag" "github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources" ) @@ -134,17 +133,15 @@ func (t ResolvedPipelineRunTask) IsStarted() bool { return true } -func (t *ResolvedPipelineRunTask) checkParentsDone(state PipelineRunState, d *dag.Graph) bool { - stateMap := state.ToMap() +func (t *ResolvedPipelineRunTask) checkParentsDone(facts *PipelineRunFacts) bool { + stateMap := facts.State.ToMap() // check if parent tasks are done executing, // if any of the parents is not yet scheduled or still running, // wait for it to complete before evaluating when expressions - node := d.Nodes[t.PipelineTask.Name] - if isTaskInGraph(t.PipelineTask.Name, d) { - for _, p := range node.Prev { - if !stateMap[p.Task.HashKey()].IsDone() { - return false - } + node := facts.TasksGraph.Nodes[t.PipelineTask.Name] + for _, p := range node.Prev { + if !stateMap[p.Task.HashKey()].IsDone() { + return false } } return true @@ -156,7 +153,12 @@ func (t *ResolvedPipelineRunTask) checkParentsDone(state PipelineRunState, d *da // (3) its parent task was skipped // (4) Pipeline is in stopping state (one of the PipelineTasks failed) // Note that this means Skip returns false if a conditionCheck is in progress -func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) bool { +func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool { + // finally tasks are never skipped. If this is a final task, return false + if facts.isFinalTask(t.PipelineTask.Name) { + return false + } + // it already has TaskRun associated with it - PipelineTask not skipped if t.IsStarted() { return false @@ -170,7 +172,7 @@ func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) boo } // Check if the when expressions are false, based on the input's relationship to the values - if t.checkParentsDone(state, d) { + if t.checkParentsDone(facts) { if len(t.PipelineTask.WhenExpressions) > 0 { if !t.PipelineTask.WhenExpressions.HaveVariables() { if !t.PipelineTask.WhenExpressions.AllowsExecution() { @@ -181,19 +183,17 @@ func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) boo } // Skip the PipelineTask if pipeline is in stopping state - if isTaskInGraph(t.PipelineTask.Name, d) && state.IsStopping(d) { + if facts.IsStopping() { return true } - stateMap := state.ToMap() + stateMap := facts.State.ToMap() // Recursively look at parent tasks to see if they have been skipped, // if any of the parents have been skipped, skip as well - node := d.Nodes[t.PipelineTask.Name] - if isTaskInGraph(t.PipelineTask.Name, d) { - for _, p := range node.Prev { - if stateMap[p.Task.HashKey()].Skip(state, d) { - return true - } + node := facts.TasksGraph.Nodes[t.PipelineTask.Name] + for _, p := range node.Prev { + if stateMap[p.Task.HashKey()].Skip(facts) { + return true } } return false diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go index 5091353ac3b..ecfd37cd602 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go @@ -835,7 +835,7 @@ func TestIsSkipped(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - dag, err := DagFromState(tc.state) + d, err := DagFromState(tc.state) if err != nil { t.Fatalf("Could not get a dag from the TC state %#v: %v", tc.state, err) } @@ -844,7 +844,12 @@ func TestIsSkipped(t *testing.T) { if rprt == nil { t.Fatalf("Could not get task %s from the state: %v", tc.taskName, tc.state) } - isSkipped := rprt.Skip(tc.state, dag) + facts := PipelineRunFacts{ + State: tc.state, + TasksGraph: d, + FinalTasksGraph: &dag.Graph{}, + } + isSkipped := rprt.Skip(&facts) if d := cmp.Diff(isSkipped, tc.expected); d != "" { t.Errorf("Didn't get expected isSkipped %s", diff.PrintWantGot(d)) } diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go index fc18949076b..b2e4a09627c 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go @@ -32,6 +32,12 @@ import ( // state of the PipelineRun. type PipelineRunState []*ResolvedPipelineRunTask +type PipelineRunFacts struct { + State PipelineRunState + TasksGraph *dag.Graph + FinalTasksGraph *dag.Graph +} + // ToMap returns a map that maps pipeline task name to the resolved pipeline run task func (state PipelineRunState) ToMap() map[string]*ResolvedPipelineRunTask { m := make(map[string]*ResolvedPipelineRunTask) @@ -41,17 +47,6 @@ func (state PipelineRunState) ToMap() map[string]*ResolvedPipelineRunTask { return m } -// IsDone returns true when all pipeline tasks have respective taskRun created and -// that taskRun has either succeeded or failed after all possible retry attempts -func (state PipelineRunState) IsDone() bool { - for _, t := range state { - if !t.IsDone() { - return false - } - } - return true -} - // IsBeforeFirstTaskRun returns true if the PipelineRun has not yet started its first TaskRun func (state PipelineRunState) IsBeforeFirstTaskRun() bool { for _, t := range state { @@ -62,22 +57,6 @@ func (state PipelineRunState) IsBeforeFirstTaskRun() bool { return true } -// IsStopping returns true if the PipelineRun won't be scheduling any new Task because -// at least one task already failed or was cancelled in the specified dag -func (state PipelineRunState) IsStopping(d *dag.Graph) bool { - for _, t := range state { - if isTaskInGraph(t.PipelineTask.Name, d) { - if t.IsCancelled() { - return true - } - if t.IsFailure() { - return true - } - } - } - return false -} - // GetNextTasks returns a list of tasks which should be executed next i.e. // a list of tasks from candidateTasks which aren't yet indicated in state to be running and // a list of cancelled/failed tasks from candidateTasks which haven't exhausted their retries @@ -101,13 +80,29 @@ func (state PipelineRunState) GetNextTasks(candidateTasks sets.String) []*Resolv return tasks } -// SuccessfulOrSkippedDAGTasks returns a list of the names of all of the PipelineTasks in state +// IsStopping returns true if the PipelineRun won't be scheduling any new Task because +// at least one task already failed or was cancelled in the specified dag +func (facts *PipelineRunFacts) IsStopping() bool { + for _, t := range facts.State { + if facts.isDAGTask(t.PipelineTask.Name) { + if t.IsCancelled() { + return true + } + if t.IsFailure() { + return true + } + } + } + return false +} + +// SuccessfulOrSkippedTasks returns a list of the names of all of the PipelineTasks in state // which have successfully completed or skipped -func (state PipelineRunState) SuccessfulOrSkippedDAGTasks(d *dag.Graph) []string { +func (facts *PipelineRunFacts) SuccessfulOrSkippedDAGTasks() []string { tasks := []string{} - for _, t := range state { - if isTaskInGraph(t.PipelineTask.Name, d) { - if t.IsSuccessful() || t.Skip(state, d) { + for _, t := range facts.State { + if facts.isDAGTask(t.PipelineTask.Name) { + if t.IsSuccessful() || t.Skip(facts) { tasks = append(tasks, t.PipelineTask.Name) } } @@ -117,14 +112,14 @@ func (state PipelineRunState) SuccessfulOrSkippedDAGTasks(d *dag.Graph) []string // checkTasksDone returns true if all tasks from the specified graph are finished executing // a task is considered done if it has failed/succeeded/skipped -func (state PipelineRunState) checkTasksDone(d *dag.Graph) bool { - for _, t := range state { +func (facts *PipelineRunFacts) checkTasksDone(d *dag.Graph) bool { + for _, t := range facts.State { if isTaskInGraph(t.PipelineTask.Name, d) { if t.TaskRun == nil { // this task might have skipped if taskRun is nil // continue and ignore if this task was skipped // skipped task is considered part of done - if t.Skip(state, d) { + if t.Skip(facts) { continue } return false @@ -137,29 +132,53 @@ func (state PipelineRunState) checkTasksDone(d *dag.Graph) bool { return true } +func (facts *PipelineRunFacts) CheckDAGTasksDone() bool { + return facts.checkTasksDone(facts.TasksGraph) +} + +func (facts *PipelineRunFacts) CheckFinalTasksDone() bool { + return facts.checkTasksDone(facts.FinalTasksGraph) +} + +func (facts *PipelineRunFacts) DAGExecutionQueue() (PipelineRunState, error) { + tasks := PipelineRunState{} + // when pipeline run is stopping, do not schedule any new task and only + // wait for all running tasks to complete and report their status + if !facts.IsStopping() { + // candidateTasks is initialized to DAG root nodes to start pipeline execution + // candidateTasks is derived based on successfully finished tasks and/or skipped tasks + candidateTasks, err := dag.GetSchedulable(facts.TasksGraph, facts.SuccessfulOrSkippedDAGTasks()...) + if err != nil { + return tasks, err + } + tasks = facts.State.GetNextTasks(candidateTasks) + } + return tasks, nil +} + // GetFinalTasks returns a list of final tasks without any taskRun associated with it // GetFinalTasks returns final tasks only when all DAG tasks have finished executing successfully or skipped or // any one DAG task resulted in failure -func (state PipelineRunState) GetFinalTasks(d *dag.Graph, dfinally *dag.Graph) []*ResolvedPipelineRunTask { - tasks := []*ResolvedPipelineRunTask{} +func (facts *PipelineRunFacts) GetFinalTasks() PipelineRunState { + tasks := PipelineRunState{} finalCandidates := sets.NewString() // check either pipeline has finished executing all DAG pipelineTasks // or any one of the DAG pipelineTask has failed - if state.checkTasksDone(d) { + if facts.CheckDAGTasksDone() { // return list of tasks with all final tasks - for _, t := range state { - if isTaskInGraph(t.PipelineTask.Name, dfinally) && !t.IsSuccessful() { + for _, t := range facts.State { + if facts.isFinalTask(t.PipelineTask.Name) && !t.IsSuccessful() { finalCandidates.Insert(t.PipelineTask.Name) } } - tasks = state.GetNextTasks(finalCandidates) + tasks = facts.State.GetNextTasks(finalCandidates) } return tasks } // GetPipelineConditionStatus will return the Condition that the PipelineRun prName should be // updated with, based on the status of the TaskRuns in state. -func (state PipelineRunState) GetPipelineConditionStatus(pr *v1beta1.PipelineRun, logger *zap.SugaredLogger, dag *dag.Graph, dfinally *dag.Graph) *apis.Condition { +func (facts *PipelineRunFacts) GetPipelineConditionStatus(pr *v1beta1.PipelineRun, logger *zap.SugaredLogger) *apis.Condition { // We have 4 different states here: // 1. Timed out -> Failed // 2. All tasks are done and at least one has failed or has been cancelled -> Failed @@ -193,12 +212,12 @@ func (state PipelineRunState) GetPipelineConditionStatus(pr *v1beta1.PipelineRun // - Some successful, some skipped: ReasonCompleted // - Some cancelled, none failed: ReasonCancelled // - At least one failed: ReasonFailed - for _, rprt := range state { + for _, rprt := range facts.State { allTasks = append(allTasks, rprt.PipelineTask.Name) switch { case rprt.IsSuccessful(): withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name) - case rprt.Skip(state, dag): + case rprt.Skip(facts): withStatusTasks = append(withStatusTasks, rprt.PipelineTask.Name) skipTasks = append(skipTasks, v1beta1.SkippedTask{Name: rprt.PipelineTask.Name}) // At least one is skipped and no failure yet, mark as completed @@ -237,7 +256,7 @@ func (state PipelineRunState) GetPipelineConditionStatus(pr *v1beta1.PipelineRun // transition pipeline into stopping state when one of the tasks(dag/final) cancelled or one of the dag tasks failed // for a pipeline with final tasks, single dag task failure does not transition to interim stopping state // pipeline stays in running state until all final tasks are done before transitioning to failed state - if cancelledTasks > 0 || (failedTasks > 0 && state.checkTasksDone(dfinally)) { + if cancelledTasks > 0 || (failedTasks > 0 && facts.CheckFinalTasksDone()) { reason = v1beta1.PipelineRunReasonStopping.String() } else { reason = v1beta1.PipelineRunReasonRunning.String() @@ -251,10 +270,10 @@ func (state PipelineRunState) GetPipelineConditionStatus(pr *v1beta1.PipelineRun } } -func (state PipelineRunState) GetSkippedTasks(pr *v1beta1.PipelineRun, d *dag.Graph) []v1beta1.SkippedTask { +func (facts *PipelineRunFacts) GetSkippedTasks() []v1beta1.SkippedTask { skipped := []v1beta1.SkippedTask{} - for _, rprt := range state { - if rprt.Skip(state, d) { + for _, rprt := range facts.State { + if rprt.Skip(facts) { skipped = append(skipped, v1beta1.SkippedTask{Name: rprt.PipelineTask.Name}) } } @@ -310,6 +329,20 @@ func (state PipelineRunState) GetTaskRunsStatus(pr *v1beta1.PipelineRun) map[str return status } +func (facts *PipelineRunFacts) isDAGTask(pipelineTaskName string) bool { + if _, ok := facts.TasksGraph.Nodes[pipelineTaskName]; ok { + return true + } + return false +} + +func (facts *PipelineRunFacts) isFinalTask(pipelineTaskName string) bool { + if _, ok := facts.FinalTasksGraph.Nodes[pipelineTaskName]; ok { + return true + } + return false +} + // Check if a PipelineTask belongs to the specified Graph func isTaskInGraph(pipelineTaskName string, d *dag.Graph) bool { if _, ok := d.Nodes[pipelineTaskName]; ok { diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go index 4bad1e02b50..c1f2cc5d20b 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go @@ -34,8 +34,7 @@ import ( duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1" ) -func TestIsDone(t *testing.T) { - +func TestPipelineRunFacts_CheckDAGTasksDoneDone(t *testing.T) { var taskCancelledByStatusState = PipelineRunState{{ PipelineTask: &pts[4], // 2 retries needed TaskRunName: "pipelinerun-mytask1", @@ -90,15 +89,6 @@ func TestIsDone(t *testing.T) { }, }} - var noPipelineTaskState = PipelineRunState{{ - PipelineTask: nil, - TaskRunName: "pipelinerun-mytask1", - TaskRun: withRetries(makeFailed(trs[0])), - ResolvedTaskResources: &resources.ResolvedTaskResources{ - TaskSpec: &task.Spec, - }, - }} - var noTaskRunState = PipelineRunState{{ PipelineTask: &pts[4], // 2 retries needed TaskRunName: "pipelinerun-mytask1", @@ -143,11 +133,6 @@ func TestIsDone(t *testing.T) { state: taskExpectedState, expected: false, ptExpected: []bool{false}, - }, { - name: "no-pipelineTask", - state: noPipelineTaskState, - expected: false, - ptExpected: []bool{false}, }, { name: "No-taskrun", state: noTaskRunState, @@ -157,15 +142,24 @@ func TestIsDone(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { + d, err := DagFromState(tc.state) + if err != nil { + t.Fatalf("Unexpected error while buildig DAG for state %v: %v", tc.state, err) + } + facts := PipelineRunFacts{ + State: tc.state, + TasksGraph: d, + FinalTasksGraph: &dag.Graph{}, + } - isDone := tc.state.IsDone() + isDone := facts.checkTasksDone(d) if d := cmp.Diff(isDone, tc.expected); d != "" { t.Errorf("Didn't get expected IsDone %s", diff.PrintWantGot(d)) } for i, pt := range tc.state { isDone = pt.IsDone() if d := cmp.Diff(isDone, tc.ptExpected[i]); d != "" { - t.Errorf("Didn't get expected (ResolvedPipelineRunTask) IsDone %s", diff.PrintWantGot(d)) + t.Errorf("Didn't get expected (ResolvedPipelineRunTask) checkTasksDone %s", diff.PrintWantGot(d)) } } @@ -463,11 +457,16 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) { }} for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - dag, err := DagFromState(tc.state) + d, err := DagFromState(tc.state) if err != nil { t.Fatalf("Unexpected error while buildig DAG for state %v: %v", tc.state, err) } - names := tc.state.SuccessfulOrSkippedDAGTasks(dag) + facts := PipelineRunFacts{ + State: tc.state, + TasksGraph: d, + FinalTasksGraph: &dag.Graph{}, + } + names := facts.SuccessfulOrSkippedDAGTasks() if d := cmp.Diff(names, tc.expectedNames); d != "" { t.Errorf("Expected to get completed names %v but got something different %s", tc.expectedNames, diff.PrintWantGot(d)) } @@ -482,7 +481,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state PipelineRunState DAGTasks []v1beta1.PipelineTask finalTasks []v1beta1.PipelineTask - expectedFinalTasks []*ResolvedPipelineRunTask + expectedFinalTasks PipelineRunState }{{ // tasks: [ mytask1, mytask2] // none finally @@ -492,7 +491,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: oneStartedState, DAGTasks: []v1beta1.PipelineTask{pts[0], pts[1]}, finalTasks: []v1beta1.PipelineTask{}, - expectedFinalTasks: []*ResolvedPipelineRunTask{}, + expectedFinalTasks: PipelineRunState{}, }, { // tasks: [ mytask1] // finally: [mytask2] @@ -501,7 +500,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: noneStartedState, DAGTasks: []v1beta1.PipelineTask{pts[0]}, finalTasks: []v1beta1.PipelineTask{pts[1]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{}, + expectedFinalTasks: PipelineRunState{}, }, { // tasks: [ mytask1] // finally: [mytask2] @@ -510,7 +509,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: oneStartedState, DAGTasks: []v1beta1.PipelineTask{pts[0]}, finalTasks: []v1beta1.PipelineTask{pts[1]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{}, + expectedFinalTasks: PipelineRunState{}, }, { // tasks: [ mytask1] // finally: [mytask2] @@ -519,7 +518,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: oneFinishedState, DAGTasks: []v1beta1.PipelineTask{pts[0]}, finalTasks: []v1beta1.PipelineTask{pts[1]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{oneFinishedState[1]}, + expectedFinalTasks: PipelineRunState{oneFinishedState[1]}, }, { // tasks: [ mytask1] // finally: [mytask2] @@ -528,7 +527,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: oneFailedState, DAGTasks: []v1beta1.PipelineTask{pts[0]}, finalTasks: []v1beta1.PipelineTask{pts[1]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{oneFinishedState[1]}, + expectedFinalTasks: PipelineRunState{oneFinishedState[1]}, }, { // tasks: [ mytask6 with condition] // finally: [mytask2] @@ -537,7 +536,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: append(conditionCheckStartedState, noneStartedState[0]), DAGTasks: []v1beta1.PipelineTask{pts[5]}, finalTasks: []v1beta1.PipelineTask{pts[0]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{}, + expectedFinalTasks: PipelineRunState{}, }, { // tasks: [ mytask6 with condition] // finally: [mytask2] @@ -546,7 +545,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: append(conditionCheckSuccessNoTaskStartedState, noneStartedState[0]), DAGTasks: []v1beta1.PipelineTask{pts[5]}, finalTasks: []v1beta1.PipelineTask{pts[0]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{}, + expectedFinalTasks: PipelineRunState{}, }, { // tasks: [ mytask6 with condition] // finally: [mytask2] @@ -555,7 +554,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: append(conditionCheckFailedWithNoOtherTasksState, noneStartedState[0]), DAGTasks: []v1beta1.PipelineTask{pts[5]}, finalTasks: []v1beta1.PipelineTask{pts[0]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{noneStartedState[0]}, + expectedFinalTasks: PipelineRunState{noneStartedState[0]}, }, { // tasks: [ mytask1, mytask6 with condition] // finally: [mytask2] @@ -564,7 +563,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: append(conditionCheckFailedWithOthersPassedState, noneStartedState[1]), DAGTasks: []v1beta1.PipelineTask{pts[5], pts[0]}, finalTasks: []v1beta1.PipelineTask{pts[1]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{noneStartedState[1]}, + expectedFinalTasks: PipelineRunState{noneStartedState[1]}, }, { // tasks: [ mytask1, mytask6 with condition] // finally: [mytask2] @@ -573,7 +572,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: append(conditionCheckFailedWithOthersFailedState, noneStartedState[1]), DAGTasks: []v1beta1.PipelineTask{pts[5], pts[0]}, finalTasks: []v1beta1.PipelineTask{pts[1]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{noneStartedState[1]}, + expectedFinalTasks: PipelineRunState{noneStartedState[1]}, }, { // tasks: [ mytask6 with condition, mytask7 runAfter mytask6] // finally: [mytask2] @@ -582,7 +581,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: append(taskWithParentSkippedState, noneStartedState[1]), DAGTasks: []v1beta1.PipelineTask{pts[5], pts[6]}, finalTasks: []v1beta1.PipelineTask{pts[1]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{noneStartedState[1]}, + expectedFinalTasks: PipelineRunState{noneStartedState[1]}, }, { // tasks: [ mytask1, mytask6 with condition, mytask8 runAfter mytask6] // finally: [mytask2] @@ -591,7 +590,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: append(taskWithMultipleParentsSkippedState, noneStartedState[1]), DAGTasks: []v1beta1.PipelineTask{pts[0], pts[5], pts[7]}, finalTasks: []v1beta1.PipelineTask{pts[1]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{noneStartedState[1]}, + expectedFinalTasks: PipelineRunState{noneStartedState[1]}, }, { // tasks: [ mytask1, mytask6 with condition, mytask8 runAfter mytask6, mytask9 runAfter mytask1 and mytask6] // finally: [mytask2] @@ -601,7 +600,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: append(taskWithGrandParentSkippedState, noneStartedState[1]), DAGTasks: []v1beta1.PipelineTask{pts[0], pts[5], pts[7], pts[8]}, finalTasks: []v1beta1.PipelineTask{pts[1]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{noneStartedState[1]}, + expectedFinalTasks: PipelineRunState{noneStartedState[1]}, }, { //tasks: [ mytask1, mytask6 with condition, mytask8 runAfter mytask6, mytask9 runAfter mytask1 and mytask6] //finally: [mytask2] @@ -611,7 +610,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: append(taskWithGrandParentsOneFailedState, noneStartedState[1]), DAGTasks: []v1beta1.PipelineTask{pts[0], pts[5], pts[7], pts[8]}, finalTasks: []v1beta1.PipelineTask{pts[1]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{noneStartedState[1]}, + expectedFinalTasks: PipelineRunState{noneStartedState[1]}, }, { //tasks: [ mytask1, mytask6 with condition, mytask8 runAfter mytask6, mytask9 runAfter mytask1 and mytask6] //finally: [mytask2] @@ -620,7 +619,7 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { state: append(taskWithGrandParentsOneNotRunState, noneStartedState[1]), DAGTasks: []v1beta1.PipelineTask{pts[0], pts[5], pts[7], pts[8]}, finalTasks: []v1beta1.PipelineTask{pts[1]}, - expectedFinalTasks: []*ResolvedPipelineRunTask{}, + expectedFinalTasks: PipelineRunState{}, }} for _, tc := range tcs { dagGraph, err := dag.Build(v1beta1.PipelineTaskList(tc.DAGTasks)) @@ -632,7 +631,12 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) { t.Fatalf("Unexpected error while buildig DAG for final pipelineTasks %v: %v", tc.finalTasks, err) } t.Run(tc.name, func(t *testing.T) { - next := tc.state.GetFinalTasks(dagGraph, finalGraph) + facts := PipelineRunFacts{ + State: tc.state, + TasksGraph: dagGraph, + FinalTasksGraph: finalGraph, + } + next := facts.GetFinalTasks() if d := cmp.Diff(tc.expectedFinalTasks, next); d != "" { t.Errorf("Didn't get expected final Tasks for %s (%s): %s", tc.name, tc.desc, diff.PrintWantGot(d)) } @@ -882,7 +886,12 @@ func TestGetPipelineConditionStatus(t *testing.T) { if err != nil { t.Fatalf("Unexpected error while buildig DAG for state %v: %v", tc.state, err) } - c := tc.state.GetPipelineConditionStatus(pr, zap.NewNop().Sugar(), d, &dag.Graph{}) + facts := PipelineRunFacts{ + State: tc.state, + TasksGraph: d, + FinalTasksGraph: &dag.Graph{}, + } + c := facts.GetPipelineConditionStatus(pr, zap.NewNop().Sugar()) wantCondition := &apis.Condition{ Type: apis.ConditionSucceeded, Status: tc.expectedStatus, @@ -993,7 +1002,12 @@ func TestGetPipelineConditionStatus_WithFinalTasks(t *testing.T) { if err != nil { t.Fatalf("Unexpected error while buildig graph for final tasks %v: %v", tc.finalTasks, err) } - c := tc.state.GetPipelineConditionStatus(pr, zap.NewNop().Sugar(), d, df) + facts := PipelineRunFacts{ + State: tc.state, + TasksGraph: d, + FinalTasksGraph: df, + } + c := facts.GetPipelineConditionStatus(pr, zap.NewNop().Sugar()) wantCondition := &apis.Condition{ Type: apis.ConditionSucceeded, Status: tc.expectedStatus, @@ -1025,7 +1039,12 @@ func TestGetPipelineConditionStatus_PipelineTimeouts(t *testing.T) { }, }, } - c := oneFinishedState.GetPipelineConditionStatus(pr, zap.NewNop().Sugar(), d, &dag.Graph{}) + facts := PipelineRunFacts{ + State: oneFinishedState, + TasksGraph: d, + FinalTasksGraph: &dag.Graph{}, + } + c := facts.GetPipelineConditionStatus(pr, zap.NewNop().Sugar()) if c.Status != corev1.ConditionFalse && c.Reason != v1beta1.PipelineRunReasonTimedOut.String() { t.Fatalf("Expected to get status %s but got %s for state %v", corev1.ConditionFalse, c.Status, oneFinishedState) }