From cdf533c678c83ee86e1a3d4f5c798e71172c0cd2 Mon Sep 17 00:00:00 2001 From: Andrea Frittoli Date: Sun, 15 Nov 2020 11:22:49 +0000 Subject: [PATCH] Fix recursion issue on Skip The pipelinerun state is made of resolved pipelinerun tasks (rprt), which are build from the actual status of the associated taskruns. It is computationaly easy to know if a taskrun started, or completed successfully or unsuccessfully; however determining whether a taskrun has been skipped or will be skipped in the pipeline run execution, requires evaluating the entire pipeline run status and associated dag. The Skip method used to apply to a single rprt, evaluate the entire pipeline run status and dag, return whether the specific rprt was going to be skipped, and throw away the rest. We used to invoke the Skip method on every rprt in the pipeline state to calculate candidate tasks for execution. To make things worse, we also invoked the "Skip" method as part of the "isDone", defined as a logical OR between "isSuccessful", "isFailed" and "Skip". With this change we compute the list of tasks to be skipped once, and we store the result in a map to the pipelinerun facts, along with pipelinerun state and associated dags. We introdce a new method on the pipelinerun facts called "IsTaskSkipped". This method performs the lazy computation of the skips map the first time it is invoked. Any following invocation is able to provide the skip status of a specific pipeline task by looking up in the map. This solution manages to hide some of the details of the skip logic from the core reconciler logic. I believe further refactor could help, but I wanted to keep this PR as little as possible. I will further pursue this work by revining https://github.com/tektoncd/pipeline/pull/2821 I converted the unit test for "Skip" to a unit test for "SkipMap", to ensure that the new logic gives the same result as we used to have. The test should be moved to a different module as "SkipMap" lives in a different module, however leaving it in place very much helps with the review. I will move it in a follow-up patch. This changes adds a unit test that reproduces the issue in #3521, which used to fail (with timeout 30s) and now succeedes for pipelines roughly up to 120 tasks / 120 links. On my laptop, going beyond 120 tasks/links takes longer than 30s, so I left the unit test at 80 to avoid introducing a flaky test in CI. There is still work to do to improve this further, some profiling / tracing work might help. Breaking large pipelines in logical groups (branches or pipelines in pipelines) would help reduce the complexity and computational cost for very large pipelines. Fixes #3521 Signed-off-by: Andrea Frittoli --- pkg/reconciler/pipelinerun/pipelinerun.go | 8 +- .../pipelinerun/pipelinerun_test.go | 2 +- .../resources/pipelinerunresolution.go | 54 +-------- .../resources/pipelinerunresolution_test.go | 30 +++-- .../pipelinerun/resources/pipelinerunstate.go | 106 +++++++++++++++++- .../resources/pipelinerunstate_test.go | 65 +++++++++++ 6 files changed, 201 insertions(+), 64 deletions(-) diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index f6687352742..4d66169a98c 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -524,6 +524,9 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get return err } + // Reset the skipped status to trigger recalculation + pipelineRunFacts.ResetSkippedCache() + after := pipelineRunFacts.GetPipelineConditionStatus(pr, logger) switch after.Status { case corev1.ConditionTrue: @@ -565,12 +568,15 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip } resources.ApplyTaskResults(nextRprts, resolvedResultRefs) + // After we apply Task Results, we may be able to evaluate more + // when expressions, so reset the skipped cache + pipelineRunFacts.ResetSkippedCache() // GetFinalTasks only returns tasks when a DAG is complete nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...) for _, rprt := range nextRprts { - if rprt == nil || rprt.Skip(pipelineRunFacts) { + if rprt == nil || pipelineRunFacts.IsTaskSkipped(rprt.PipelineTask.Name) { continue } if rprt.ResolvedConditionChecks == nil || rprt.ResolvedConditionChecks.IsSuccess() { diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 1a0ce877505..598fa91f848 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -2287,7 +2287,7 @@ func TestReconcileWithWhenExpressionsWithTaskResults(t *testing.T) { t.Fatalf("Failure to list TaskRun's %s", err) } if len(actualSkippedTask.Items) != 0 { - t.Fatalf("Expected 0 TaskRuns got %d", len(actualSkippedTask.Items)) + t.Fatalf("When listing %s, expected 0 TaskRuns got %d", skippedTask, len(actualSkippedTask.Items)) } } } diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go index 787a89a1ef8..e9b43961c09 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go @@ -71,7 +71,7 @@ type ResolvedPipelineRunTask struct { // IsDone returns true only if the task is skipped, succeeded or failed func (t ResolvedPipelineRunTask) IsDone(facts *PipelineRunFacts) bool { - return t.Skip(facts) || t.IsSuccessful() || t.IsFailure() + return facts.IsTaskSkipped(t.PipelineTask.Name) || t.IsSuccessful() || t.IsFailure() } // IsSuccessful returns true only if the taskrun itself has completed successfully @@ -122,35 +122,6 @@ func (t ResolvedPipelineRunTask) IsStarted() bool { return true } -func (t *ResolvedPipelineRunTask) checkParentsDone(facts *PipelineRunFacts) bool { - stateMap := facts.State.ToMap() - node := facts.TasksGraph.Nodes[t.PipelineTask.Name] - for _, p := range node.Prev { - if !stateMap[p.Task.HashKey()].IsDone(facts) { - return false - } - } - return true -} - -// Skip returns true if a PipelineTask will not be run because -// (1) its When Expressions evaluated to false -// (2) its Condition Checks failed -// (3) its parent task was skipped -// (4) Pipeline is in stopping state (one of the PipelineTasks failed) -// Note that this means Skip returns false if a conditionCheck is in progress -func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool { - if facts.isFinalTask(t.PipelineTask.Name) || t.IsStarted() { - return false - } - - if t.conditionsSkip() || t.whenExpressionsSkip(facts) || t.parentTasksSkip(facts) || facts.IsStopping() { - return true - } - - return false -} - func (t *ResolvedPipelineRunTask) conditionsSkip() bool { if len(t.ResolvedConditionChecks) > 0 { if t.ResolvedConditionChecks.IsDone() && !t.ResolvedConditionChecks.IsSuccess() { @@ -160,30 +131,17 @@ func (t *ResolvedPipelineRunTask) conditionsSkip() bool { return false } -func (t *ResolvedPipelineRunTask) whenExpressionsSkip(facts *PipelineRunFacts) bool { - if t.checkParentsDone(facts) { - if len(t.PipelineTask.WhenExpressions) > 0 { - if !t.PipelineTask.WhenExpressions.HaveVariables() { - if !t.PipelineTask.WhenExpressions.AllowsExecution() { - return true - } +func (t *ResolvedPipelineRunTask) whenExpressionsSkip() bool { + if len(t.PipelineTask.WhenExpressions) > 0 { + if !t.PipelineTask.WhenExpressions.HaveVariables() { + if !t.PipelineTask.WhenExpressions.AllowsExecution() { + return true } } } return false } -func (t *ResolvedPipelineRunTask) parentTasksSkip(facts *PipelineRunFacts) bool { - stateMap := facts.State.ToMap() - node := facts.TasksGraph.Nodes[t.PipelineTask.Name] - for _, p := range node.Prev { - if stateMap[p.Task.HashKey()].Skip(facts) { - return true - } - } - return false -} - // GetTaskRun is a function that will retrieve the TaskRun name. type GetTaskRun func(name string) (*v1beta1.TaskRun, error) diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go index 3c2471af97a..5fa6b65207c 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go @@ -535,7 +535,11 @@ func DagFromState(state PipelineRunState) (*dag.Graph, error) { return dag.Build(v1beta1.PipelineTaskList(pts)) } -func TestIsSkipped(t *testing.T) { +func TestSkipMap(t *testing.T) { + // NOTE(afrittoli) This test now logically belongs to + // pipelinerunstate_test.go, but I left it here for now as it + // makes the review much easier. I will move this to the other + // module in a follow-up patch. tcs := []struct { name string @@ -611,6 +615,7 @@ func TestIsSkipped(t *testing.T) { PipelineTask: &pts[6], }}, expected: map[string]bool{ + "mytask6": false, "mytask7": false, }, }, { @@ -627,6 +632,7 @@ func TestIsSkipped(t *testing.T) { PipelineTask: &pts[6], }}, expected: map[string]bool{ + "mytask6": true, "mytask7": true, }, }, { @@ -647,6 +653,7 @@ func TestIsSkipped(t *testing.T) { PipelineTask: &pts[6], }}, expected: map[string]bool{ + "mytask6": false, "mytask7": false, }, }, { @@ -685,6 +692,7 @@ func TestIsSkipped(t *testing.T) { }, }}, expected: map[string]bool{ + "mytask6": false, "mytask7": true, }, }, { @@ -705,6 +713,7 @@ func TestIsSkipped(t *testing.T) { }, }}, expected: map[string]bool{ + "mytask6": false, "mytask7": true, }, }, { @@ -736,6 +745,8 @@ func TestIsSkipped(t *testing.T) { }, }}, expected: map[string]bool{ + "mytask6": false, + "mytask7": true, "mytask10": true, }, }, { @@ -763,6 +774,8 @@ func TestIsSkipped(t *testing.T) { }, }}, expected: map[string]bool{ + "mytask6": false, + "mytask1": false, "mytask8": true, }, }, { @@ -790,6 +803,8 @@ func TestIsSkipped(t *testing.T) { }, }}, expected: map[string]bool{ + "mytask1": false, + "mytask6": false, "mytask7": true, }, }, { @@ -834,6 +849,7 @@ func TestIsSkipped(t *testing.T) { }, }}, expected: map[string]bool{ + "mytask1": false, "mytask12": false, }, }} @@ -844,19 +860,15 @@ func TestIsSkipped(t *testing.T) { if err != nil { t.Fatalf("Could not get a dag from the TC state %#v: %v", tc.state, err) } - stateMap := tc.state.ToMap() facts := PipelineRunFacts{ State: tc.state, TasksGraph: d, FinalTasksGraph: &dag.Graph{}, } - for taskName, isSkipped := range tc.expected { - rprt := stateMap[taskName] - if rprt == nil { - t.Fatalf("Could not get task %s from the state: %v", taskName, tc.state) - } - if d := cmp.Diff(isSkipped, rprt.Skip(&facts)); d != "" { - t.Errorf("Didn't get expected isSkipped %s", diff.PrintWantGot(d)) + actual := facts.SkipMap() + for taskname, expected := range tc.expected { + if d := cmp.Diff(actual[taskname], expected); d != "" { + t.Errorf("Skipped status did not match for a task %s", diff.PrintWantGot(d)) } } }) diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go index f9a5e965d70..539de2424ee 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate.go @@ -34,9 +34,23 @@ type PipelineRunState []*ResolvedPipelineRunTask // PipelineRunFacts is a collection of list of ResolvedPipelineTask, graph of DAG tasks, and graph of finally tasks type PipelineRunFacts struct { - State PipelineRunState - TasksGraph *dag.Graph + // State represents the state of all pipeline tasks in the pipeline + State PipelineRunState + + // TasksGraph is the graph of all the pipeline tasks in the main pipeline + TasksGraph *dag.Graph + + // FinalTasksGraph is the graph of all the final tasks executed once the main pipeline is complete FinalTasksGraph *dag.Graph + + // WontRun is a hash of all PipelineTask names that stores whether a task will be + // executed or because it's either not reachable via the DAG due to the pipeline state, + // or because it has failed conditions. + // We store this list in along the state, because it requires traversing the whole + // graph; this weay so we can build it once, when the reconcile starts, and update + // at the end of reconcile, instead of having to traverse the whole graph every time + // we want to know if a specific task will be skipped. + WontRun map[string]bool } // pipelineRunStatusCount holds the count of successful, failed, cancelled, skipped, and incomplete tasks @@ -185,6 +199,84 @@ func (facts *PipelineRunFacts) IsStopping() bool { return false } +// IsTaskSkipped returns true if the PipelineRun won't be scheduling the name task +func (facts *PipelineRunFacts) IsTaskSkipped(pipelineTaskName string) bool { + if facts.WontRun == nil { + // We do lazy initalisation of this map + facts.WontRun = facts.SkipMap() + } + wontRun, _ := facts.WontRun[pipelineTaskName] + return wontRun +} + +// ResetSkippedCache resets the skipped cache in the facts map +func (facts *PipelineRunFacts) ResetSkippedCache() { + facts.WontRun = nil +} + +// SkipMap returns map pipeline task -> bool that tells whether a +// PipelineTask will not be run because: +// (1) its When Expressions evaluated to false (and all parents were done) +// (2) its Condition Checks failed +// (3) any of it parent tasks were skipped +// (4) Pipeline is in stopping state (one of the PipelineTasks failed) +// Note that this means Skip returns false if a conditionCheck is in progress +func (facts *PipelineRunFacts) SkipMap() map[string]bool { + skipMapResult := make(map[string]bool) + pipelineTaskMap := facts.State.ToMap() + isStopping := facts.IsStopping() + skipMap(pipelineTaskMap, isStopping, skipMapResult, facts.TasksGraph) + return skipMapResult +} + +func skipMap(pipelineTaskMap map[string]*ResolvedPipelineRunTask, isStopping bool, partialSkipMap map[string]bool, g *dag.Graph, doneTasks ...string) { + // We can safely ignore errors here because we're artifically + // traversing the graph to discover skip tasks + nextTasks, _ := dag.GetSchedulable(g, doneTasks...) + // If there are tasks left, we need to continue to traverse the graph + if len(nextTasks) > 0 { + TASKLOOP: + for task := range nextTasks { + t, _ := pipelineTaskMap[task] + // Default to false, in case no check matches + // Regular task waiting for its turn to run + partialSkipMap[task] = false + // Checks that do not depend on parents + if t.IsStarted() { + partialSkipMap[task] = false + continue + } else if isStopping || t.conditionsSkip() { + partialSkipMap[task] = true + continue + } + // Check that depend on parents + node := g.Nodes[t.PipelineTask.Name] + parentsDone := true + for _, p := range node.Prev { + parent := pipelineTaskMap[p.Task.HashKey()] + // Parents must be tracked in the partial skipped map + skipped, _ := partialSkipMap[parent.PipelineTask.Name] + // One parent is enough to mark this as skipped + if skipped { + partialSkipMap[task] = true + continue TASKLOOP + } + done := parent.IsSuccessful() || parent.IsFailure() + // One parent not done is enough to mark parents as not done + if !done { + parentsDone = false + break + } + } + if parentsDone && t.whenExpressionsSkip() { + partialSkipMap[task] = true + } + } + newDoneTasks := append(doneTasks, nextTasks.List()...) + skipMap(pipelineTaskMap, isStopping, partialSkipMap, g, newDoneTasks...) + } +} + // DAGExecutionQueue returns a list of DAG tasks which needs to be scheduled next func (facts *PipelineRunFacts) DAGExecutionQueue() (PipelineRunState, error) { tasks := PipelineRunState{} @@ -296,9 +388,13 @@ func (facts *PipelineRunFacts) GetPipelineConditionStatus(pr *v1beta1.PipelineRu // GetSkippedTasks constructs a list of SkippedTask struct to be included in the PipelineRun Status func (facts *PipelineRunFacts) GetSkippedTasks() []v1beta1.SkippedTask { + // GetSkippedTasks is invoked at the end of the reconcile cycle + // to update the pipelinerun message with the number of skipped tasks + // `facts.IsTaskSkipped` reflects the status at the beginning of the + // reconcile, so we need to force it to recompute the correct skipped list var skipped []v1beta1.SkippedTask for _, rprt := range facts.State { - if rprt.Skip(facts) { + if facts.IsTaskSkipped(rprt.PipelineTask.Name) { skippedTask := v1beta1.SkippedTask{ Name: rprt.PipelineTask.Name, WhenExpressions: rprt.PipelineTask.WhenExpressions, @@ -315,7 +411,7 @@ func (facts *PipelineRunFacts) successfulOrSkippedDAGTasks() []string { tasks := []string{} for _, t := range facts.State { if facts.isDAGTask(t.PipelineTask.Name) { - if t.IsSuccessful() || t.Skip(facts) { + if t.IsSuccessful() || facts.IsTaskSkipped(t.PipelineTask.Name) { tasks = append(tasks, t.PipelineTask.Name) } } @@ -367,7 +463,7 @@ func (facts *PipelineRunFacts) getPipelineTasksCount() pipelineRunStatusCount { case t.IsCancelled(): s.Cancelled++ // increment skip counter since the task is skipped - case t.Skip(facts): + case facts.IsTaskSkipped(t.PipelineTask.Name): s.Skipped++ // increment incomplete counter since the task is pending and not executed yet default: diff --git a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go index 0407c89f19e..a2d331feda3 100644 --- a/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go +++ b/pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go @@ -17,6 +17,7 @@ limitations under the License. package resources import ( + "fmt" "testing" "time" @@ -408,6 +409,7 @@ func TestGetNextTaskWithRetries(t *testing.T) { } func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) { + largePipelineState := buildPipelineStateWithLargeDepencyGraph(t) tcs := []struct { name string state PipelineRunState @@ -454,6 +456,10 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) { "not skipped since it failed", state: conditionCheckFailedWithOthersFailedState, expectedNames: []string{pts[5].Name}, + }, { + name: "large deps, not started", + state: largePipelineState, + expectedNames: []string{}, }} for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { @@ -474,6 +480,65 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) { } } +func buildPipelineStateWithLargeDepencyGraph(t *testing.T) PipelineRunState { + t.Helper() + var task = &v1beta1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "task", + }, + Spec: v1beta1.TaskSpec{ + Steps: []v1beta1.Step{{Container: corev1.Container{ + Name: "step1", + }}}, + }, + } + var pipelineRunState PipelineRunState + pipelineRunState = []*ResolvedPipelineRunTask{{ + PipelineTask: &v1beta1.PipelineTask{ + Name: "t1", + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + TaskRun: nil, + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + }} + for i := 2; i < 80; i++ { + dependFrom := 1 + if i > 10 { + if i%10 == 0 { + dependFrom = i - 10 + } else { + dependFrom = i - (i % 10) + } + } + params := []v1beta1.Param{} + var alpha byte + for alpha = 'a'; alpha <= 'j'; alpha++ { + params = append(params, v1beta1.Param{ + Name: fmt.Sprintf("%c", alpha), + Value: v1beta1.ArrayOrString{ + Type: v1beta1.ParamTypeString, + StringVal: fmt.Sprintf("$(tasks.t%d.results.%c)", dependFrom, alpha), + }, + }) + } + pipelineRunState = append(pipelineRunState, &ResolvedPipelineRunTask{ + PipelineTask: &v1beta1.PipelineTask{ + Name: fmt.Sprintf("t%d", i), + Params: params, + TaskRef: &v1beta1.TaskRef{Name: "task"}, + }, + TaskRun: nil, + ResolvedTaskResources: &resources.ResolvedTaskResources{ + TaskSpec: &task.Spec, + }, + }, + ) + } + return pipelineRunState +} + func TestPipelineRunState_GetFinalTasks(t *testing.T) { tcs := []struct { name string