Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assume task not skipped if the run is associated #4583

Merged
merged 2 commits into from
Jun 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
// when expressions, so reset the skipped cache
pipelineRunFacts.ResetSkippedCache()

// GetFinalTasks only returns tasks when a DAG is complete
// GetFinalTasks only returns final tasks when a DAG is complete
fnextRprts := pipelineRunFacts.GetFinalTasks()
if len(fnextRprts) != 0 {
// apply the runtime context just before creating taskRuns for final tasks in queue
Expand Down
13 changes: 11 additions & 2 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,15 @@ func (t ResolvedPipelineRunTask) isCancelled() bool {
}
}

// isScheduled returns true when the PipelineRunTask itself has a TaskRun
// or Run associated.
func (t ResolvedPipelineRunTask) isScheduled() bool {
if t.IsCustomTask() {
return t.Run != nil
}
return t.TaskRun != nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@afrittoli @jerop @vdemeester my understanding was that this is not possible 🙃 A TaskRun or Run that is associated must have a Succeeded condition with status Unknown (i.e. running).

We rely on the same check for the non-final tasks:

case facts.isFinalTask(t.PipelineTask.Name) || t.IsStarted():

Copy link
Contributor Author

@devholic devholic Feb 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not sure about the root cause (my current assumption is a timing issue), but I think it makes sense to only check whether the Run exists, and to change GetFinalTasks() to use that check instead of IsSuccessful():

// GetFinalTasks returns the final tasks that should be considered for
// execution next.
//
// The result is empty unless checkDAGTasksDone reports true. Once it does,
// every entry in facts.State that is a final task and is not yet scheduled
// (per IsScheduled) is collected as a candidate, and the candidate set is
// handed to getNextTasks to produce the returned state.
func (facts *PipelineRunFacts) GetFinalTasks() PipelineRunState {
	// Start from an empty state; it is returned unchanged when the DAG is not done.
	tasks := PipelineRunState{}
	finalCandidates := sets.NewString()
	// Final tasks are only considered once the DAG portion is done: either the
	// pipeline has finished executing all DAG pipelineTasks or one of them has failed.
	if facts.checkDAGTasksDone() {
		// Collect the names of all final tasks that still need a TaskRun/Run.
		for _, t := range facts.State {
			// Previous condition, replaced by the IsScheduled check below:
			// if facts.isFinalTask(t.PipelineTask.Name) && !t.IsSuccessful() {
			if facts.isFinalTask(t.PipelineTask.Name) && !t.IsScheduled() {
				finalCandidates.Insert(t.PipelineTask.Name)
			}
		}
		// Let getNextTasks decide which of the candidates are actually returned.
		tasks = facts.State.getNextTasks(finalCandidates)
	}
	return tasks
}

Whether or not it has a Succeeded-type condition, I think we can assume the task is scheduled, since the reconciler found that it has an associated Run.

What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pritidesai also thought that a TaskRun or Run must have a ConditionSucceeded - but it looks like we don't initialize it when we create a TaskRun in the PipelineRun reconciler:

// createTaskRun creates the TaskRun for the given resolved pipeline task, or,
// when a TaskRun named rprt.TaskRunName is already present in the lister,
// resets that TaskRun's status for a retry and updates it instead.
//
// It returns the TaskRun returned by the API client call (UpdateStatus for the
// retry path, Create otherwise), or an error if the workspaces for the TaskRun
// cannot be computed or the client call fails.
//
// Note that the newly built TaskRun only has ObjectMeta and Spec populated;
// no status conditions are set on it here.
func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun, storageBasePath string, getTimeoutFunc getTimeoutFunc) (*v1beta1.TaskRun, error) {
logger := logging.FromContext(ctx)
// NOTE(review): the lister error is discarded; a nil tr is treated as
// "no existing TaskRun" regardless of why the lookup failed — confirm this is intended.
tr, _ := c.taskRunLister.TaskRuns(pr.Namespace).Get(rprt.TaskRunName)
if tr != nil {
// Don't modify the lister cache's copy.
tr = tr.DeepCopy()
// is a retry
addRetryHistory(tr)
clearStatus(tr)
// Mark the cleared status as ongoing before persisting it.
tr.Status.MarkResourceOngoing("", "")
logger.Infof("Updating taskrun %s with cleared status and retry history (length: %d).", tr.GetName(), len(tr.Status.RetriesStatus))
return c.PipelineClientSet.TektonV1beta1().TaskRuns(pr.Namespace).UpdateStatus(ctx, tr, metav1.UpdateOptions{})
}
// No existing TaskRun: build a new one from the pipeline task and the
// PipelineRun's per-task spec.
rprt.PipelineTask = resources.ApplyPipelineTaskContexts(rprt.PipelineTask)
taskRunSpec := pr.GetTaskRunSpec(rprt.PipelineTask.Name)
tr = &v1beta1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: rprt.TaskRunName,
Namespace: pr.Namespace,
// The PipelineRun is set as the controller owner of the TaskRun.
OwnerReferences: []metav1.OwnerReference{*kmeta.NewControllerRef(pr)},
Labels: combineTaskRunAndTaskSpecLabels(pr, rprt.PipelineTask),
Annotations: combineTaskRunAndTaskSpecAnnotations(pr, rprt.PipelineTask),
},
Spec: v1beta1.TaskRunSpec{
Params: rprt.PipelineTask.Params,
ServiceAccountName: taskRunSpec.TaskServiceAccountName,
Timeout: getTimeoutFunc(ctx, pr, rprt, c.Clock),
PodTemplate: taskRunSpec.TaskPodTemplate,
StepOverrides: taskRunSpec.StepOverrides,
SidecarOverrides: taskRunSpec.SidecarOverrides,
}}
// Reference the task by TaskRef when the resolved task has a name,
// otherwise embed the resolved TaskSpec if there is one.
if rprt.ResolvedTaskResources.TaskName != "" {
// We pass the entire, original task ref because it may contain additional references like a Bundle url.
tr.Spec.TaskRef = rprt.PipelineTask.TaskRef
} else if rprt.ResolvedTaskResources.TaskSpec != nil {
tr.Spec.TaskSpec = rprt.ResolvedTaskResources.TaskSpec
}
var pipelinePVCWorkspaceName string
var err error
tr.Spec.Workspaces, pipelinePVCWorkspaceName, err = getTaskrunWorkspaces(pr, rprt)
if err != nil {
return nil, err
}
// Annotate the TaskRun with the affinity assistant name only when the
// assistant is enabled and a pipeline PVC workspace name was returned.
if !c.isAffinityAssistantDisabled(ctx) && pipelinePVCWorkspaceName != "" {
tr.Annotations[workspace.AnnotationAffinityAssistantName] = getAffinityAssistantName(pipelinePVCWorkspaceName, pr.Name)
}
resources.WrapSteps(&tr.Spec, rprt.PipelineTask, rprt.ResolvedTaskResources.Inputs, rprt.ResolvedTaskResources.Outputs, storageBasePath)
logger.Infof("Creating a new TaskRun object %s for pipeline task %s", rprt.TaskRunName, rprt.PipelineTask.Name)
return c.PipelineClientSet.TektonV1beta1().TaskRuns(pr.Namespace).Create(ctx, tr, metav1.CreateOptions{})
}

