Skip to content

Commit

Permalink
Add Pipeline-level timeouts
Browse files Browse the repository at this point in the history
Rebased and squashed the initial work, more to come.

Fixes tektoncd#222
  • Loading branch information
abayer committed Jan 23, 2019
1 parent 2ce21d9 commit b945146
Show file tree
Hide file tree
Showing 16 changed files with 254 additions and 23 deletions.
9 changes: 7 additions & 2 deletions pkg/apis/pipeline/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@ import (

// PipelineSpec defines the desired state of PipeLine.
type PipelineSpec struct {
Tasks []PipelineTask `json:"tasks"`
Generation int64 `json:"generation,omitempty"`
Tasks []PipelineTask `json:"tasks"`
// Time after which the build times out. Defaults to never.
// Specified build timeout should be less than 24h.
// Refer Go's ParseDuration documentation for expected format: https://golang.org/pkg/time/#ParseDuration
// +optional
Timeout *metav1.Duration `json:"timeout,omitempty"`
Generation int64 `json:"generation,omitempty"`
}

// PipelineStatus does not contain anything because Pipelines on their own
Expand Down
13 changes: 13 additions & 0 deletions pkg/apis/pipeline/v1alpha1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"fmt"
"time"

"github.com/knative/pkg/apis"
"k8s.io/apimachinery/pkg/api/equality"
Expand Down Expand Up @@ -58,5 +59,17 @@ func (ps *PipelineSpec) Validate() *apis.FieldError {
}
}
}

if ps.Timeout != nil {
// timeout should be a valid duration of between 0 and 24 hours.
maxTimeout := time.Duration(24 * time.Hour)

if ps.Timeout.Duration > maxTimeout {
return apis.ErrInvalidValue(fmt.Sprintf("%s should be < 24h", ps.Timeout.Duration.String()), "spec.timeout")
} else if ps.Timeout.Duration <= 0 {
return apis.ErrInvalidValue(fmt.Sprintf("%s should be > 0", ps.Timeout.Duration.String()), "spec.timeout")
}
}

return nil
}
40 changes: 40 additions & 0 deletions pkg/apis/pipeline/v1alpha1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@ package v1alpha1

