From 23a2b91147417e25094d23a552fe14c04163a56c Mon Sep 17 00:00:00 2001 From: Andrew Bayer Date: Tue, 29 Jan 2019 12:38:09 -0500 Subject: [PATCH] Add Pipeline-level timeouts, implement Task-level timeouts Fixes #222 --- docs/using.md | 12 ++ pkg/apis/pipeline/v1alpha1/pipeline_types.go | 10 +- .../pipeline/v1alpha1/pipeline_validation.go | 8 + .../v1alpha1/pipeline_validation_test.go | 18 +++ .../pipeline/v1alpha1/pipelinerun_types.go | 10 ++ .../v1alpha1/pipelinerun_types_test.go | 4 + pkg/apis/pipeline/v1alpha1/taskrun_types.go | 4 + .../pipeline/v1alpha1/taskrun_validation.go | 1 + .../v1alpha1/zz_generated.deepcopy.go | 27 ++++ .../v1alpha1/pipelinerun/pipelinerun.go | 29 +++- .../v1alpha1/pipelinerun/pipelinerun_test.go | 77 ++++++++- .../resources/pipelinerunresolution.go | 33 +++- .../resources/pipelinerunresolution_test.go | 4 +- pkg/reconciler/v1alpha1/taskrun/taskrun.go | 50 ++++++ .../v1alpha1/taskrun/taskrun_test.go | 40 +++++ test/builder/pipeline.go | 15 ++ test/builder/pipeline_test.go | 7 +- test/builder/task.go | 7 + test/taskrun_test.go | 43 +++++ test/timeout_test.go | 151 ++++++++++++++++++ 20 files changed, 525 insertions(+), 25 deletions(-) create mode 100644 test/timeout_test.go diff --git a/docs/using.md b/docs/using.md index 9e304463f63..1bd16891c07 100644 --- a/docs/using.md +++ b/docs/using.md @@ -5,6 +5,7 @@ - [How do I make Resources?](#creating-resources) - [How do I run a Pipeline?](#running-a-pipeline) - [How do I run a Task on its own?](#running-a-task) +- [How do I ensure a Pipeline or Task stops if it runs for too long?](#timing-out-pipelines-and-tasks) - [How do I troubleshoot a PipelineRun?](#troubleshooting) - [How do I follow logs?](../test/logs/README.md) @@ -992,6 +993,17 @@ Below is an example on how to create a storage resource with service account. secretKey: service_account.json ``` +## Timing Out Pipelines and Tasks + +If you want to ensure that your `Pipeline` or `Task` will be stopped if it runs +past a certain duration, you can use the `Timeout` field on either `Pipeline` +or `Task`. In both cases, add the following to the `spec`: + +```yaml +spec: + timeout: 5m +``` + ## Troubleshooting All objects created by the build-pipeline controller show the lineage of where diff --git a/pkg/apis/pipeline/v1alpha1/pipeline_types.go b/pkg/apis/pipeline/v1alpha1/pipeline_types.go index 8e8acb64997..95d65481d6d 100644 --- a/pkg/apis/pipeline/v1alpha1/pipeline_types.go +++ b/pkg/apis/pipeline/v1alpha1/pipeline_types.go @@ -23,9 +23,13 @@ import ( // PipelineSpec defines the desired state of PipeLine. type PipelineSpec struct { - Resources []PipelineDeclaredResource `json:"resources"` - Tasks []PipelineTask `json:"tasks"` - Generation int64 `json:"generation,omitempty"` + Resources []PipelineDeclaredResource `json:"resources"` + Tasks []PipelineTask `json:"tasks"` + // Time after which the Pipeline times out. Defaults to never. + // Refer Go's ParseDuration documentation for expected format: https://golang.org/pkg/time/#ParseDuration + // +optional + Timeout *metav1.Duration `json:"timeout,omitempty"` + Generation int64 `json:"generation,omitempty"` } // PipelineStatus does not contain anything because Pipelines on their own diff --git a/pkg/apis/pipeline/v1alpha1/pipeline_validation.go b/pkg/apis/pipeline/v1alpha1/pipeline_validation.go index 85cef07eae5..ef7d3dffacb 100644 --- a/pkg/apis/pipeline/v1alpha1/pipeline_validation.go +++ b/pkg/apis/pipeline/v1alpha1/pipeline_validation.go @@ -124,5 +124,13 @@ func (ps *PipelineSpec) Validate() *apis.FieldError { if err := validateFrom(ps.Tasks); err != nil { return apis.ErrInvalidValue(err.Error(), "spec.tasks.resources.inputs.from") } + + if ps.Timeout != nil { + // timeout should be a valid duration of at least 0. + if ps.Timeout.Duration <= 0 { + return apis.ErrInvalidValue(fmt.Sprintf("%s should be > 0", ps.Timeout.Duration.String()), "spec.timeout") + } + } + return nil } diff --git a/pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go b/pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go index 448bd1dff22..8de330c7ede 100644 --- a/pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go +++ b/pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go @@ -18,9 +18,11 @@ package v1alpha1_test import ( "testing" + "time" "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" tb "github.com/knative/build-pipeline/test/builder" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestPipelineSpec_Validate_Error(t *testing.T) { @@ -100,6 +102,14 @@ func TestPipelineSpec_Validate_Error(t *testing.T) { tb.PipelineTaskInputResource("wow-image", "wonderful-resource", tb.From("bar"))), )), }, + { + name: "negative pipeline timeout", + p: tb.Pipeline("pipeline", "namespace", tb.PipelineSpec( + tb.PipelineTask("foo", "foo-task"), + tb.PipelineTask("bar", "bar-task"), + tb.PipelineTimeout(&metav1.Duration{Duration: -48 * time.Hour}), + )), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -144,6 +154,14 @@ func TestPipelineSpec_Validate_Valid(t *testing.T) { tb.PipelineTaskInputResource("wow-image", "wonderful-resource", tb.From("bar"))), )), }, + { + name: "valid timeout", + p: tb.Pipeline("pipeline", "namespace", tb.PipelineSpec( + tb.PipelineTask("foo", "foo-task"), + tb.PipelineTask("bar", "bar-task"), + tb.PipelineTimeout(&metav1.Duration{Duration: 24 * time.Hour}), + )), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/apis/pipeline/v1alpha1/pipelinerun_types.go b/pkg/apis/pipeline/v1alpha1/pipelinerun_types.go index d877b8d29ee..2004ca8e978 100644 --- a/pkg/apis/pipeline/v1alpha1/pipelinerun_types.go +++ b/pkg/apis/pipeline/v1alpha1/pipelinerun_types.go @@ -18,6 +18,7 @@ package v1alpha1 import ( "fmt" + "time" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "github.com/knative/pkg/webhook" @@ -107,6 +108,12 @@ type PipelineRunStatus struct { // In #107 should be updated to hold the location logs have been uploaded to // +optional Results *Results `json:"results,omitempty"` + // StartTime is the time the PipelineRun is actually started. + // +optional + StartTime *metav1.Time `json:"startTime,omitempty"` + // CompletionTime is the time the PipelineRun completed. + // +optional + CompletionTime *metav1.Time `json:"completionTime,omitempty"` // map of TaskRun Status with the taskRun name as the key //+optional TaskRuns map[string]TaskRunStatus `json:"taskRuns,omitempty"` @@ -124,6 +131,9 @@ func (pr *PipelineRunStatus) InitializeConditions() { if pr.TaskRuns == nil { pr.TaskRuns = make(map[string]TaskRunStatus) } + if pr.StartTime.IsZero() { + pr.StartTime = &metav1.Time{time.Now()} + } pipelineRunCondSet.Manage(pr).InitializeConditions() } diff --git a/pkg/apis/pipeline/v1alpha1/pipelinerun_types_test.go b/pkg/apis/pipeline/v1alpha1/pipelinerun_types_test.go index fa8192d2d21..7e4298bdb37 100644 --- a/pkg/apis/pipeline/v1alpha1/pipelinerun_types_test.go +++ b/pkg/apis/pipeline/v1alpha1/pipelinerun_types_test.go @@ -76,6 +76,10 @@ func TestInitializeConditions(t *testing.T) { t.Fatalf("PipelineRun status not initialized correctly") } + if p.Status.StartTime.IsZero() { + t.Fatalf("PipelineRun StartTime not initialized correctly") + } + p.Status.TaskRuns["fooTask"] = TaskRunStatus{} p.Status.InitializeConditions() diff --git a/pkg/apis/pipeline/v1alpha1/taskrun_types.go b/pkg/apis/pipeline/v1alpha1/taskrun_types.go index c546aa95eac..46741f140c1 100644 --- a/pkg/apis/pipeline/v1alpha1/taskrun_types.go +++ b/pkg/apis/pipeline/v1alpha1/taskrun_types.go @@ -18,6 +18,7 @@ package v1alpha1 import ( "fmt" + "time" "github.com/knative/pkg/apis" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" @@ -136,6 +137,9 @@ func (tr *TaskRunStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha return taskRunCondSet.Manage(tr).GetCondition(t) } func (tr *TaskRunStatus) InitializeConditions() { + if tr.StartTime.IsZero() { + tr.StartTime = &metav1.Time{time.Now()} + } taskRunCondSet.Manage(tr).InitializeConditions() } diff --git a/pkg/apis/pipeline/v1alpha1/taskrun_validation.go b/pkg/apis/pipeline/v1alpha1/taskrun_validation.go index 5e6dc8215f8..5cf3d56f25d 100644 --- a/pkg/apis/pipeline/v1alpha1/taskrun_validation.go +++ b/pkg/apis/pipeline/v1alpha1/taskrun_validation.go @@ -69,6 +69,7 @@ func (ts *TaskRunSpec) Validate() *apis.FieldError { return err } } + return nil } diff --git a/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go index 854fbfeba76..5d077ca0939 100644 --- a/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go @@ -575,6 +575,24 @@ func (in *PipelineRunStatus) DeepCopyInto(out *PipelineRunStatus) { **out = **in } } + if in.StartTime != nil { + in, out := &in.StartTime, &out.StartTime + if *in == nil { + *out = nil + } else { + *out = new(v1.Time) + (*in).DeepCopyInto(*out) + } + } + if in.CompletionTime != nil { + in, out := &in.CompletionTime, &out.CompletionTime + if *in == nil { + *out = nil + } else { + *out = new(v1.Time) + (*in).DeepCopyInto(*out) + } + } if in.TaskRuns != nil { in, out := &in.TaskRuns, &out.TaskRuns *out = make(map[string]TaskRunStatus, len(*in)) @@ -612,6 +630,15 @@ func (in *PipelineSpec) DeepCopyInto(out *PipelineSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + if *in == nil { + *out = nil + } else { + *out = new(v1.Duration) + **out = **in + } + } return } diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go index 420dbe9d899..1cac7090b79 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go @@ -294,7 +294,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er if rprt != nil { c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName) - rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, serviceAccount) + rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, serviceAccount, p.Spec.Timeout) if err != nil { c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err) return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err) @@ -302,7 +302,8 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er } before := pr.Status.GetCondition(duckv1alpha1.ConditionSucceeded) - after := resources.GetPipelineConditionStatus(pr.Name, pipelineState, c.Logger) + after := resources.GetPipelineConditionStatus(pr.Name, pipelineState, c.Logger, pr.Status.StartTime, p.Spec.Timeout) + pr.Status.SetCondition(after) reconciler.EmitEvent(c.Recorder, before, after, pr) @@ -321,7 +322,21 @@ func updateTaskRunsStatus(pr *v1alpha1.PipelineRun, pipelineState []*resources.R } } -func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.ResolvedPipelineRunTask, pr *v1alpha1.PipelineRun, sa string) (*v1alpha1.TaskRun, error) { +func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.ResolvedPipelineRunTask, pr *v1alpha1.PipelineRun, sa string, pipelineTimeout *metav1.Duration) (*v1alpha1.TaskRun, error) { + ts := rprt.ResolvedTaskResources.TaskSpec.DeepCopy() + if pipelineTimeout != nil { + pTimeoutTime := pr.Status.StartTime.Add(pipelineTimeout.Duration) + if ts.Timeout == nil || time.Now().Add(ts.Timeout.Duration).After(pTimeoutTime) { + // Just in case something goes awry and we're creating the TaskRun after it should have already timed out, + // set a timeout of 0. + taskRunTimeout := pTimeoutTime.Sub(time.Now()) + if taskRunTimeout < 0 { + taskRunTimeout = 0 + } + ts.Timeout = &metav1.Duration{Duration: taskRunTimeout} + } + } + tr := &v1alpha1.TaskRun{ ObjectMeta: metav1.ObjectMeta{ Name: rprt.TaskRunName, @@ -333,15 +348,13 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re }, }, Spec: v1alpha1.TaskRunSpec{ - TaskRef: &v1alpha1.TaskRef{ - Name: rprt.ResolvedTaskResources.TaskName, - }, + TaskSpec: ts, Inputs: v1alpha1.TaskRunInputs{ Params: rprt.PipelineTask.Params, }, ServiceAccount: sa, - }, - } + }} + resources.WrapSteps(&tr.Spec, rprt.PipelineTask, rprt.ResolvedTaskResources.Inputs, rprt.ResolvedTaskResources.Outputs) return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(pr.Namespace).Create(tr) diff --git a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go index e75b0276e61..7e7492865d7 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go @@ -184,7 +184,15 @@ func TestReconcile(t *testing.T) { ), tb.TaskRunLabel("pipeline.knative.dev/pipeline", "test-pipeline"), tb.TaskRunLabel("pipeline.knative.dev/pipelineRun", "test-pipeline-run-success"), - tb.TaskRunSpec(tb.TaskRunTaskRef("unit-test-task"), + tb.TaskRunSpec(tb.TaskRunTaskSpec( + tb.TaskInputs( + tb.InputsResource("workspace", v1alpha1.PipelineResourceTypeGit), + tb.InputsParam("foo"), tb.InputsParam("bar"), tb.InputsParam("templatedparam"), + ), + tb.TaskOutputs( + tb.OutputsResource("image-to-use", v1alpha1.PipelineResourceTypeImage), + tb.OutputsResource("workspace", v1alpha1.PipelineResourceTypeGit), + )), tb.TaskRunServiceAccount("test-sa"), tb.TaskRunInputs( tb.TaskRunInputsParam("foo", "somethingfun"), @@ -425,8 +433,7 @@ func TestReconcileOnCompletedPipelineRun(t *testing.T) { })), )} ps := []*v1alpha1.Pipeline{tb.Pipeline("test-pipeline", "foo", tb.PipelineSpec( - tb.PipelineTask("hello-world-1", "hellow-world"), - ))} + tb.PipelineTask("hello-world-1", "hellow-world")))} ts := []*v1alpha1.Task{tb.Task("hello-world", "foo")} d := test.Data{ PipelineRuns: prs, @@ -496,6 +503,7 @@ func TestReconcileOnCancelledPipelineRun(t *testing.T) { ), ), } + d := test.Data{ PipelineRuns: prs, Pipelines: ps, @@ -517,12 +525,69 @@ func TestReconcileOnCancelledPipelineRun(t *testing.T) { // Check that the PipelineRun was reconciled correctly reconciledRun, err := clients.Pipeline.Pipeline().PipelineRuns("foo").Get("test-pipeline-run-cancelled", metav1.GetOptions{}) - if err != nil { - t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err) - } // This PipelineRun should still be complete and false, and the status should reflect that if !reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded).IsFalse() { t.Errorf("Expected PipelineRun status to be complete and false, but was %v", reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded)) } } + +func TestReconcileWithTimeout(t *testing.T) { + ps := []*v1alpha1.Pipeline{tb.Pipeline("test-pipeline", "foo", tb.PipelineSpec( + tb.PipelineTask("hello-world-1", "hello-world"), + tb.PipelineTimeout(&metav1.Duration{Duration: 12 * time.Hour}), + ))} + prs := []*v1alpha1.PipelineRun{tb.PipelineRun("test-pipeline-run-with-timeout", "foo", + tb.PipelineRunSpec("test-pipeline", + tb.PipelineRunServiceAccount("test-sa"), + ), + tb.PipelineRunStatus( + tb.PipelineRunStartTime(time.Now().AddDate(0, 0, -1))), + )} + ts := []*v1alpha1.Task{tb.Task("hello-world", "foo")} + + d := test.Data{ + PipelineRuns: prs, + Pipelines: ps, + Tasks: ts, + } + + // create fake recorder for testing + fr := record.NewFakeRecorder(2) + + testAssets := getPipelineRunController(d, fr) + c := testAssets.Controller + clients := testAssets.Clients + + err := c.Reconciler.Reconcile(context.Background(), "foo/test-pipeline-run-with-timeout") + if err != nil { + t.Errorf("Did not expect to see error when reconciling completed PipelineRun but saw %s", err) + } + + // Check that the PipelineRun was reconciled correctly + reconciledRun, err := clients.Pipeline.Pipeline().PipelineRuns("foo").Get("test-pipeline-run-with-timeout", metav1.GetOptions{}) + if err != nil { + t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err) + } + + // The PipelineRun should be timed out. + if reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded).Reason != resources.ReasonTimedOut { + t.Errorf("Expected PipelineRun to be timed out, but condition reason is %s", reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded)) + } + + // Check that the expected TaskRun was created + actual := clients.Pipeline.Actions()[0].(ktesting.CreateAction).GetObject().(*v1alpha1.TaskRun) + if actual == nil { + t.Errorf("Expected a TaskRun to be created, but it wasn't.") + } + + // There should be a time out for the TaskRun + if actual.Spec.TaskSpec.Timeout == nil { + t.Fatalf("TaskSpec.Timeout shouldn't be nil") + } + + // The TaskRun timeout should be less than or equal to the PipelineRun timeout. + if actual.Spec.TaskSpec.Timeout.Duration > ps[0].Spec.Timeout.Duration { + t.Errorf("TaskRun timeout %s should be less than or equal to PipelineRun timeout %s", actual.Spec.TaskSpec.Timeout.Duration.String(), ps[0].Spec.Timeout.Duration.String()) + } +} diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution.go index aa0f4311f23..d7550e2a3a2 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution.go @@ -18,15 +18,16 @@ package resources import ( "fmt" + "time" + "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" + "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun/list" + "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun/resources" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - - "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" - "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun/list" - "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/taskrun/resources" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -40,6 +41,10 @@ const ( // ReasonSucceeded indicates that the reason for the finished status is that all of the TaskRuns // completed successfully ReasonSucceeded = "Succeeded" + + // ReasonTimedOut indicates that the PipelineRun has taken longer than its configured + // timeout + ReasonTimedOut = "PipelineRunTimeout" ) // GetNextTask returns the next Task for which a TaskRun should be created, @@ -226,8 +231,24 @@ func getTaskRunName(prName string, pt *v1alpha1.PipelineTask) string { // GetPipelineConditionStatus will return the Condition that the PipelineRun prName should be // updated with, based on the status of the TaskRuns in state. -func GetPipelineConditionStatus(prName string, state []*ResolvedPipelineRunTask, logger *zap.SugaredLogger) *duckv1alpha1.Condition { +func GetPipelineConditionStatus(prName string, state []*ResolvedPipelineRunTask, logger *zap.SugaredLogger, startTime *metav1.Time, + pipelineTimeout *metav1.Duration) *duckv1alpha1.Condition { allFinished := true + if !startTime.IsZero() && pipelineTimeout != nil { + timeout := pipelineTimeout.Duration + runtime := time.Since(startTime.Time) + if runtime > timeout { + logger.Infof("PipelineRun %q has timed out(runtime %s over %s)", prName, runtime, timeout) + + timeoutMsg := fmt.Sprintf("PipelineRun %q failed to finish within %q", prName, timeout.String()) + return &duckv1alpha1.Condition{ + Type: duckv1alpha1.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: ReasonTimedOut, + Message: timeoutMsg, + } + } + } for _, rprt := range state { if rprt.TaskRun == nil { logger.Infof("TaskRun %s doesn't have a Status, so PipelineRun %s isn't finished", rprt.TaskRunName, prName) @@ -236,7 +257,7 @@ func GetPipelineConditionStatus(prName string, state []*ResolvedPipelineRunTask, } c := rprt.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded) if c == nil { - logger.Infof("TaskRun %s doens't have a condition, so PipelineRun %s isn't finished", rprt.TaskRunName, prName) + logger.Infof("TaskRun %s doesn't have a condition, so PipelineRun %s isn't finished", rprt.TaskRunName, prName) allFinished = false break } diff --git a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution_test.go b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution_test.go index f59cab1ce97..20fda0ca110 100644 --- a/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution_test.go +++ b/pkg/reconciler/v1alpha1/pipelinerun/resources/pipelinerunresolution_test.go @@ -20,6 +20,7 @@ import ( "fmt" "strings" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -671,7 +672,8 @@ func TestGetPipelineConditionStatus(t *testing.T) { } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - c := GetPipelineConditionStatus("somepipelinerun", tc.state, zap.NewNop().Sugar()) + c := GetPipelineConditionStatus("somepipelinerun", tc.state, zap.NewNop().Sugar(), &metav1.Time{time.Now()}, + nil) if c.Status != tc.expectedStatus { t.Fatalf("Expected to get status %s but got %s for state %v", tc.expectedStatus, c.Status, tc.state) } diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun.go b/pkg/reconciler/v1alpha1/taskrun/taskrun.go index 0dce7ee00a0..15c4a53f685 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "reflect" + "time" "github.com/knative/build-pipeline/pkg/apis/pipeline" "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" @@ -63,6 +64,9 @@ const ( // is just starting to be reconciled reasonRunning = "Running" + // reasonTimedOut indicates that the TaskRun has taken longer than its configured timeout + reasonTimedOut = "TaskRunTimeout" + // taskRunAgentName defines logging agent name for TaskRun Controller taskRunAgentName = "taskrun-controller" // taskRunControllerName defines name for TaskRun Controller @@ -231,6 +235,15 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error }) return nil } + + // Check if the TaskRun has timed out; if it is, this will set its status + // accordingly. + if timedOut, err := c.checkTimeout(tr, spec, c.KubeClientSet.CoreV1().Pods(tr.Namespace).Delete); err != nil { + return err + } else if timedOut { + return nil + } + rtr, err := resources.ResolveTaskResources(spec, taskName, tr.Spec.Inputs.Resources, tr.Spec.Outputs.Resources, c.resourceLister.PipelineResources(tr.Namespace).Get) if err != nil { c.Logger.Errorf("Failed to resolve references for taskrun %s: %v", tr.Name, err) @@ -308,6 +321,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1alpha1.TaskRun) error updateStatusFromBuildStatus(tr, buildStatus) after := tr.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + reconciler.EmitEvent(c.Recorder, before, after, tr) c.Logger.Infof("Successfully reconciled taskrun %s/%s with status: %#v", tr.Name, tr.Namespace, after) @@ -528,3 +542,39 @@ func makeLabels(s *v1alpha1.TaskRun) map[string]string { func isDone(status *v1alpha1.TaskRunStatus) bool { return !status.GetCondition(duckv1alpha1.ConditionSucceeded).IsUnknown() } + +type DeletePod func(podName string, options *metav1.DeleteOptions) error + +func (c *Reconciler) checkTimeout(tr *v1alpha1.TaskRun, ts *v1alpha1.TaskSpec, dp DeletePod) (bool, error) { + // If tr has not started, startTime should be zero. + if tr.Status.StartTime.IsZero() { + return false, nil + } + + bs := ts.GetBuildSpec() + + if bs.Timeout != nil { + timeout := bs.Timeout.Duration + runtime := time.Since(tr.Status.StartTime.Time) + if runtime > timeout { + c.Logger.Infof("TaskRun %q is timeout (runtime %s over %s), deleting pod", tr.Name, runtime, timeout) + if err := dp(tr.Status.PodName, &metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { + c.Logger.Errorf("Failed to terminate pod: %v", err) + return true, err + } + + timeoutMsg := fmt.Sprintf("TaskRun %q failed to finish within %q", tr.Name, timeout.String()) + tr.Status.SetCondition(&duckv1alpha1.Condition{ + Type: duckv1alpha1.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: reasonTimedOut, + Message: timeoutMsg, + }) + // update tr completed time + tr.Status.CompletionTime = &metav1.Time{Time: time.Now()} + + return true, nil + } + } + return false, nil +} diff --git a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go index 87f63415f15..9f64e62b7d6 100644 --- a/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go +++ b/pkg/reconciler/v1alpha1/taskrun/taskrun_test.go @@ -66,6 +66,7 @@ var ( simpleStep = tb.Step("simple-step", "foo", tb.Command("/mycmd")) simpleTask = tb.Task("test-task", "foo", tb.TaskSpec(simpleStep)) + timeoutTask = tb.Task("timeout-task", "foo", tb.TaskSpec(simpleStep, tb.TaskTimeout(10*time.Second))) clustertask = tb.ClusterTask("test-cluster-task", tb.ClusterTaskSpec(simpleStep)) outputTask = tb.Task("test-output-task", "foo", tb.TaskSpec( @@ -861,6 +862,7 @@ func TestReconcileOnCancelledTaskRun(t *testing.T) { if err != nil { t.Fatalf("Expected completed TaskRun %s to exist but instead got error when getting it: %v", taskRun.Name, err) } + expectedStatus := &duckv1alpha1.Condition{ Type: duckv1alpha1.ConditionSucceeded, Status: corev1.ConditionFalse, @@ -871,3 +873,41 @@ func TestReconcileOnCancelledTaskRun(t *testing.T) { t.Fatalf("-want, +got: %v", d) } } + +func TestReconcileOnTimedOutTaskRun(t *testing.T) { + taskRun := tb.TaskRun("test-taskrun-timeout", "foo", + tb.TaskRunSpec( + tb.TaskRunTaskRef(timeoutTask.Name), + ), + tb.TaskRunStatus(tb.Condition(duckv1alpha1.Condition{ + Type: duckv1alpha1.ConditionSucceeded, + Status: corev1.ConditionUnknown}), + tb.TaskRunStartTime(time.Now().Add(-15*time.Second)))) + + d := test.Data{ + TaskRuns: []*v1alpha1.TaskRun{taskRun}, + Tasks: []*v1alpha1.Task{timeoutTask}, + } + + testAssets := getTaskRunController(d) + c := testAssets.Controller + clients := testAssets.Clients + + if err := c.Reconciler.Reconcile(context.Background(), fmt.Sprintf("%s/%s", taskRun.Namespace, taskRun.Name)); err != nil { + t.Fatalf("Unexpected error when reconciling completed TaskRun : %v", err) + } + newTr, err := clients.Pipeline.PipelineV1alpha1().TaskRuns(taskRun.Namespace).Get(taskRun.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Expected completed TaskRun %s to exist but instead got error when getting it: %v", taskRun.Name, err) + } + + expectedStatus := &duckv1alpha1.Condition{ + Type: duckv1alpha1.ConditionSucceeded, + Status: corev1.ConditionFalse, + Reason: "TaskRunTimeout", + Message: `TaskRun "test-taskrun-timeout" failed to finish within "10s"`, + } + if d := cmp.Diff(newTr.Status.GetCondition(duckv1alpha1.ConditionSucceeded), expectedStatus, ignoreLastTransitionTime); d != "" { + t.Fatalf("-want, +got: %v", d) + } +} diff --git a/test/builder/pipeline.go b/test/builder/pipeline.go index fd30c0bd2e9..48d62006c25 100644 --- a/test/builder/pipeline.go +++ b/test/builder/pipeline.go @@ -17,6 +17,7 @@ import ( "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "time" ) // PipelineOp is an operation which modify a Pipeline struct. @@ -114,6 +115,13 @@ func PipelineTask(name, taskName string, ops ...PipelineTaskOp) PipelineSpecOp { } } +// PipelineTimeout sets the timeout to the PipelineSpec. +func PipelineTimeout(duration *metav1.Duration) PipelineSpecOp { + return func(ps *v1alpha1.PipelineSpec) { + ps.Timeout = duration + } +} + // PipelineTaskRefKind sets the TaskKind to the PipelineTaskRef. func PipelineTaskRefKind(kind v1alpha1.TaskKind) PipelineTaskOp { return func(pt *v1alpha1.PipelineTask) { @@ -260,6 +268,13 @@ func PipelineRunStatusCondition(condition duckv1alpha1.Condition) PipelineRunSta } } +// PipelineRunStartTime sets the start time to the PipelineRunStatus. +func PipelineRunStartTime(startTime time.Time) PipelineRunStatusOp { + return func(s *v1alpha1.PipelineRunStatus) { + s.StartTime = &metav1.Time{Time: startTime} + } +} + // PipelineResource creates a PipelineResource with default values. // Any number of PipelineResource modifier can be passed to transform it. func PipelineResource(name, namespace string, ops ...PipelineResourceOp) *v1alpha1.PipelineResource { diff --git a/test/builder/pipeline_test.go b/test/builder/pipeline_test.go index e3e7cf19eba..f133344c0d2 100644 --- a/test/builder/pipeline_test.go +++ b/test/builder/pipeline_test.go @@ -15,6 +15,7 @@ package builder_test import ( "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" @@ -35,6 +36,7 @@ func TestPipeline(t *testing.T) { tb.PipelineTaskInputResource("some-repo", "my-only-git-resource", tb.From("foo")), tb.PipelineTaskOutputResource("some-image", "my-only-image-resource"), ), + tb.PipelineTimeout(&metav1.Duration{Duration: 1 * time.Hour}), )) expectedPipeline := &v1alpha1.Pipeline{ ObjectMeta: metav1.ObjectMeta{Name: "tomatoes", Namespace: "foo"}, @@ -65,6 +67,7 @@ func TestPipeline(t *testing.T) { }}, }, }}, + Timeout: &metav1.Duration{Duration: 1 * time.Hour}, }, } if d := cmp.Diff(expectedPipeline, pipeline); d != "" { @@ -73,12 +76,13 @@ func TestPipeline(t *testing.T) { } func TestPipelineRun(t *testing.T) { + startTime := time.Now() pipelineRun := tb.PipelineRun("pear", "foo", tb.PipelineRunSpec( "tomatoes", tb.PipelineRunServiceAccount("sa"), tb.PipelineRunResourceBinding("some-resource", tb.PipelineResourceBindingRef("my-special-resource")), ), tb.PipelineRunStatus(tb.PipelineRunStatusCondition(duckv1alpha1.Condition{ Type: duckv1alpha1.ConditionSucceeded, - }))) + }), tb.PipelineRunStartTime(startTime))) expectedPipelineRun := &v1alpha1.PipelineRun{ ObjectMeta: metav1.ObjectMeta{Name: "pear", Namespace: "foo"}, Spec: v1alpha1.PipelineRunSpec{ @@ -94,6 +98,7 @@ func TestPipelineRun(t *testing.T) { }, Status: v1alpha1.PipelineRunStatus{ Conditions: []duckv1alpha1.Condition{{Type: duckv1alpha1.ConditionSucceeded}}, + StartTime: &metav1.Time{Time: startTime}, }, } if d := cmp.Diff(expectedPipelineRun, pipelineRun); d != "" { diff --git a/test/builder/task.go b/test/builder/task.go index 28a23afe29a..81cda03a7b5 100644 --- a/test/builder/task.go +++ b/test/builder/task.go @@ -312,6 +312,13 @@ func StepState(ops ...StepStateOp) TaskRunStatusOp { } } +// TaskRunStartTime sets the start time to the TaskRunStatus. +func TaskRunStartTime(startTime time.Time) TaskRunStatusOp { + return func(s *v1alpha1.TaskRunStatus) { + s.StartTime = &metav1.Time{Time: startTime} + } +} + // StateTerminated set Terminated to the StepState. func StateTerminated(exitcode int) StepStateOp { return func(s *v1alpha1.StepState) { diff --git a/test/taskrun_test.go b/test/taskrun_test.go index c67d42387fb..73dea907f1b 100644 --- a/test/taskrun_test.go +++ b/test/taskrun_test.go @@ -19,9 +19,14 @@ import ( "fmt" "strings" "testing" + "time" + tb "github.com/knative/build-pipeline/test/builder" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" knativetest "github.com/knative/pkg/test" "github.com/knative/pkg/test/logging" + corev1 "k8s.io/api/core/v1" ) // TestTaskRun is an integration test that will verify a very simple "hello world" TaskRun can be @@ -54,3 +59,41 @@ func TestTaskRun(t *testing.T) { t.Fatalf("Expected output %s from pod %s but got %s", buildOutput, hwValidationPodName, output) } } + +// TestTaskRunTimeout is an integration test that will verify a TaskRun can be timed out. +func TestTaskRunTimeout(t *testing.T) { + logger := logging.GetContextLogger(t.Name()) + c, namespace := setup(t, logger) + + knativetest.CleanupOnInterrupt(func() { tearDown(t, logger, c, namespace) }, logger) + defer tearDown(t, logger, c, namespace) + + logger.Infof("Creating Task and TaskRun in namespace %s", namespace) + if _, err := c.TaskClient.Create(tb.Task(hwTaskName, namespace, + tb.TaskSpec(tb.Step(hwContainerName, "busybox", tb.Command("/bin/bash"), tb.Args("-c", "sleep 300")), + tb.TaskTimeout(10 * time.Second)))); err != nil { + t.Fatalf("Failed to create Task `%s`: %s", hwTaskName, err) + } + if _, err := c.TaskRunClient.Create(tb.TaskRun(hwTaskRunName, namespace, tb.TaskRunSpec(tb.TaskRunTaskRef(hwTaskName)))); err != nil { + t.Fatalf("Failed to create TaskRun `%s`: %s", hwTaskRunName, err) + } + + logger.Infof("Waiting for TaskRun %s in namespace %s to complete", hwTaskRunName, namespace) + if err := WaitForTaskRunState(c, hwTaskRunName, func(tr *v1alpha1.TaskRun) (bool, error) { + cond := tr.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if cond != nil { + if cond.Status == corev1.ConditionFalse { + if cond.Reason == "TaskRunTimeout" { + return true, nil + } + return true, fmt.Errorf("taskRun %s completed with the wrong reason: %s", hwTaskRunName, cond.Reason) + } else if cond.Status == corev1.ConditionTrue { + return true, fmt.Errorf("taskRun %s completed successfully, should have been timed out", hwTaskRunName) + } + } + + return false, nil + }, "TaskRunTimeout"); err != nil { + t.Errorf("Error waiting for TaskRun %s to finish: %s", hwTaskRunName, err) + } +} diff --git a/test/timeout_test.go b/test/timeout_test.go new file mode 100644 index 00000000000..28aa739284d --- /dev/null +++ b/test/timeout_test.go @@ -0,0 +1,151 @@ +// +build e2e + +/* +Copyright 2019 Knative Authors LLC +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +import ( + "fmt" + "testing" + "time" + + "github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1" + "github.com/knative/build-pipeline/pkg/reconciler/v1alpha1/pipelinerun/resources" + tb "github.com/knative/build-pipeline/test/builder" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + knativetest "github.com/knative/pkg/test" + "github.com/knative/pkg/test/logging" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// TestPipelineRunTimeout is an integration test that will +// verify that pipelinerun timeout works and leads to the the correct TaskRun statuses +// and pod deletions. +func TestPipelineRunTimeout(t *testing.T) { + logger := logging.GetContextLogger(t.Name()) + c, namespace := setup(t, logger) + + knativetest.CleanupOnInterrupt(func() { tearDown(t, logger, c, namespace) }, logger) + defer tearDown(t, logger, c, namespace) + + logger.Infof("Creating Task in namespace %s", namespace) + task := tb.Task("banana", namespace, tb.TaskSpec( + tb.Step("foo", "busybox", tb.Command("sleep"), tb.Args("10")))) + if _, err := c.TaskClient.Create(task); err != nil { + t.Fatalf("Failed to create Task `%s`: %s", hwTaskName, err) + } + + pipeline := tb.Pipeline("tomatoes", namespace, + tb.PipelineSpec(tb.PipelineTask("foo", "banana"), + tb.PipelineTimeout(&metav1.Duration{Duration: 5 * time.Second}), + ), + ) + pipelineRun := tb.PipelineRun("pear", namespace, tb.PipelineRunSpec(pipeline.Name)) + if _, err := c.PipelineClient.Create(pipeline); err != nil { + t.Fatalf("Failed to create Pipeline `%s`: %s", pipeline.Name, err) + } + if _, err := c.PipelineRunClient.Create(pipelineRun); err != nil { + t.Fatalf("Failed to create PipelineRun `%s`: %s", pipelineRun.Name, err) + } + + logger.Infof("Waiting for Pipelinerun %s in namespace %s to be started", pipelineRun.Name, namespace) + if err := WaitForPipelineRunState(c, pipelineRun.Name, pipelineRunTimeout, func(pr *v1alpha1.PipelineRun) (bool, error) { + c := pr.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if c != nil { + if c.Status == corev1.ConditionTrue || c.Status == corev1.ConditionFalse { + return true, fmt.Errorf("pipelineRun %s already finished!", pipelineRun.Name) + } else if c.Status == corev1.ConditionUnknown && (c.Reason == "Running" || c.Reason == "Pending") { + return true, nil + } + } + return false, nil + }, "PipelineRunRunning"); err != nil { + t.Fatalf("Error waiting for PipelineRun %s to be running: %s", pipelineRun.Name, err) + } + + trName := fmt.Sprintf("%s-%s", pipelineRun.Name, task.Spec.Steps[0].Name) + + logger.Infof("Waiting for TaskRun %s in namespace %s to be running", trName, namespace) + if err := WaitForTaskRunState(c, trName, func(tr *v1alpha1.TaskRun) (bool, error) { + c := tr.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if c != nil { + if c.Status == corev1.ConditionTrue || c.Status == corev1.ConditionFalse { + return true, fmt.Errorf("taskRun %s already finished!", trName) + } else if c.Status == corev1.ConditionUnknown && (c.Reason == "Running" || c.Reason == "Pending") { + return true, nil + } + } + return false, nil + }, "TaskRunRunning"); err != nil { + t.Fatalf("Error waiting for TaskRun %s to be running: %s", trName, err) + } + + if _, err := c.PipelineRunClient.Get(pipelineRun.Name, metav1.GetOptions{}); err != nil { + t.Fatalf("Failed to get PipelineRun `%s`: %s", pipelineRun.Name, err) + } + + logger.Infof("Waiting for PipelineRun %s in namespace %s to be timed out", pipelineRun.Name, namespace) + if err := WaitForPipelineRunState(c, pipelineRun.Name, pipelineRunTimeout, func(pr *v1alpha1.PipelineRun) (bool, error) { + c := pr.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if c != nil { + if c.Status == corev1.ConditionFalse { + if c.Reason == resources.ReasonTimedOut { + return true, nil + } + return true, fmt.Errorf("pipelineRun %s completed with the wrong reason: %s", pipelineRun.Name, c.Reason) + } else if c.Status == corev1.ConditionTrue { + return true, fmt.Errorf("pipelineRun %s completed successfully, should have been timed out", pipelineRun.Name) + } + } + return false, nil + }, "PipelineRunTimedOut"); err != nil { + t.Errorf("Error waiting for TaskRun %s to finish: %s", trName, err) + } + + logger.Infof("Waiting for TaskRun %s in namespace %s to be cancelled", trName, namespace) + if err := WaitForTaskRunState(c, trName, func(tr *v1alpha1.TaskRun) (bool, error) { + cond := tr.Status.GetCondition(duckv1alpha1.ConditionSucceeded) + if cond != nil { + if cond.Status == corev1.ConditionFalse { + if cond.Reason == "TaskRunTimeout" { + return true, nil + } + return true, fmt.Errorf("taskRun %s completed with the wrong reason: %s", trName, cond.Reason) + } else if cond.Status == corev1.ConditionTrue { + return true, fmt.Errorf("taskRun %s completed successfully, should have been timed out", trName) + } + } + return false, nil + }, "TaskRunTimeout"); err != nil { + t.Errorf("Error waiting for TaskRun %s to finish: %s", trName, err) + } + + // Verify that we can create a second Pipeline using the same Task without a Pipeline-level timeout that will not + // time out + secondPipeline := tb.Pipeline("peppers", namespace, + tb.PipelineSpec(tb.PipelineTask("foo", "banana"))) + secondPipelineRun := tb.PipelineRun("kiwi", namespace, tb.PipelineRunSpec("peppers")) + if _, err := c.PipelineClient.Create(secondPipeline); err != nil { + t.Fatalf("Failed to create Pipeline `%s`: %s", secondPipeline.Name, err) + } + if _, err := c.PipelineRunClient.Create(secondPipelineRun); err != nil { + t.Fatalf("Failed to create PipelineRun `%s`: %s", secondPipelineRun.Name, err) + } + + logger.Infof("Waiting for PipelineRun %s in namespace %s to complete", secondPipelineRun.Name, namespace) + if err := WaitForPipelineRunState(c, secondPipelineRun.Name, pipelineRunTimeout, PipelineRunSucceed(secondPipelineRun.Name), "PipelineRunSuccess"); err != nil { + t.Fatalf("Error waiting for PipelineRun %s to finish: %s", secondPipelineRun.Name, err) + } +}