Then we initialize ConditionSucceeded in the TaskRun reconciler:

if !tr.HasStarted() {
tr.Status.InitializeConditions()

If the above is correct — that a TaskRun or Run can exist without ConditionSucceeded — then it makes sense to either initialize ConditionSucceeded to Unknown in the createTaskRun/createRun functions, or check for the existence of the TaskRun or Run instead of ConditionSucceeded, which could be missing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jerop thanks for the review! 🙂

I prefer checking the existence of TaskRun or Run instead of initializing Conditions, since it is not possible to create resources with Status (as far as I know - please let me know if this is wrong 🙏 ) - which may cause this issue again since the reconciliation time is not guaranteed for the resources.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@devholic makes sense to go with that option 👍🏾

cc @tektoncd/core-maintainers

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the TaskRun condition is set by the TaskRun reconciler on the first reconciliation loop.
In the TaskRun controller, at least within the ReconcileKind function, the condition will always be set as initialising the condition is the first thing we do in ReconcileKind.
In the PipelineRun controller instead there is no guarantee. It may not be very likely to catch a TaskRun with no condition set, but it is certainly possible and I guess it may happen more under high load.

}

// isStarted returns true only if the PipelineRunTask itself has a TaskRun or
// Run associated that has a Succeeded-type condition.
func (t ResolvedPipelineRunTask) isStarted() bool {
Expand Down Expand Up @@ -249,7 +258,7 @@ func (t *ResolvedPipelineRunTask) skip(facts *PipelineRunFacts) TaskSkipStatus {
var skippingReason v1beta1.SkippingReason

switch {
case facts.isFinalTask(t.PipelineTask.Name) || t.isStarted():
case facts.isFinalTask(t.PipelineTask.Name) || t.isScheduled():
skippingReason = v1beta1.None
case facts.IsStopping():
skippingReason = v1beta1.StoppingSkip
Expand Down Expand Up @@ -349,7 +358,7 @@ func (t *ResolvedPipelineRunTask) IsFinallySkipped(facts *PipelineRunFacts) Task
var skippingReason v1beta1.SkippingReason

switch {
case t.isStarted():
case t.isScheduled():
skippingReason = v1beta1.None
case facts.checkDAGTasksDone() && facts.isFinalTask(t.PipelineTask.Name):
switch {
Expand Down
177 changes: 177 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ var trs = []v1beta1.TaskRun{{
Name: "pipelinerun-mytask2",
},
Spec: v1beta1.TaskRunSpec{},
}, {
ObjectMeta: metav1.ObjectMeta{
Namespace: "namespace",
Name: "pipelinerun-mytask4",
},
Spec: v1beta1.TaskRunSpec{},
}}

var runs = []v1alpha1.Run{{
Expand Down Expand Up @@ -190,6 +196,12 @@ var matrixedPipelineTask = &v1beta1.PipelineTask{
}},
}

// makeScheduled returns a TaskRun derived from tr via newTaskRun whose Status
// has been replaced with the zero-value TaskRunStatus, i.e. a TaskRun that
// exists but carries no status conditions.
func makeScheduled(tr v1beta1.TaskRun) *v1beta1.TaskRun {
	scheduled := newTaskRun(tr)
	// Deliberately wipe the status so that no condition is present.
	var emptyStatus v1beta1.TaskRunStatus
	scheduled.Status = emptyStatus
	return scheduled
}

func makeStarted(tr v1beta1.TaskRun) *v1beta1.TaskRun {
newTr := newTaskRun(tr)
newTr.Status.Conditions[0].Status = corev1.ConditionUnknown
Expand Down Expand Up @@ -375,6 +387,38 @@ var oneFailedState = PipelineRunState{{
},
}}

// finalScheduledState is a two-task PipelineRunState fixture: the first entry
// (pts[0]) carries the TaskRun produced by makeSucceeded(trs[0]) and the second
// entry (pts[1]) carries the TaskRun produced by makeScheduled(trs[1]), i.e. a
// TaskRun that exists but whose Status is empty.
var finalScheduledState = PipelineRunState{{
PipelineTask: &pts[0],
TaskRunName: "pipelinerun-mytask1",
TaskRun: makeSucceeded(trs[0]),
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}, {
PipelineTask: &pts[1],
TaskRunName: "pipelinerun-mytask2",
// TaskRun exists but has no status conditions.
TaskRun: makeScheduled(trs[1]),
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}}

// retryableFinalState is a two-task PipelineRunState fixture: the first entry
// (pts[0]) carries the TaskRun produced by makeSucceeded(trs[0]) and the second
// entry (pts[3]) carries the TaskRun produced by makeFailed(trs[2]).
// NOTE(review): trs[2] is presumably the "pipelinerun-mytask4" TaskRun that
// matches TaskRunName below, and pts[3] presumably allows retries — confirm
// against the trs and pts fixtures.
var retryableFinalState = PipelineRunState{{
PipelineTask: &pts[0],
TaskRunName: "pipelinerun-mytask1",
TaskRun: makeSucceeded(trs[0]),
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}, {
PipelineTask: &pts[3],
TaskRunName: "pipelinerun-mytask4",
TaskRun: makeFailed(trs[2]),
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskSpec: &task.Spec,
},
}}

var allFinishedState = PipelineRunState{{
PipelineTask: &pts[0],
TaskRunName: "pipelinerun-mytask1",
Expand Down Expand Up @@ -546,6 +590,13 @@ func TestIsSkipped(t *testing.T) {
expected: map[string]bool{
"mytask5": false,
},
}, {
name: "tasks-scheduled",
state: finalScheduledState,
expected: map[string]bool{
"mytask1": false,
"mytask2": false,
},
}, {
name: "tasks-parent-failed",
state: PipelineRunState{{
Expand Down Expand Up @@ -2656,6 +2707,132 @@ func TestResolvedPipelineRunTask_IsFinallySkipped(t *testing.T) {
}
}

// TestResolvedPipelineRunTask_IsFinallySkippedByCondition checks that
// IsFinallySkipped reports a final task as not skipped (reason None) when the
// final task already has a TaskRun, both when that TaskRun has a Succeeded
// condition with Unknown status and when it has no conditions at all.
//
// In every case the state consists of one DAG task (state[0]) followed by the
// final task(s) under test (state[1:]).
func TestResolvedPipelineRunTask_IsFinallySkippedByCondition(t *testing.T) {
// task is the shared DAG task; its TaskRun has Succeeded=True.
task := &ResolvedPipelineRunTask{
TaskRunName: "dag-task",
TaskRun: &v1beta1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: "dag-task",
},
Status: v1beta1.TaskRunStatus{
Status: duckv1beta1.Status{
Conditions: []apis.Condition{{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
}},
},
},
},
PipelineTask: &v1beta1.PipelineTask{
Name: "dag-task",
TaskRef: &v1beta1.TaskRef{Name: "task"},
},
}
for _, tc := range []struct {
name string
state PipelineRunState
want TaskSkipStatus
}{{
// Final task whose TaskRun has a Succeeded condition with Unknown status.
name: "task started",
state: PipelineRunState{
task,
{
TaskRun: &v1beta1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: "final-task",
},
Status: v1beta1.TaskRunStatus{
Status: duckv1beta1.Status{
Conditions: []apis.Condition{{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionUnknown,
}},
},
},
},
PipelineTask: &v1beta1.PipelineTask{
Name: "final-task",
TaskRef: &v1beta1.TaskRef{Name: "task"},
// The final task is guarded by the DAG task's status.
WhenExpressions: []v1beta1.WhenExpression{
{
Input: "$(tasks.dag-task.status)",
Operator: selection.In,
Values: []string{"Succeeded"},
},
},
},
},
},
want: TaskSkipStatus{
IsSkipped: false,
SkippingReason: v1beta1.None,
},
}, {
// Final task whose TaskRun exists but has an empty condition list.
name: "task scheduled",
state: PipelineRunState{
task,
{
TaskRunName: "final-task",
TaskRun: &v1beta1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Name: "final-task",
},
Status: v1beta1.TaskRunStatus{
Status: duckv1beta1.Status{
Conditions: []apis.Condition{ /* explicitly empty */ },
},
},
},
PipelineTask: &v1beta1.PipelineTask{
Name: "final-task",
TaskRef: &v1beta1.TaskRef{Name: "task"},
// Same when expression as in the "task started" case.
WhenExpressions: []v1beta1.WhenExpression{
{
Input: "$(tasks.dag-task.status)",
Operator: selection.In,
Values: []string{"Succeeded"},
},
},
},
}},
want: TaskSkipStatus{
IsSkipped: false,
SkippingReason: v1beta1.None,
},
}} {
t.Run(tc.name, func(t *testing.T) {
// Build the DAG graph from the single DAG task at state[0].
tasks := v1beta1.PipelineTaskList([]v1beta1.PipelineTask{*tc.state[0].PipelineTask})
d, err := dag.Build(tasks, tasks.Deps())
if err != nil {
t.Fatalf("Could not get a dag from the dag tasks %#v: %v", tc.state[0], err)
}

// build graph with finally tasks
var pts v1beta1.PipelineTaskList
for _, state := range tc.state[1:] {
pts = append(pts, *state.PipelineTask)
}
// Final tasks are built with an empty dependency map.
dfinally, err := dag.Build(pts, map[string][]string{})
if err != nil {
t.Fatalf("Could not get a dag from the finally tasks %#v: %v", pts, err)
}

facts := &PipelineRunFacts{
State: tc.state,
TasksGraph: d,
FinalTasksGraph: dfinally,
}

// Every final task in the state must produce the expected skip status.
for _, state := range tc.state[1:] {
got := state.IsFinallySkipped(facts)
// Note: this d (the diff string) shadows the DAG graph d declared above.
if d := cmp.Diff(tc.want, got); d != "" {
t.Errorf("IsFinallySkipped: %s", diff.PrintWantGot(d))
}
}
})
}
}

func TestResolvedPipelineRunTask_IsFinalTask(t *testing.T) {
tr := &v1beta1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (facts *PipelineRunFacts) DAGExecutionQueue() (PipelineRunState, error) {
return tasks, nil
}

// GetFinalTasks returns a list of final tasks without any taskRun associated with it
// GetFinalTasks returns a list of final tasks which need to be executed next
// GetFinalTasks returns final tasks only when all DAG tasks have finished executing or have been skipped
func (facts *PipelineRunFacts) GetFinalTasks() PipelineRunState {
tasks := PipelineRunState{}
Expand All @@ -412,7 +412,7 @@ func (facts *PipelineRunFacts) GetFinalTasks() PipelineRunState {
if facts.checkDAGTasksDone() {
// return list of tasks with all final tasks
for _, t := range facts.State {
if facts.isFinalTask(t.PipelineTask.Name) && !t.isSuccessful() {
if facts.isFinalTask(t.PipelineTask.Name) {
finalCandidates.Insert(t.PipelineTask.Name)
}
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/reconciler/pipelinerun/resources/pipelinerunstate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ func TestIsBeforeFirstTaskRun_WithStartedRun(t *testing.T) {
t.Fatalf("Expected state to be after first taskrun (Run test)")
}
}
// TestIsBeforeFirstTaskRun_WithSucceededTask verifies that
// IsBeforeFirstTaskRun reports false for finalScheduledState.
func TestIsBeforeFirstTaskRun_WithSucceededTask(t *testing.T) {
	beforeFirst := finalScheduledState.IsBeforeFirstTaskRun()
	if !beforeFirst {
		return
	}
	t.Fatalf("Expected state to be after first taskrun")
}

func TestGetNextTasks(t *testing.T) {
tcs := []struct {
Expand Down Expand Up @@ -354,6 +359,26 @@ func TestGetNextTasks(t *testing.T) {
state: oneFailedState,
candidates: sets.NewString("mytask1", "mytask2"),
expectedNext: []*ResolvedPipelineRunTask{oneFailedState[1]},
}, {
name: "final-task-scheduled-no-candidates",
state: finalScheduledState,
candidates: sets.NewString(),
expectedNext: []*ResolvedPipelineRunTask{},
}, {
name: "final-task-finished-one-candidate",
state: finalScheduledState,
candidates: sets.NewString("mytask1"),
expectedNext: []*ResolvedPipelineRunTask{},
}, {
name: "final-task-finished-other-candidate",
state: finalScheduledState,
candidates: sets.NewString("mytask2"),
expectedNext: []*ResolvedPipelineRunTask{},
}, {
name: "final-task-finished-both-candidate",
state: finalScheduledState,
candidates: sets.NewString("mytask1", "mytask2"),
expectedNext: []*ResolvedPipelineRunTask{},
}, {
name: "all-finished-no-candidates",
state: allFinishedState,
Expand Down Expand Up @@ -1160,6 +1185,24 @@ func TestPipelineRunState_GetFinalTasks(t *testing.T) {
DAGTasks: []v1beta1.PipelineTask{pts[0]},
finalTasks: []v1beta1.PipelineTask{pts[1]},
expectedFinalTasks: PipelineRunState{oneFinishedState[1]},
}, {
// tasks: [ mytask1]
// finally: [mytask2]
name: "06 - DAG tasks succeeded, final tasks scheduled - no final tasks",
desc: "DAG task (mytask1) finished successfully - final task (mytask2) scheduled - no final tasks",
state: finalScheduledState,
DAGTasks: []v1beta1.PipelineTask{pts[0]},
finalTasks: []v1beta1.PipelineTask{pts[1]},
expectedFinalTasks: PipelineRunState{},
}, {
// tasks: [ mytask1]
// finally: [mytask4]
name: "07 - DAG tasks succeeded, return retryable final tasks",
desc: "DAG task (mytask1) finished successfully - retry failed final tasks (mytask4)",
state: retryableFinalState,
DAGTasks: []v1beta1.PipelineTask{pts[0]},
finalTasks: []v1beta1.PipelineTask{pts[3]},
expectedFinalTasks: PipelineRunState{retryableFinalState[1]},
}}
for _, tc := range tcs {
dagGraph, err := dag.Build(v1beta1.PipelineTaskList(tc.DAGTasks), v1beta1.PipelineTaskList(tc.DAGTasks).Deps())
Expand Down