import (
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPipelineSpec_Validate_Error(t *testing.T) {
type fields struct {
Tasks []PipelineTask
Timeout *metav1.Duration
Generation int64
}
tests := []struct {
Expand Down Expand Up @@ -50,11 +54,34 @@ func TestPipelineSpec_Validate_Error(t *testing.T) {
}},
},
},
{
name: "negative pipeline timeout",
fields: fields{
Tasks: []PipelineTask{{
Name: "foo",
}, {
Name: "baz",
}},
Timeout: &metav1.Duration{Duration: -48 * time.Hour},
},
},
{
name: "maximum timeout",
fields: fields{
Tasks: []PipelineTask{{
Name: "foo",
}, {
Name: "baz",
}},
Timeout: &metav1.Duration{Duration: 48 * time.Hour},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ps := &PipelineSpec{
Tasks: tt.fields.Tasks,
Timeout: tt.fields.Timeout,
Generation: tt.fields.Generation,
}
if err := ps.Validate(); err == nil {
Expand All @@ -67,6 +94,7 @@ func TestPipelineSpec_Validate_Error(t *testing.T) {
func TestPipelineSpec_Validate_Valid(t *testing.T) {
type fields struct {
Tasks []PipelineTask
Timeout *metav1.Duration
Generation int64
}
tests := []struct {
Expand Down Expand Up @@ -96,11 +124,23 @@ func TestPipelineSpec_Validate_Valid(t *testing.T) {
}},
},
},
{
name: "valid timeout",
fields: fields{
Tasks: []PipelineTask{{
Name: "foo",
}, {
Name: "baz",
}},
Timeout: &metav1.Duration{Duration: 24 * time.Hour},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ps := &PipelineSpec{
Tasks: tt.fields.Tasks,
Timeout: tt.fields.Timeout,
Generation: tt.fields.Generation,
}
if err := ps.Validate(); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions pkg/apis/pipeline/v1alpha1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"fmt"
"time"

duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
"github.com/knative/pkg/webhook"
Expand Down Expand Up @@ -118,6 +119,12 @@ type PipelineRunStatus struct {
// In #107 should be updated to hold the location logs have been uploaded to
// +optional
Results *Results `json:"results,omitempty"`
// StartTime is the time the build is actually started.
// +optional
StartTime *metav1.Time `json:"startTime,omitempty"`
// CompletionTime is the time the build completed.
// +optional
CompletionTime *metav1.Time `json:"completionTime,omitempty"`
// map of TaskRun Status with the taskRun name as the key
//+optional
TaskRuns map[string]TaskRunStatus `json:"taskRuns,omitempty"`
Expand All @@ -135,6 +142,9 @@ func (pr *PipelineRunStatus) InitializeConditions() {
if pr.TaskRuns == nil {
pr.TaskRuns = make(map[string]TaskRunStatus)
}
if pr.StartTime.IsZero() {
pr.StartTime = &metav1.Time{time.Now()}
}
pipelineRunCondSet.Manage(pr).InitializeConditions()
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/pipeline/v1alpha1/pipelinerun_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func TestInitializeConditions(t *testing.T) {
t.Fatalf("PipelineRun status not initialized correctly")
}

if p.Status.StartTime.IsZero() {
t.Fatalf("PipelineRun StartTime not initialized correctly")
}

p.Status.TaskRuns["fooTask"] = TaskRunStatus{}

p.Status.InitializeConditions()
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func (ts *TaskRunSpec) Validate() *apis.FieldError {
return err
}
}

return nil
}

Expand Down
27 changes: 27 additions & 0 deletions pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 20 additions & 7 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,17 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er

if rprt != nil {
c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt.ResolvedTaskResources.TaskName, rprt.TaskRunName, pr, rprt.PipelineTask, serviceAccount)
rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt.ResolvedTaskResources.TaskName, rprt.TaskRunName, rprt.ResolvedTaskResources.TaskSpec,
pr, rprt.PipelineTask, serviceAccount, p.Spec.Timeout)
if err != nil {
c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
}
}

before := pr.Status.GetCondition(duckv1alpha1.ConditionSucceeded)
after := resources.GetPipelineConditionStatus(pr.Name, pipelineState, c.Logger)
after := resources.GetPipelineConditionStatus(pr.Name, pipelineState, c.Logger, pr.Status.StartTime, p.Spec.Timeout)

pr.Status.SetCondition(after)

reconciler.EmitEvent(c.Recorder, before, after, pr)
Expand All @@ -283,7 +285,8 @@ func updateTaskRunsStatus(pr *v1alpha1.PipelineRun, pipelineState []*resources.R
}
}

func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, taskName, taskRunName string, pr *v1alpha1.PipelineRun, pt *v1alpha1.PipelineTask, sa string) (*v1alpha1.TaskRun, error) {
func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, taskName, taskRunName string, ts *v1alpha1.TaskSpec,
pr *v1alpha1.PipelineRun, pt *v1alpha1.PipelineTask, sa string, pipelineTimeout *metav1.Duration) (*v1alpha1.TaskRun, error) {
tr := &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: taskRunName,
Expand All @@ -295,15 +298,25 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, taskName, taskRunN
},
},
Spec: v1alpha1.TaskRunSpec{
TaskRef: &v1alpha1.TaskRef{
Name: taskName,
},
Inputs: v1alpha1.TaskRunInputs{
Params: pt.Params,
},
ServiceAccount: sa,
ServiceAccount: sa,
},
}

if pipelineTimeout != nil {
pTimeoutTime := pr.Status.StartTime.Add(pipelineTimeout.Duration)
if ts.Timeout != nil && time.Now().Add(ts.Timeout.Duration).After(pTimeoutTime){
ts.Timeout.Duration = pTimeoutTime.Sub(time.Now())
}
tr.Spec.TaskSpec = ts
} else {
tr.Spec.TaskRef = &v1alpha1.TaskRef{
Name: taskName,
}
}

resources.WrapSteps(&tr.Spec, pr.Spec.PipelineTaskResources, pt)

