Skip to content

Commit

Permalink
Add Pipeline-level timeouts
Browse files Browse the repository at this point in the history
Rebased and squashed the initial work, more to come.

Fixes tektoncd#222
  • Loading branch information
abayer committed Jan 25, 2019
1 parent a561dd3 commit cc1c191
Show file tree
Hide file tree
Showing 18 changed files with 240 additions and 24 deletions.
12 changes: 12 additions & 0 deletions docs/using.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [How do I make Resources?](#creating-resources)
- [How do I run a Pipeline?](#running-a-pipeline)
- [How do I run a Task on its own?](#running-a-task)
- [How do I ensure a Pipeline or Task stops if it runs for too long?](#timing-out-pipelines-and-tasks)
- [How do I troubleshoot a PipelineRun?](#troubleshooting)
- [How do I follow logs?](../test/logs/README.md)

Expand Down Expand Up @@ -913,6 +914,17 @@ Below is an example on how to create a storage resource with service account.
secretKey: service_account.json
```

## Timing Out Pipelines and Tasks

If you want to ensure that your `Pipeline` or `Task` will be stopped if it runs
past a certain duration, you can use the `Timeout` field on either `Pipeline`
or `Task`. In both cases, add the following to the `spec`:

```yaml
spec:
timeout: 5m
```

## Troubleshooting

All objects created by the build-pipeline controller show the lineage of where
Expand Down
10 changes: 7 additions & 3 deletions pkg/apis/pipeline/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ import (

// PipelineSpec defines the desired state of PipeLine.
type PipelineSpec struct {
Resources []PipelineDeclaredResource `json:"resources"`
Tasks []PipelineTask `json:"tasks"`
Generation int64 `json:"generation,omitempty"`
Resources []PipelineDeclaredResource `json:"resources"`
Tasks []PipelineTask `json:"tasks"`
// Time after which the Pipeline times out. Defaults to never.
// Refer Go's ParseDuration documentation for expected format: https://golang.org/pkg/time/#ParseDuration
// +optional
Timeout *metav1.Duration `json:"timeout,omitempty"`
Generation int64 `json:"generation,omitempty"`
}

// PipelineStatus does not contain anything because Pipelines on their own
Expand Down
8 changes: 8 additions & 0 deletions pkg/apis/pipeline/v1alpha1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,5 +124,13 @@ func (ps *PipelineSpec) Validate() *apis.FieldError {
if err := validateProvidedBy(ps.Tasks); err != nil {
return apis.ErrInvalidValue(err.Error(), "spec.tasks.resources.inputs.providedBy")
}

if ps.Timeout != nil {
// timeout should be a valid duration of at least 0.
if ps.Timeout.Duration <= 0 {
return apis.ErrInvalidValue(fmt.Sprintf("%s should be > 0", ps.Timeout.Duration.String()), "spec.timeout")
}
}

return nil
}
18 changes: 18 additions & 0 deletions pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package v1alpha1_test

import (
"testing"
"time"

"github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1"
tb "github.com/knative/build-pipeline/test/builder"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPipelineSpec_Validate_Error(t *testing.T) {
Expand Down Expand Up @@ -100,6 +102,14 @@ func TestPipelineSpec_Validate_Error(t *testing.T) {
tb.PipelineTaskInputResource("wow-image", "wonderful-resource", tb.ProvidedBy("bar"))),
)),
},
{
name: "negative pipeline timeout",
p: tb.Pipeline("pipeline", "namespace", tb.PipelineSpec(
tb.PipelineTask("foo", "foo-task"),
tb.PipelineTask("bar", "bar-task"),
tb.PipelineTimeout(&metav1.Duration{Duration: -48 * time.Hour}),
)),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -144,6 +154,14 @@ func TestPipelineSpec_Validate_Valid(t *testing.T) {
tb.PipelineTaskInputResource("wow-image", "wonderful-resource", tb.ProvidedBy("bar"))),
)),
},
{
name: "valid timeout",
p: tb.Pipeline("pipeline", "namespace", tb.PipelineSpec(
tb.PipelineTask("foo", "foo-task"),
tb.PipelineTask("bar", "bar-task"),
tb.PipelineTimeout(&metav1.Duration{Duration: 24 * time.Hour}),
)),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/pipeline/v1alpha1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"fmt"
"time"

duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
"github.com/knative/pkg/webhook"
Expand Down Expand Up @@ -107,6 +108,12 @@ type PipelineRunStatus struct {
// In #107 should be updated to hold the location logs have been uploaded to
// +optional
Results *Results `json:"results,omitempty"`
// StartTime is the time the PipelineRun is actually started.
// +optional
StartTime *metav1.Time `json:"startTime,omitempty"`
// CompletionTime is the time the PipelineRun completed.
// +optional
CompletionTime *metav1.Time `json:"completionTime,omitempty"`
// map of TaskRun Status with the taskRun name as the key
//+optional
TaskRuns map[string]TaskRunStatus `json:"taskRuns,omitempty"`
Expand All @@ -124,6 +131,9 @@ func (pr *PipelineRunStatus) InitializeConditions() {
if pr.TaskRuns == nil {
pr.TaskRuns = make(map[string]TaskRunStatus)
}
if pr.StartTime.IsZero() {
pr.StartTime = &metav1.Time{time.Now()}
}
pipelineRunCondSet.Manage(pr).InitializeConditions()
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/pipeline/v1alpha1/pipelinerun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func TestInitializeConditions(t *testing.T) {
t.Fatalf("PipelineRun status not initialized correctly")
}

if p.Status.StartTime.IsZero() {
t.Fatalf("PipelineRun StartTime not initialized correctly")
}

p.Status.TaskRuns["fooTask"] = TaskRunStatus{}

p.Status.InitializeConditions()
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"fmt"
"time"

"github.com/knative/pkg/apis"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
Expand Down Expand Up @@ -136,6 +137,9 @@ func (tr *TaskRunStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha
return taskRunCondSet.Manage(tr).GetCondition(t)
}
func (tr *TaskRunStatus) InitializeConditions() {
if tr.StartTime.IsZero() {
tr.StartTime = &metav1.Time{time.Now()}
}
taskRunCondSet.Manage(tr).InitializeConditions()
}

Expand Down
1 change: 1 addition & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (ts *TaskRunSpec) Validate() *apis.FieldError {
return err
}
}

return nil
}

Expand Down
27 changes: 27 additions & 0 deletions pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 24 additions & 8 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,16 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er

if rprt != nil {
c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, serviceAccount)
rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, serviceAccount, p.Spec.Timeout)
if err != nil {
c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
}
}

before := pr.Status.GetCondition(duckv1alpha1.ConditionSucceeded)
after := resources.GetPipelineConditionStatus(pr.Name, pipelineState, c.Logger)
after := resources.GetPipelineConditionStatus(pr.Name, pipelineState, c.Logger, pr.Status.StartTime, p.Spec.Timeout)

pr.Status.SetCondition(after)

reconciler.EmitEvent(c.Recorder, before, after, pr)
Expand All @@ -321,7 +322,7 @@ func updateTaskRunsStatus(pr *v1alpha1.PipelineRun, pipelineState []*resources.R
}
}

func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.ResolvedPipelineRunTask, pr *v1alpha1.PipelineRun, sa string) (*v1alpha1.TaskRun, error) {
func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.ResolvedPipelineRunTask, pr *v1alpha1.PipelineRun, sa string, pipelineTimeout *metav1.Duration) (*v1alpha1.TaskRun, error) {
tr := &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: rprt.TaskRunName,
Expand All @@ -333,17 +334,32 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
},
},
Spec: v1alpha1.TaskRunSpec{
TaskRef: &v1alpha1.TaskRef{
Name: rprt.ResolvedTaskResources.TaskName,
},
Inputs: v1alpha1.TaskRunInputs{
Params: rprt.PipelineTask.Params,
},
ServiceAccount: sa,
},
}
}}

resources.WrapSteps(&tr.Spec, rprt.PipelineTask, rprt.ResolvedTaskResources.Inputs, rprt.ResolvedTaskResources.Outputs)

if pipelineTimeout != nil {
pTimeoutTime := pr.Status.StartTime.Add(pipelineTimeout.Duration)
if rprt.ResolvedTaskResources.TaskSpec.Timeout == nil || time.Now().Add(rprt.ResolvedTaskResources.TaskSpec.Timeout.Duration).After(pTimeoutTime) {
// Just in case something goes awry and we're creating the TaskRun after it should have already timed out,
// set a timeout of 0.
taskRunTimeout := pTimeoutTime.Sub(time.Now())
if taskRunTimeout < 0 {
taskRunTimeout = 0
}
rprt.ResolvedTaskResources.TaskSpec.Timeout = &metav1.Duration{Duration: taskRunTimeout}
}
tr.Spec.TaskSpec = rprt.ResolvedTaskResources.TaskSpec
} else {
tr.Spec.TaskRef = &v1alpha1.TaskRef{
Name: rprt.ResolvedTaskResources.TaskName,
}
}

return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(pr.Namespace).Create(tr)
}

Expand Down
67 changes: 62 additions & 5 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,7 @@ func TestReconcileOnCompletedPipelineRun(t *testing.T) {
})),
)}
ps := []*v1alpha1.Pipeline{tb.Pipeline("test-pipeline", "foo", tb.PipelineSpec(
tb.PipelineTask("hello-world-1", "hellow-world"),
))}
tb.PipelineTask("hello-world-1", "hellow-world")))}
ts := []*v1alpha1.Task{tb.Task("hello-world", "foo")}
d := test.Data{
PipelineRuns: prs,
Expand Down Expand Up @@ -496,6 +495,7 @@ func TestReconcileOnCancelledPipelineRun(t *testing.T) {
),
),
}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Expand All @@ -517,12 +517,69 @@ func TestReconcileOnCancelledPipelineRun(t *testing.T) {

// Check that the PipelineRun was reconciled correctly
reconciledRun, err := clients.Pipeline.Pipeline().PipelineRuns("foo").Get("test-pipeline-run-cancelled", metav1.GetOptions{})
if err != nil {
t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err)
}

// This PipelineRun should still be complete and false, and the status should reflect that
if !reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded).IsFalse() {
t.Errorf("Expected PipelineRun status to be complete and false, but was %v", reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded))
}
}

func TestReconcileWithTimeout(t *testing.T) {
ps := []*v1alpha1.Pipeline{tb.Pipeline("test-pipeline", "foo", tb.PipelineSpec(
tb.PipelineTask("hello-world-1", "hello-world"),
tb.PipelineTimeout(&metav1.Duration{Duration: 12 * time.Hour}),
))}
prs := []*v1alpha1.PipelineRun{tb.PipelineRun("test-pipeline-run-with-timeout", "foo",
tb.PipelineRunSpec("test-pipeline",
tb.PipelineRunServiceAccount("test-sa"),
),
tb.PipelineRunStatus(
tb.PipelineRunStartTime(time.Now().AddDate(0, 0, -1))),
)}
ts := []*v1alpha1.Task{tb.Task("hello-world", "foo")}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
}

// create fake recorder for testing
fr := record.NewFakeRecorder(2)

testAssets := getPipelineRunController(d, fr)
c := testAssets.Controller
clients := testAssets.Clients

err := c.Reconciler.Reconcile(context.Background(), "foo/test-pipeline-run-with-timeout")
if err != nil {
t.Errorf("Did not expect to see error when reconciling completed PipelineRun but saw %s", err)
}

// Check that the PipelineRun was reconciled correctly
reconciledRun, err := clients.Pipeline.Pipeline().PipelineRuns("foo").Get("test-pipeline-run-with-timeout", metav1.GetOptions{})
if err != nil {
t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err)
}

// The PipelineRun should be timed out.
if reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded).Reason != resources.ReasonTimedOut {
t.Errorf("Expected PipelineRun to be timed out, but condition reason is %s", reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded))
}

// Check that the expected TaskRun was created
actual := clients.Pipeline.Actions()[0].(ktesting.CreateAction).GetObject().(*v1alpha1.TaskRun)
if actual == nil {
t.Errorf("Expected a TaskRun to be created, but it wasn't.")
}

// There should be a time out for the TaskRun
if actual.Spec.TaskSpec.Timeout == nil {
t.Fatalf("TaskSpec.Timeout shouldn't be nil")
}

// The TaskRun timeout should be less than or equal to the PipelineRun timeout.
if actual.Spec.TaskSpec.Timeout.Duration > ps[0].Spec.Timeout.Duration {
t.Errorf("TaskRun timeout %s should be less than or equal to PipelineRun timeout %s", actual.Spec.TaskSpec.Timeout.Duration.String(), ps[0].Spec.Timeout.Duration.String())
}
}
Loading

0 comments on commit cc1c191

Please sign in to comment.