Skip to content

Commit

Permalink
feat(retries) Design and implement retries
Browse files Browse the repository at this point in the history
closes #221
  • Loading branch information
joseblas committed Apr 4, 2019
1 parent a669012 commit 5c45e67
Show file tree
Hide file tree
Showing 16 changed files with 5,394 additions and 1,636 deletions.
10 changes: 10 additions & 0 deletions docs/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ tasks:
name: build-push
```

There is an optional attribute called `retries`, which declares how many times that task should be retries in case of failure,

```yaml
tasks:
- name: build-the-image
retries: 1
taskRef:
name: build-push
```

[Declared `PipelineResources`](#declared-resources) can be given to `Task`s in
the `Pipeline` as inputs and outputs, for example:

Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/pipeline/v1alpha1/pipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ type PipelineTask struct {
Name string `json:"name"`
TaskRef TaskRef `json:"taskRef"`

// Number of retries to be run
// +optional
Retries int `json:"retries",omitempty`

// RunAfter is the list of PipelineTask names that should be executed before
// this Task executes. (Used to force a specific ordering in graph execution.)
// +optional
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/pipeline/v1alpha1/taskrun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ type TaskRunStatus struct {
// Steps describes the state of each build step container.
// +optional
Steps []StepState `json:"steps,omitempty"`
// Failed Status History
// +optional
RetriesStatus []TaskRunStatus `json:"retriesStatus,omitempty"`
}

// GetCondition returns the Condition matching the given type.
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/pipeline/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 31 additions & 7 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,6 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {

c.timeoutHandler.StatusUnlock(original)

if pr.IsDone() {
c.timeoutHandler.Release(pr)
c.Recorder.Event(pr, corev1.EventTypeNormal, eventReasonSucceeded, "PipelineRun completed successfully.")
return nil
}

if err := c.tracker.Track(pr.GetTaskRunRef(), pr); err != nil {
c.Logger.Errorf("Failed to create tracker for TaskRuns for PipelineRun %s: %v", pr.Name, err)
c.Recorder.Event(pr, corev1.EventTypeWarning, eventReasonFailed, "Failed to create tracker for TaskRuns for PipelineRun")
Expand Down Expand Up @@ -273,6 +267,9 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
func(name string) (v1alpha1.TaskInterface, error) {
return c.taskLister.Tasks(pr.Namespace).Get(name)
},
func(name string) (*v1alpha1.TaskRun, error) {
return c.taskRunLister.TaskRuns(pr.Namespace).Get(name)
},
func(name string) (v1alpha1.TaskInterface, error) {
return c.clusterTaskLister.Get(name)
},
Expand Down Expand Up @@ -309,6 +306,13 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
}
return nil
}

if pipelineState.IsDone() && pr.IsDone() {
c.timeoutHandler.Release(pr)
c.Recorder.Event(pr, corev1.EventTypeNormal, eventReasonSucceeded, "PipelineRun completed successfully.")
return nil
}

if err := resources.ValidateFrom(pipelineState); err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.SetCondition(&duckv1alpha1.Condition{
Expand Down Expand Up @@ -421,7 +425,17 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
}
labels[pipeline.GroupName+pipeline.PipelineRunLabelKey] = pr.Name

tr := &v1alpha1.TaskRun{
tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(rprt.TaskRunName)
if tr != nil {
//is a retry
addRetryHistory(tr)
tr.Status.SetCondition(&duckv1alpha1.Condition{
Type: duckv1alpha1.ConditionSucceeded,
Status: corev1.ConditionUnknown,
})
return tr, nil
}
tr = &v1alpha1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: rprt.TaskRunName,
Namespace: pr.Namespace,
Expand All @@ -446,6 +460,16 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
return c.PipelineClientSet.TektonV1alpha1().TaskRuns(pr.Namespace).Create(tr)
}

func addRetryHistory(task *v1alpha1.TaskRun) {
newStatus := *task.Status.DeepCopy()
newStatus.RetriesStatus = nil
task.Status.RetriesStatus = append(task.Status.RetriesStatus, newStatus)
task.Status.StartTime = nil
task.Status.CompletionTime = nil
task.Status.Results = nil
task.Status.PodName = ""
}

func (c *Reconciler) updateStatus(pr *v1alpha1.PipelineRun) (*v1alpha1.PipelineRun, error) {
newPr, err := c.pipelineRunLister.PipelineRuns(pr.Namespace).Get(pr.Name)
if err != nil {
Expand Down
158 changes: 155 additions & 3 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package pipelinerun

import (
"context"
"fmt"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -457,12 +458,31 @@ func TestReconcileOnCompletedPipelineRun(t *testing.T) {
})),
)}
ps := []*v1alpha1.Pipeline{tb.Pipeline("test-pipeline", "foo", tb.PipelineSpec(
tb.PipelineTask("hello-world-1", "hellow-world")))}
tb.PipelineTask("hello-world-1", "hello-world")))}
ts := []*v1alpha1.Task{tb.Task("hello-world", "foo")}

trs := []*v1alpha1.TaskRun{
tb.TaskRun("hello-world-1", "foo",
tb.TaskRunStatus(
tb.PodName("my-pod-name"),
tb.Condition(duckv1alpha1.Condition{
Type: duckv1alpha1.ConditionSucceeded,
Status: corev1.ConditionTrue,
}),
)),
}

prs[0].Status.TaskRuns = make(map[string]*v1alpha1.PipelineRunTaskRunStatus)
prs[0].Status.TaskRuns["hello-world-1"] = &v1alpha1.PipelineRunTaskRunStatus{
PipelineTaskName: "hello-world-1",
Status: &trs[0].Status,
}

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
}

// create fake recorder for testing
Expand All @@ -477,8 +497,14 @@ func TestReconcileOnCompletedPipelineRun(t *testing.T) {
// make sure there is no failed events
validateEvents(t, fr)

if len(clients.Pipeline.Actions()) != 0 {
t.Fatalf("Expected client to not have created a TaskRun for the completed PipelineRun, but it did")
actions := clients.Pipeline.Actions()
for _, action := range actions {
if action != nil {
resource := action.GetResource().Resource
if resource != "pipelineruns" {
t.Fatalf("Expected client to not have created a TaskRun for the completed PipelineRun, but it did")
}
}
}

// Check that the PipelineRun was reconciled correctly
Expand Down Expand Up @@ -685,3 +711,129 @@ func TestReconcilePropagateLabels(t *testing.T) {
t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRun, d)
}
}

func TestReconcileWithTimeoutAndRetry(t *testing.T) {

tcs := []struct {
name string
retries int
}{
{
name: "One try has to be done",
retries: 1,
},
{
name: "No more retries are needed",
retries: 2,
},
}

for _, tc := range tcs {

t.Run(tc.name, func(t *testing.T) {
ps := []*v1alpha1.Pipeline{tb.Pipeline("test-pipeline-retry", "foo", tb.PipelineSpec(
tb.PipelineTask("hello-world-1", "hello-world", tb.Retries(tc.retries)),
))}
prs := []*v1alpha1.PipelineRun{tb.PipelineRun("test-pipeline-retry-run-with-timeout", "foo",
tb.PipelineRunSpec("test-pipeline-retry",
tb.PipelineRunServiceAccount("test-sa"),
tb.PipelineRunTimeout(&metav1.Duration{Duration: 12 * time.Hour}),
),
tb.PipelineRunStatus(
tb.PipelineRunStartTime(time.Now().AddDate(0, 0, -1))),
)}

ts := []*v1alpha1.Task{
tb.Task("hello-world", "foo"),
}
trs := []*v1alpha1.TaskRun{
tb.TaskRun("hello-world-1", "foo",
tb.TaskRunStatus(
tb.PodName("my-pod-name"),
tb.Condition(duckv1alpha1.Condition{
Type: duckv1alpha1.ConditionSucceeded,
Status: corev1.ConditionUnknown,
}),
tb.Retry(v1alpha1.TaskRunStatus{
Conditions: []duckv1alpha1.Condition{{
Type: duckv1alpha1.ConditionSucceeded,
Status: corev1.ConditionFalse,
}},
}),
)),
}

prtrs := &v1alpha1.PipelineRunTaskRunStatus{
PipelineTaskName: "hello-world-1",
Status: &trs[0].Status,
}
prs[0].Status.TaskRuns = make(map[string]*v1alpha1.PipelineRunTaskRunStatus)
prs[0].Status.TaskRuns["hello-world-1"] = prtrs

d := test.Data{
PipelineRuns: prs,
Pipelines: ps,
Tasks: ts,
TaskRuns: trs,
}

fr := record.NewFakeRecorder(2)

testAssets := getPipelineRunController(d, fr)
c := testAssets.Controller
clients := testAssets.Clients

err := c.Reconciler.Reconcile(context.Background(), "foo/test-pipeline-retry-run-with-timeout")
if err != nil {
t.Errorf("Did not expect to see error when reconciling completed PipelineRun but saw %s", err)
}

// Check that the PipelineRun was reconciled correctly
reconciledRun, err := clients.Pipeline.TektonV1alpha1().PipelineRuns("foo").Get("test-pipeline-retry-run-with-timeout", metav1.GetOptions{})
if err != nil {
t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err)
}

fmt.Printf("TaskRun %v \n", reconciledRun.Status.TaskRuns["hello-world-1"].Status.GetCondition(duckv1alpha1.ConditionSucceeded))

if len(reconciledRun.Status.TaskRuns["hello-world-1"].Status.RetriesStatus) != tc.retries {
t.Fatalf(" One retries expected but %d ", len(reconciledRun.Status.TaskRuns["hello-world-1"].Status.RetriesStatus))
}

if status := reconciledRun.Status.TaskRuns["hello-world-1"].Status.GetCondition(duckv1alpha1.ConditionSucceeded).Status; status != corev1.ConditionUnknown {
t.Fatalf("Succedded expected to be Unknown but is %s", status)
}

// Check that the expected TaskRun was created
actual := clients.Pipeline.Actions()[0].(ktesting.UpdateAction).GetObject().(*v1alpha1.PipelineRun)
if actual == nil {
t.Errorf("Expected a TaskRun to be created, but it wasn't.")
}

// The TaskRun timeout should be less than or equal to the PipelineRun timeout.
if actual.Spec.Timeout.Duration > prs[0].Spec.Timeout.Duration {
t.Errorf("TaskRun timeout %s should be less than or equal to PipelineRun timeout %s", actual.Spec.Timeout.Duration.String(), prs[0].Spec.Timeout.Duration.String())
}

err = c.Reconciler.Reconcile(context.Background(), "foo/test-pipeline-retry-run-with-timeout")
if err != nil {
t.Errorf("Did not expect to see error when reconciling completed PipelineRun but saw %s", err)
}

// Check that the PipelineRun was reconciled correctly
reconciledRun2, err3 := clients.Pipeline.TektonV1alpha1().PipelineRuns("foo").Get("test-pipeline-retry-run-with-timeout", metav1.GetOptions{})
if err3 != nil {
t.Fatalf("Somehow had error getting completed reconciled run out of fake client: %s", err)
}

if reconciledRun2 != nil {
fmt.Println("Second recon ", reconciledRun2.Status)
}

if reconciledRun != nil {
fmt.Printf("Retries %v \n", len(reconciledRun.Status.TaskRuns["hello-world-1"].Status.RetriesStatus))
fmt.Printf("condition %v \n", reconciledRun.Status.TaskRuns["hello-world-1"].Status.GetCondition(duckv1alpha1.ConditionSucceeded))
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,37 @@ type ResolvedPipelineRunTask struct {
// state of the PipelineRun.
type PipelineRunState []*ResolvedPipelineRunTask

func (state PipelineRunState) IsDone() (isDone bool) {
isDone = true
for _, t := range state {
if t.TaskRun == nil {
return false
}
status := t.TaskRun.Status.GetCondition(duckv1alpha1.ConditionSucceeded)
retriesDone := len(t.TaskRun.Status.RetriesStatus)
retries := t.PipelineTask.Retries
println(status, retries, retriesDone)
isDone = isDone && (status.IsTrue() || status.IsFalse() && retriesDone > retries)
if !isDone {
return
}
}
return
}

// GetNextTasks will return the next ResolvedPipelineRunTasks to execute, which are the ones in the
// list of candidateTasks which aren't yet indicated in state to be running.
func (state PipelineRunState) GetNextTasks(candidateTasks map[string]v1alpha1.PipelineTask) []*ResolvedPipelineRunTask {
func (state *PipelineRunState) GetNextTasks(candidateTasks map[string]v1alpha1.PipelineTask) []*ResolvedPipelineRunTask {
tasks := []*ResolvedPipelineRunTask{}
for _, t := range state {
for _, t := range *state {
if _, ok := candidateTasks[t.PipelineTask.Name]; ok && t.TaskRun == nil {
tasks = append(tasks, t)
}
if _, ok := candidateTasks[t.PipelineTask.Name]; ok && t.TaskRun != nil {
if len(t.TaskRun.Status.RetriesStatus) < t.PipelineTask.Retries {
tasks = append(tasks, t)
}
}
}
return tasks
}
Expand Down Expand Up @@ -171,6 +194,7 @@ func (e *ResourceNotFoundError) Error() string {
func ResolvePipelineRun(
pipelineRun v1alpha1.PipelineRun,
getTask resources.GetTask,
getTaskRun resources.GetTaskRun,
getClusterTask resources.GetClusterTask,
getResource resources.GetResource,
tasks []v1alpha1.PipelineTask,
Expand Down Expand Up @@ -214,6 +238,10 @@ func ResolvePipelineRun(
}
rprt.ResolvedTaskResources = rtr

taskRun, err := getTaskRun(rprt.TaskRunName)
if taskRun != nil {
rprt.TaskRun = taskRun
}
// Add this task to the state of the PipelineRun
state = append(state, &rprt)
}
Expand Down
Loading

0 comments on commit 5c45e67

Please sign in to comment.