return c.PipelineClientSet.PipelineV1alpha1().TaskRuns(pr.Namespace).Create(tr)
Expand Down
68 changes: 63 additions & 5 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,7 @@ func TestReconcileOnCompletedPipelineRun(t *testing.T) {
})),
)}
ps := []*v1alpha1.Pipeline{tb.Pipeline("test-pipeline", "foo", tb.PipelineSpec(
tb.PipelineTask("hello-world-1", "hellow-world"),
))}
tb.PipelineTask("hello-world-1", "hellow-world")))}
ts := []*v1alpha1.Task{tb.Task("hello-world", "foo")}
d := test.Data{
PipelineRuns: prs,
Expand Down Expand Up @@ -467,6 +466,7 @@ func TestReconcileOnCancelledPipelineRun(t *testing.T) {
),
),
}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Expand All @@ -488,12 +488,70 @@ func TestReconcileOnCancelledPipelineRun(t *testing.T) {

// Check that the PipelineRun was reconciled correctly
reconciledRun, err := clients.Pipeline.Pipeline().PipelineRuns("foo").Get("test-pipeline-run-cancelled", metav1.GetOptions{})
if err != nil {
t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err)
}

// This PipelineRun should still be complete and false, and the status should reflect that
if !reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded).IsFalse() {
t.Errorf("Expected PipelineRun status to be complete and false, but was %v", reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded))
}
}

func TestReconcileWithTimeout(t *testing.T) {
ps := []*v1alpha1.Pipeline{tb.Pipeline("test-pipeline", "foo", tb.PipelineSpec(
tb.PipelineTask("hello-world-1", "hello-world"),
tb.PipelineTimeout(&metav1.Duration{Duration: 12 * time.Hour}),
))}
prs := []*v1alpha1.PipelineRun{tb.PipelineRun("test-pipeline-run-with-timeout", "foo",
tb.PipelineRunSpec("test-pipeline",
tb.PipelineRunServiceAccount("test-sa"),
),
tb.PipelineRunStatus(
tb.PipelineRunStartTime(time.Now().AddDate(0, 0, -1))),
)}
ts := []*v1alpha1.Task{tb.Task("hello-world", "foo")}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
}

// create fake recorder for testing
fr := record.NewFakeRecorder(1)

testAssets := getPipelineRunController(d, fr)
c := testAssets.Controller
clients := testAssets.Clients

c.Reconciler.Reconcile(context.Background(), "foo/test-pipeline-run-with-timeout")

// make sure there is no failed events
validateEvents(t, fr)

// Check that the PipelineRun was reconciled correctly
reconciledRun, err := clients.Pipeline.Pipeline().PipelineRuns("foo").Get("test-pipeline-run-with-timeout", metav1.GetOptions{})
if err != nil {
t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err)
}

// The PipelineRun should be timed out.
if reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded).Reason != resources.ReasonTimedOut {
t.Errorf("Expected PipelineRun to be timed out, but condition reason is %s", reconciledRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded))
}

// Check that the expected TaskRun was created
actual := clients.Pipeline.Actions()[0].(ktesting.CreateAction).GetObject().(*v1alpha1.TaskRun)
if actual == nil {
t.Errorf("Expected a TaskRun to be created, but it wasn't.")
}

// There should be a time out for the TaskRun
if actual.Spec.TaskSpec.Timeout == nil {
t.Errorf("TaskSpec.Timeout shouldn't be nil")
}

// The TaskRun's timeout date should be the same as the PipelineRun's start time plus the PipelineRun timeout duration
if actual.CreationTimestamp.Time.Add(actual.Spec.TaskSpec.Timeout.Duration) != reconciledRun.Status.StartTime.Add(ps[0].Spec.Timeout.Duration) {
t.Errorf("run start: %s, run timeout: %s, task timeout: %s", reconciledRun.Status.StartTime.String(), ps[0].Spec.Timeout.String(),
actual.Spec.TaskSpec.Timeout.Duration.String())
}
}
Loading

0 comments on commit b945146

Please sign in to comment.