Fix recursion issue on Skip
The pipelinerun state is made of resolved pipelinerun tasks (rprt),
which are built from the actual status of the associated taskruns.

It is computationally easy to know if a taskrun started, or completed
successfully or unsuccessfully; however, determining whether a taskrun
has been skipped or will be skipped in the pipeline run execution
requires evaluating the entire pipeline run status and associated dag.

The Skip method used to apply to a single rprt, evaluate the entire
pipeline run status and dag, return whether the specific rprt was
going to be skipped, and throw away the rest.

We used to invoke the Skip method on every rprt in the pipeline state
to calculate candidate tasks for execution. To make things worse,
we also invoked the "Skip" method as part of the "isDone", defined as
a logical OR between "isSuccessful", "isFailed" and "Skip".

With this change we compute the list of tasks to be skipped once,
incrementally, by caching the results of each invocation of 'Skip'.
We store the results in a map added to the pipelinerun facts, along with
the pipelinerun state and associated dags. We introduce a new method on
the pipelinerun facts called "IsTaskSkipped".
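
The effect of the cache can be sketched with a small stand-alone
program. This is only an illustration under assumptions: the names
`task`, `facts`, `skipNaive`, `skipCached` and `buildLadder` are
hypothetical and much simpler than the real types, and the skip rule is
reduced to "a task is skipped if its own guard fails or any parent is
skipped".

```go
package main

import "fmt"

// task is a hypothetical, simplified stand-in for a resolved pipeline run task.
type task struct {
	name    string
	parents []*task
	whenOK  bool // false means the task's own guard says "skip"
}

// facts is a hypothetical stand-in for the pipelinerun facts.
type facts struct {
	skipCache map[string]bool
	calls     int // how many times the skip rule was actually evaluated
}

// skipNaive re-evaluates every ancestor on every call.
func skipNaive(t *task, f *facts) bool {
	f.calls++
	if !t.whenOK {
		return true
	}
	for _, p := range t.parents {
		if skipNaive(p, f) {
			return true
		}
	}
	return false
}

// skipCached evaluates each task at most once and remembers the answer.
func skipCached(t *task, f *facts) bool {
	if f.skipCache == nil {
		f.skipCache = make(map[string]bool)
	}
	if v, ok := f.skipCache[t.name]; ok {
		return v
	}
	f.calls++
	skipped := !t.whenOK
	for _, p := range t.parents {
		if skipped {
			break
		}
		skipped = skipCached(p, f)
	}
	f.skipCache[t.name] = skipped
	return skipped
}

// buildLadder returns the last task of a graph made of `depth` layers of two
// tasks each, where every task depends on both tasks of the previous layer.
func buildLadder(depth int) *task {
	prev := []*task{{name: "l0-a", whenOK: true}, {name: "l0-b", whenOK: true}}
	for i := 1; i < depth; i++ {
		a := &task{name: fmt.Sprintf("l%d-a", i), parents: prev, whenOK: true}
		b := &task{name: fmt.Sprintf("l%d-b", i), parents: prev, whenOK: true}
		prev = []*task{a, b}
	}
	return &task{name: "last", parents: prev, whenOK: true}
}

func main() {
	last := buildLadder(10)
	naive, cached := &facts{}, &facts{}
	fmt.Println("naive: ", skipNaive(last, naive), naive.calls)
	fmt.Println("cached:", skipCached(last, cached), cached.calls)
}
```

With ten layers, the naive version evaluates the rule 2047 times, while
the cached version evaluates it 21 times, once per task.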

This solution manages to hide some of the details of the skip logic from
the core reconciler logic, but it still requires the cache to be
manually reset in a couple of places. I believe further refactoring could
help, but I wanted to keep this PR as small as possible.
I will further pursue this work by reviving #2821.
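
Why the cache has to be reset by hand can be sketched as follows. The
names (`facts`, `isSkipped`, `resetSkippedCache`) are hypothetical
stand-ins rather than the real types, and the rule is assumed to be
simply "skip the task if any task in the pipeline has failed".

```go
package main

import "fmt"

// facts is a hypothetical, heavily simplified stand-in for the pipelinerun facts.
type facts struct {
	failed    map[string]bool // task name -> has its run failed?
	skipCache map[string]bool // task name -> cached skip decision
}

// isSkipped caches its answer. The rule assumed here is:
// skip the task if any task in the pipeline has failed.
func (f *facts) isSkipped(name string) bool {
	if f.skipCache == nil {
		f.skipCache = make(map[string]bool)
	}
	if v, ok := f.skipCache[name]; ok {
		return v
	}
	skipped := false
	for _, hasFailed := range f.failed {
		if hasFailed {
			skipped = true
			break
		}
	}
	f.skipCache[name] = skipped
	return skipped
}

// resetSkippedCache drops all cached decisions so they are recomputed on demand.
func (f *facts) resetSkippedCache() {
	f.skipCache = make(map[string]bool)
}

func main() {
	f := &facts{failed: map[string]bool{"build": false}}
	fmt.Println(f.isSkipped("deploy")) // computed from the current state

	f.failed["build"] = true // the state changes underneath the cache
	fmt.Println(f.isSkipped("deploy")) // still the cached, now stale, answer

	f.resetSkippedCache()
	fmt.Println(f.isSkipped("deploy")) // recomputed from the new state
}
```

The cached decision only reflects the state at the time it was computed,
so any code path that changes the state has to reset the cache before
asking again.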

This change adds a unit test that reproduces the issue in #3521, which
used to fail (with timeout 30s) and now succeeds for pipelines roughly
up to 120 tasks / 120 links. On my laptop, going beyond 120 tasks/links
takes longer than 30s, so I left the unit test at 80 to avoid
introducing a flaky test in CI. There is still work to do to improve
this further; some profiling / tracing work might help.

Breaking large pipelines into logical groups (branches or pipelines in
pipelines) would help reduce the complexity and computational cost for
very large pipelines.

Fixes #3521

Co-authored-by: Scott <[email protected]>

Signed-off-by: Andrea Frittoli <[email protected]>
afrittoli committed Nov 16, 2020
1 parent cdc171b commit aaec640
Showing 4 changed files with 103 additions and 7 deletions.
6 changes: 6 additions & 0 deletions pkg/reconciler/pipelinerun/pipelinerun.go
@@ -524,6 +524,9 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun, get
 		return err
 	}
 
+	// Reset the skipped status to trigger recalculation
+	pipelineRunFacts.ResetSkippedCache()
+
 	after := pipelineRunFacts.GetPipelineConditionStatus(pr, logger)
 	switch after.Status {
 	case corev1.ConditionTrue:
@@ -565,6 +568,9 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
 	}
 
 	resources.ApplyTaskResults(nextRprts, resolvedResultRefs)
+	// After we apply Task Results, we may be able to evaluate more
+	// when expressions, so reset the skipped cache
+	pipelineRunFacts.ResetSkippedCache()
 
 	// GetFinalTasks only returns tasks when a DAG is complete
 	nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...)
24 changes: 17 additions & 7 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
@@ -133,13 +133,7 @@ func (t *ResolvedPipelineRunTask) checkParentsDone(facts *PipelineRunFacts) bool
 	return true
 }
 
-// Skip returns true if a PipelineTask will not be run because
-// (1) its When Expressions evaluated to false
-// (2) its Condition Checks failed
-// (3) its parent task was skipped
-// (4) Pipeline is in stopping state (one of the PipelineTasks failed)
-// Note that this means Skip returns false if a conditionCheck is in progress
-func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool {
+func (t *ResolvedPipelineRunTask) skip(facts *PipelineRunFacts) bool {
 	if facts.isFinalTask(t.PipelineTask.Name) || t.IsStarted() {
 		return false
 	}
@@ -151,6 +145,22 @@ func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool {
 	return false
 }
 
+// Skip returns true if a PipelineTask will not be run because
+// (1) its When Expressions evaluated to false
+// (2) its Condition Checks failed
+// (3) its parent task was skipped
+// (4) Pipeline is in stopping state (one of the PipelineTasks failed)
+// Note that this means Skip returns false if a conditionCheck is in progress
+func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool {
+	if facts.SkipCache == nil {
+		facts.SkipCache = make(map[string]bool)
+	}
+	if _, cached := facts.SkipCache[t.PipelineTask.Name]; !cached {
+		facts.SkipCache[t.PipelineTask.Name] = t.skip(facts) // t.skip() is same as our existing t.Skip()
+	}
+	return facts.SkipCache[t.PipelineTask.Name]
+}
+
 func (t *ResolvedPipelineRunTask) conditionsSkip() bool {
 	if len(t.ResolvedConditionChecks) > 0 {
 		if t.ResolvedConditionChecks.IsDone() && !t.ResolvedConditionChecks.IsSuccess() {
15 changes: 15 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
@@ -37,6 +37,16 @@ type PipelineRunFacts struct {
 	State           PipelineRunState
 	TasksGraph      *dag.Graph
 	FinalTasksGraph *dag.Graph
+
+	// SkipCache is a hash of PipelineTask names that stores whether a task will be
+	// executed or not, because it's either not reachable via the DAG due to the pipeline
+	// state, or because it has failed conditions.
+	// We cache this data along the state, because it's expensive to compute, it requires
+	// traversing potentially the whole graph; this way it can be built incrementally, when
+	// needed, via the `Skip` method in pipelinerunresolution.go
+	// The skip data is sensitive to changes in the state. The ResetSkippedCache method
+	// can be used to clean the cache and force re-computation when needed.
+	SkipCache map[string]bool
 }
 
 // pipelineRunStatusCount holds the count of successful, failed, cancelled, skipped, and incomplete tasks
@@ -53,6 +63,11 @@ type pipelineRunStatusCount struct {
 	Incomplete int
 }
 
+// ResetSkippedCache resets the skipped cache in the facts map
+func (facts *PipelineRunFacts) ResetSkippedCache() {
+	facts.SkipCache = make(map[string]bool)
+}
+
 // ToMap returns a map that maps pipeline task name to the resolved pipeline run task
 func (state PipelineRunState) ToMap() map[string]*ResolvedPipelineRunTask {
 	m := make(map[string]*ResolvedPipelineRunTask)
65 changes: 65 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package resources
 
 import (
+	"fmt"
 	"testing"
 	"time"
 
@@ -408,6 +409,7 @@ func TestGetNextTaskWithRetries(t *testing.T) {
 }
 
 func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
+	largePipelineState := buildPipelineStateWithLargeDepencyGraph(t)
 	tcs := []struct {
 		name          string
 		state         PipelineRunState
@@ -454,6 +456,10 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
 			"not skipped since it failed",
 		state:         conditionCheckFailedWithOthersFailedState,
 		expectedNames: []string{pts[5].Name},
+	}, {
+		name:          "large deps, not started",
+		state:         largePipelineState,
+		expectedNames: []string{},
 	}}
 	for _, tc := range tcs {
 		t.Run(tc.name, func(t *testing.T) {
@@ -474,6 +480,65 @@ func TestPipelineRunState_SuccessfulOrSkippedDAGTasks(t *testing.T) {
 	}
 }
 
+func buildPipelineStateWithLargeDepencyGraph(t *testing.T) PipelineRunState {
+	t.Helper()
+	var task = &v1beta1.Task{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: "task",
+		},
+		Spec: v1beta1.TaskSpec{
+			Steps: []v1beta1.Step{{Container: corev1.Container{
+				Name: "step1",
+			}}},
+		},
+	}
+	var pipelineRunState PipelineRunState
+	pipelineRunState = []*ResolvedPipelineRunTask{{
+		PipelineTask: &v1beta1.PipelineTask{
+			Name:    "t1",
+			TaskRef: &v1beta1.TaskRef{Name: "task"},
+		},
+		TaskRun: nil,
+		ResolvedTaskResources: &resources.ResolvedTaskResources{
+			TaskSpec: &task.Spec,
+		},
+	}}
+	for i := 2; i < 60; i++ {
+		dependFrom := 1
+		if i > 10 {
+			if i%10 == 0 {
+				dependFrom = i - 10
+			} else {
+				dependFrom = i - (i % 10)
+			}
+		}
+		params := []v1beta1.Param{}
+		var alpha byte
+		for alpha = 'a'; alpha <= 'j'; alpha++ {
+			params = append(params, v1beta1.Param{
+				Name: fmt.Sprintf("%c", alpha),
+				Value: v1beta1.ArrayOrString{
+					Type:      v1beta1.ParamTypeString,
+					StringVal: fmt.Sprintf("$(tasks.t%d.results.%c)", dependFrom, alpha),
+				},
+			})
+		}
+		pipelineRunState = append(pipelineRunState, &ResolvedPipelineRunTask{
+			PipelineTask: &v1beta1.PipelineTask{
+				Name:    fmt.Sprintf("t%d", i),
+				Params:  params,
+				TaskRef: &v1beta1.TaskRef{Name: "task"},
+			},
+			TaskRun: nil,
+			ResolvedTaskResources: &resources.ResolvedTaskResources{
+				TaskSpec: &task.Spec,
+			},
+		},
+		)
+	}
+	return pipelineRunState
+}
+
 func TestPipelineRunState_GetFinalTasks(t *testing.T) {
 	tcs := []struct {
 		name string
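
The dependency pattern built by the test helper above is easier to see
when the index arithmetic is run on its own. The sketch below copies
only that arithmetic into a hypothetical `dependFrom` function (not
part of the commit) and groups tasks t2..t59 by the task whose results
they reference.

```go
package main

import "fmt"

// dependFrom mirrors the index arithmetic used in the test helper: it returns
// the number of the task whose results task i references in its parameters.
func dependFrom(i int) int {
	if i <= 10 {
		return 1
	}
	if i%10 == 0 {
		return i - 10
	}
	return i - (i % 10)
}

func main() {
	// Group the tasks t2..t59 by the task they depend on.
	children := map[int][]int{}
	for i := 2; i < 60; i++ {
		parent := dependFrom(i)
		children[parent] = append(children[parent], i)
	}
	for _, parent := range []int{1, 10, 20, 30, 40, 50} {
		fmt.Printf("t%d <- %v\n", parent, children[parent])
	}
}
```

The result is a chain of hub tasks (t1, t10, t20, t30, t40, t50), each
feeding the next group of tasks. Every dependent task references ten
results (`a` through `j`) of its hub.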
