Skip to content

Commit

Permalink
Implement Skipping
Browse files Browse the repository at this point in the history
When  `WhenExpressions` evaluate to `False`, the `Task` and its branch
(of dependent `Tasks`) will be skipped by default while the rest of the
`Pipeline` will execute.

A `Task` branch is made up of dependent `Tasks`, where there are two
types of dependencies:
- _Resource dependency_: based on resources needed from parent `Task`,
which includes [`Results`](#using-results) and [`Resources`](#specifying-resources).
- _Ordering dependency_: based on [`runAfter`](#using-the-runafter-parameter)
which provides sequencing of `Tasks` when there may not be resource
dependencies.

In some use cases, when `WhenExpressions` evaluate to `False`,
users need to skip the guarded `Task` only and allow ordering-dependent
`Tasks` to execute.

In `Tasks` guarded using `WhenExpressions` and that have ordering
dependencies only, use `whenSkipped` to specify what happens when the
guarded `Task` is skipped.

The `whenSkipped` takes either `SkipBranch` or `RunBranch` policies.
To allow execution of ordering-dependent `Tasks`, set `whenSkipped` to
`RunBranch` in the guarded parent task.

If neither `SkipBranch` nor `RunBranch` is specified, the default one
is `SkipBranch`.

However, setting `whenSkipped` in `Tasks` without `WhenExpressions` or
`Tasks` with resource dependencies is invalid and will cause `Pipeline`
validation errors.
  • Loading branch information
jerop committed Mar 1, 2021
1 parent 93e368d commit 433219a
Show file tree
Hide file tree
Showing 12 changed files with 614 additions and 2 deletions.
31 changes: 31 additions & 0 deletions docs/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,37 @@ tasks:
name: lint-source
```

When `WhenExpressions` evaluate to `False`, the `Task` and its branch (of dependent `Tasks`) will be skipped by default while the rest of the `Pipeline` will execute.

A `Task` branch is made up of dependent `Tasks`, where there are two types of dependencies:
- _Resource dependency_: based on resources needed from parent `Task`, which includes [`Results`](#using-results) and [`Resources`](#specifying-resources).
- _Ordering dependency_: based on [`runAfter`](#using-the-runafter-parameter) which provides sequencing of `Tasks` when there may not be resource dependencies.

In `Tasks` guarded using `WhenExpressions` and that have ordering dependencies only, use `whenSkipped` to specify what happens when the guarded `Task` is skipped.
The `whenSkipped` take either `SkipBranch` or `RunBranch` policies. To allow execution of ordering-dependent `Tasks`, set `whenSkipped` to `RunBranch` in the guarded parent task.
If neither `SkipBranch` nor `RunBranch` is specified, the default one is `SkipBranch`.

However, setting `whenSkipped` in `Tasks` without `WhenExpressions` or `Tasks` with resource dependencies is invalid and will cause `Pipeline` validation errors.

In this example, `task-should-be-skipped` will be skipped and `task-should-be-executed` will be executed:

```yaml
tasks:
- name: task-should-be-skipped
when:
- input: "foo"
operator: in
values: ["bar"]
whenSkipped: RunBranch
taskRef:
name: exit-1
- name: task-should-be-executed
runAfter:
- task-should-be-skipped
taskRef:
name: exit-0
```

For an end-to-end example, see [PipelineRun with WhenExpressions](../examples/v1beta1/pipelineruns/pipelinerun-with-when-expressions.yaml).

When `WhenExpressions` are specified in a `Task`, [`Conditions`](#guard-task-execution-using-conditions) should not be specified in the same `Task`. The `Pipeline` will be rejected as invalid if both `WhenExpressions` and `Conditions` are included.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ spec:
- input: "$(params.path)"
operator: notin
values: ["README.md"]
whenSkipped: RunBranch
taskSpec:
steps:
- name: echo
Expand All @@ -99,6 +100,18 @@ spec:
- name: echo
image: ubuntu
script: exit 1
- name: task-should-be-executed-after-skipped-parent-task # whenSkipped set to runBranch in parent task
runAfter:
- task-should-be-skipped-2
when:
- input: "$(params.path)"
operator: in
values: ["README.md"]
taskSpec:
steps:
- name: echo
image: ubuntu
script: 'echo created README.md'
finally:
- name: finally-task-should-be-skipped-1 # when expression using execution status, evaluates to false
when:
Expand Down
9 changes: 9 additions & 0 deletions internal/builder/v1beta1/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,15 @@ func PipelineTaskWhenExpression(input string, operator selection.Operator, value
}
}

// PipelineTaskWhenSkipped adds a WhenSkippedPolicy that describes should happen when a Task is skipped because its
// WhenExpressions evaluated to false. WhenSkippedPolicy can be specified only in Tasks that are guarded with
// WhenExpressions and do not have resource dependencies.
func PipelineTaskWhenSkipped(whenSkippedPolicy v1beta1.WhenSkippedPolicy) PipelineTaskOp {
return func(pt *v1beta1.PipelineTask) {
pt.WhenSkipped = whenSkippedPolicy
}
}

// PipelineTaskWorkspaceBinding adds a workspace with the specified name, workspace and subpath on a PipelineTask.
func PipelineTaskWorkspaceBinding(name, workspace, subPath string) PipelineTaskOp {
return func(pt *v1beta1.PipelineTask) {
Expand Down
22 changes: 22 additions & 0 deletions internal/builder/v1beta1/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ func TestPipeline(t *testing.T) {
tb.RunAfter("foo"),
tb.PipelineTaskTimeout(5*time.Second),
),
tb.PipelineTask("let-you-down", "let-you-down",
tb.PipelineTaskWhenExpression("foo", selection.In, []string{"bar"}),
tb.PipelineTaskWhenSkipped(v1beta1.RunBranch),
tb.RunAfter("foo"),
tb.PipelineTaskTimeout(5*time.Second),
),
tb.PipelineTask("make-you-cry", "make-you-cry",
tb.RunAfter("let-you-down"),
tb.PipelineTaskTimeout(5*time.Second),
),
tb.PipelineTask("foo", "", tb.PipelineTaskSpec(getTaskSpec())),
tb.PipelineTask("task-with-taskSpec", "",
tb.TaskSpecMetadata(v1beta1.PipelineTaskMetadata{
Expand Down Expand Up @@ -145,6 +155,18 @@ func TestPipeline(t *testing.T) {
WhenExpressions: []v1beta1.WhenExpression{{Input: "foo", Operator: selection.In, Values: []string{"foo", "bar"}}},
RunAfter: []string{"foo"},
Timeout: &metav1.Duration{Duration: 5 * time.Second},
}, {
Name: "let-you-down",
TaskRef: &v1beta1.TaskRef{Name: "let-you-down"},
WhenExpressions: []v1beta1.WhenExpression{{Input: "foo", Operator: selection.In, Values: []string{"bar"}}},
WhenSkipped: v1beta1.RunBranch,
RunAfter: []string{"foo"},
Timeout: &metav1.Duration{Duration: 5 * time.Second},
}, {
Name: "make-you-cry",
TaskRef: &v1beta1.TaskRef{Name: "make-you-cry"},
RunAfter: []string{"let-you-down"},
Timeout: &metav1.Duration{Duration: 5 * time.Second},
}, {
Name: "foo",
TaskSpec: &v1beta1.EmbeddedTask{
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/pipeline/v1beta1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ type PipelineTask struct {
// +optional
WhenExpressions WhenExpressions `json:"when,omitempty"`

// WhenSkipped specifies what should happen when a Task is skipped because its WhenExpressions evaluated to false.
// WhenSkipped can be specified only in Tasks that are guarded with WhenExpressions and do not have resource dependencies.
// If WhenSkipped is not specified, the default policy is SkipBranch.
// +optional
WhenSkipped WhenSkippedPolicy `json:"whenSkipped,omitempty"`

// Retries represents how many times this task should be retried in case of task failure: ConditionSucceeded set to False
// +optional
Retries int `json:"retries,omitempty"`
Expand Down
49 changes: 49 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (ps *PipelineSpec) Validate(ctx context.Context) (errs *apis.FieldError) {
errs = errs.Also(validateTasksAndFinallySection(ps))
errs = errs.Also(validateFinalTasks(ps.Tasks, ps.Finally))
errs = errs.Also(validateWhenExpressions(ps.Tasks, ps.Finally))
errs = errs.Also(validateWhenSkipped(ps.Tasks))
return errs
}

Expand Down Expand Up @@ -618,3 +619,51 @@ func validateGraph(tasks []PipelineTask) *apis.FieldError {
}
return nil
}

func validateWhenSkipped(tasks []PipelineTask) (errs *apis.FieldError) {
d, err := dag.Build(PipelineTaskList(tasks), PipelineTaskList(tasks).Deps())
if err != nil {
return apis.ErrInvalidValue(err.Error(), "tasks")
}
for i, t := range tasks {
if t.WhenSkipped != "" {
if t.WhenExpressions == nil || hasResourceDependencies(t, toMap(tasks), d) {
errs = errs.Also(apis.ErrDisallowedFields("whenSkipped").ViaFieldIndex("tasks", i))
}
if t.WhenSkipped != RunBranch && t.WhenSkipped != SkipBranch {
errs = errs.Also(apis.ErrInvalidValue("RunBranch and SkipBranch only allowed in", "whenSkipped").ViaFieldIndex("tasks", i))
}
}
}
return errs
}

func toMap(tasks []PipelineTask) map[string]PipelineTask {
taskMap := make(map[string]PipelineTask)
for _, task := range tasks {
taskMap[task.Name] = task
}
return taskMap
}

func hasResourceDependencies(parentTask PipelineTask, taskMap map[string]PipelineTask, d *dag.Graph) bool {
if node, ok := d.Nodes[parentTask.Name]; ok {
for _, childNode := range node.Next {
childTask := taskMap[childNode.Task.HashKey()]
if isResourceDependent(parentTask, childTask) {
return true
}
}
}
return false
}

func isResourceDependent(parentTask PipelineTask, childTask PipelineTask) bool {
resourceDeps := childTask.resourceDeps()
for _, resourceParent := range resourceDeps {
if resourceParent == parentTask.Name {
return true
}
}
return false
}
112 changes: 112 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,69 @@ func TestPipelineSpec_Validate_Failure(t *testing.T) {
},
expectedError: *apis.ErrGeneric(`invalid value: couldn't add link between foo and bar: task foo depends on bar but bar wasn't present in Pipeline`, "tasks").Also(
apis.ErrInvalidValue("expected resource great-resource to be from task bar, but task bar doesn't exist", "tasks[1].resources.inputs[0].from")),
}, {
name: "invalid pipeline with one pipeline task having whenSkipped without WhenExpressions",
ps: &PipelineSpec{
Description: "this is an invalid pipeline with invalid pipeline task",
Tasks: []PipelineTask{{
Name: "valid-pipeline-task",
TaskRef: &TaskRef{Name: "foo-task"},
}, {
Name: "invalid-pipeline-task",
TaskRef: &TaskRef{Name: "foo-task"},
WhenSkipped: RunBranch,
}},
},
expectedError: apis.FieldError{
Message: `must not set the field(s)`,
Paths: []string{"tasks[1].whenSkipped"},
},
}, {
name: "invalid pipeline with one pipeline task having whenSkipped with resource dependencies",
ps: &PipelineSpec{
Description: "this is an invalid pipeline with invalid pipeline task",
Tasks: []PipelineTask{{
Name: "invalid-pipeline-task",
TaskRef: &TaskRef{Name: "bar-task"},
WhenExpressions: []WhenExpression{{
Input: "foo",
Operator: selection.In,
Values: []string{"foo"},
}},
WhenSkipped: RunBranch,
}, {
Name: "valid-pipeline-task",
TaskRef: &TaskRef{Name: "bar-task"},
WhenExpressions: []WhenExpression{{
Input: "$(tasks.invalid-pipeline-task.results.foo)",
Operator: selection.In,
Values: []string{"foo"},
}},
}},
},
expectedError: apis.FieldError{
Message: `must not set the field(s)`,
Paths: []string{"tasks[0].whenSkipped"},
},
}, {
name: "invalid pipeline with one pipeline task having whenSkipped with invalid value",
ps: &PipelineSpec{
Description: "this is an invalid pipeline with invalid pipeline task",
Tasks: []PipelineTask{{
Name: "invalid-pipeline-task",
TaskRef: &TaskRef{Name: "bar-task"},
WhenExpressions: []WhenExpression{{
Input: "foo",
Operator: selection.In,
Values: []string{"foo"},
}},
WhenSkipped: "invalidValue",
}},
},
expectedError: apis.FieldError{
Message: `invalid value: RunBranch and SkipBranch only allowed in`,
Paths: []string{"tasks[0].whenSkipped"},
},
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -748,6 +811,55 @@ func TestPipelineSpec_Validate_Failure_CycleDAG(t *testing.T) {
}
}

func TestPipelineSpec_Validate_Success(t *testing.T) {
tests := []struct {
name string
ps *PipelineSpec
}{{
name: "valid pipeline with one pipeline task having whenSkipped with valid when expression",
ps: &PipelineSpec{
Description: "this is an valid pipeline with valid pipeline task",
Tasks: []PipelineTask{{
Name: "valid-pipeline-task",
TaskRef: &TaskRef{Name: "foo-task"},
WhenExpressions: []WhenExpression{{
Input: "foo",
Operator: selection.In,
Values: []string{"foo"},
}},
WhenSkipped: RunBranch,
}},
},
}, {
name: "valid pipeline with one pipeline task having whenSkipped with ordering dependencies",
ps: &PipelineSpec{
Description: "this is an invalid pipeline with invalid pipeline task",
Tasks: []PipelineTask{{
Name: "a-task",
TaskRef: &TaskRef{Name: "bar-task"},
WhenExpressions: []WhenExpression{{
Input: "foo",
Operator: selection.NotIn,
Values: []string{"foo"},
}},
WhenSkipped: RunBranch,
}, {
Name: "b-task",
TaskRef: &TaskRef{Name: "bar-task"},
RunAfter: []string{"a-task"},
}},
},
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.ps.Validate(context.Background())
if err != nil {
t.Errorf("PipelineSpec.Validate() returned an error for valid pipelineSpec: %s, %s", tt.name, err)
}
})
}
}

func TestValidatePipelineTasks_Success(t *testing.T) {
tests := []struct {
name string
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/pipeline/v1beta1/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,10 @@
"$ref": "#/definitions/v1beta1.WhenExpression"
}
},
"whenSkipped": {
"description": "WhenSkipped specifies what should happen when a Task is skipped because its WhenExpressions evaluated to false. WhenSkipped can be specified only in Tasks that are guarded with WhenExpressions and do not have resource dependencies. If WhenSkipped is not specified, the default policy is SkipBranch.",
"type": "string"
},
"workspaces": {
"description": "Workspaces maps workspaces from the pipeline spec to the workspaces declared in the Task.",
"type": "array",
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/pipeline/v1beta1/when_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,13 @@ func (wes WhenExpressions) ReplaceWhenExpressionsVariables(replacements map[stri
}
return replaced
}

// WhenSkippedPolicy describes what should happen when a Task is skipped because its WhenExpressions evaluated to false.
// WhenSkippedPolicy can be specified only in Tasks that are guarded with WhenExpressions and do not have resource dependencies.
// If none of the following policies is specified, the default one is SkipBranch.
type WhenSkippedPolicy string

const (
SkipBranch WhenSkippedPolicy = "SkipBranch"
RunBranch WhenSkippedPolicy = "RunBranch"
)
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,13 @@ func (t *ResolvedPipelineRunTask) parentTasksSkip(facts *PipelineRunFacts) bool
stateMap := facts.State.ToMap()
node := facts.TasksGraph.Nodes[t.PipelineTask.Name]
for _, p := range node.Prev {
if stateMap[p.Task.HashKey()].Skip(facts) {
parentTask := stateMap[p.Task.HashKey()]
if parentTask.Skip(facts) {
if parentTask.PipelineTask.WhenSkipped == v1beta1.RunBranch {
if parentTask.whenExpressionsSkip(facts) {
continue
}
}
return true
}
}
Expand Down
Loading

0 comments on commit 433219a

Please sign in to comment.