Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactoring PipelineRunState to simplify logic for retrieving next tasks #3254

Merged
merged 1 commit into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 23 additions & 24 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,15 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
return controller.NewPermanentError(err)
}

for _, rprt := range pipelineRunState {
// Build PipelineRunFacts with a list of resolved pipeline tasks,
// dag tasks graph and final tasks graph
pipelineRunFacts := &resources.PipelineRunFacts{
State: pipelineRunState,
TasksGraph: d,
FinalTasksGraph: dfinally,
}

for _, rprt := range pipelineRunFacts.State {
err := taskrun.ValidateResolvedTaskResources(rprt.PipelineTask.Params, rprt.ResolvedTaskResources)
if err != nil {
logger.Errorf("Failed to validate pipelinerun %q with error %v", pr.Name, err)
Expand All @@ -436,7 +444,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
}

if pipelineRunState.IsBeforeFirstTaskRun() {
if pipelineRunFacts.State.IsBeforeFirstTaskRun() {
if pr.HasVolumeClaimTemplate() {
// create workspace PVC from template
if err = c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(pr.Spec.Workspaces, pr.GetOwnerReference(), pr.Namespace); err != nil {
Expand Down Expand Up @@ -466,11 +474,11 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
return controller.NewPermanentError(err)
}

if err := c.runNextSchedulableTask(ctx, pr, d, dfinally, pipelineRunState, as); err != nil {
if err := c.runNextSchedulableTask(ctx, pr, pipelineRunFacts, as); err != nil {
return err
}

after := pipelineRunState.GetPipelineConditionStatus(pr, logger, d, dfinally)
after := pipelineRunFacts.GetPipelineConditionStatus(pr, logger)
switch after.Status {
case corev1.ConditionTrue:
pr.Status.MarkSucceeded(after.Reason, after.Message)
Expand All @@ -481,37 +489,28 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
// Read the condition the way it was set by the Mark* helpers
after = pr.Status.GetCondition(apis.ConditionSucceeded)
pr.Status.TaskRuns = pipelineRunState.GetTaskRunsStatus(pr)
pr.Status.SkippedTasks = pipelineRunState.GetSkippedTasks(pr, d)
pr.Status.TaskRuns = pipelineRunFacts.State.GetTaskRunsStatus(pr)
pr.Status.SkippedTasks = pipelineRunFacts.GetSkippedTasks()
logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
return nil
}

// runNextSchedulableTask gets the next schedulable Tasks from the dag based on the current
// pipeline run state, and starts them
// after all DAG tasks are done, it's responsible for scheduling final tasks and start executing them
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, d *dag.Graph, dfinally *dag.Graph, pipelineRunState resources.PipelineRunState, as artifacts.ArtifactStorageInterface) error {
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, pipelineRunFacts *resources.PipelineRunFacts, as artifacts.ArtifactStorageInterface) error {

logger := logging.FromContext(ctx)
recorder := controller.GetEventRecorder(ctx)

var nextRprts []*resources.ResolvedPipelineRunTask

// when pipeline run is stopping, do not schedule any new task and only
// wait for all running tasks to complete and report their status
if !pipelineRunState.IsStopping(d) {
// candidateTasks is initialized to DAG root nodes to start pipeline execution
// candidateTasks is derived based on successfully finished tasks and/or skipped tasks
candidateTasks, err := dag.GetSchedulable(d, pipelineRunState.SuccessfulOrSkippedDAGTasks(d)...)
if err != nil {
logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
return controller.NewPermanentError(err)
}
// nextRprts holds a list of pipeline tasks which should be executed next
nextRprts = pipelineRunState.GetNextTasks(candidateTasks)
// nextRprts holds a list of pipeline tasks which should be executed next
nextRprts, err := pipelineRunFacts.DAGExecutionQueue()
if err != nil {
logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
return controller.NewPermanentError(err)
}

resolvedResultRefs, err := resources.ResolveResultRefs(pipelineRunState, nextRprts)
resolvedResultRefs, err := resources.ResolveResultRefs(pipelineRunFacts.State, nextRprts)
if err != nil {
logger.Infof("Failed to resolve all task params for %q with error %v", pr.Name, err)
pr.Status.MarkFailed(ReasonFailedValidation, err.Error())
Expand All @@ -521,10 +520,10 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
resources.ApplyTaskResults(nextRprts, resolvedResultRefs)

// GetFinalTasks only returns tasks when a DAG is complete
nextRprts = append(nextRprts, pipelineRunState.GetFinalTasks(d, dfinally)...)
nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...)

for _, rprt := range nextRprts {
if rprt == nil || rprt.Skip(pipelineRunState, d) {
if rprt == nil || rprt.Skip(pipelineRunFacts) {
continue
}
if rprt.ResolvedConditionChecks == nil || rprt.ResolvedConditionChecks.IsSuccess() {
Expand Down
38 changes: 19 additions & 19 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/tektoncd/pipeline/pkg/contexts"
"github.com/tektoncd/pipeline/pkg/list"
"github.com/tektoncd/pipeline/pkg/names"
"github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
)

Expand Down Expand Up @@ -134,17 +133,15 @@ func (t ResolvedPipelineRunTask) IsStarted() bool {
return true
}

func (t *ResolvedPipelineRunTask) checkParentsDone(state PipelineRunState, d *dag.Graph) bool {
stateMap := state.ToMap()
func (t *ResolvedPipelineRunTask) checkParentsDone(facts *PipelineRunFacts) bool {
stateMap := facts.State.ToMap()
// check if parent tasks are done executing,
// if any of the parents is not yet scheduled or still running,
// wait for it to complete before evaluating when expressions
node := d.Nodes[t.PipelineTask.Name]
if isTaskInGraph(t.PipelineTask.Name, d) {
for _, p := range node.Prev {
if !stateMap[p.Task.HashKey()].IsDone() {
return false
}
node := facts.TasksGraph.Nodes[t.PipelineTask.Name]
for _, p := range node.Prev {
if !stateMap[p.Task.HashKey()].IsDone() {
return false
}
}
return true
Expand All @@ -156,7 +153,12 @@ func (t *ResolvedPipelineRunTask) checkParentsDone(state PipelineRunState, d *da
// (3) its parent task was skipped
// (4) Pipeline is in stopping state (one of the PipelineTasks failed)
// Note that this means Skip returns false if a conditionCheck is in progress
func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) bool {
func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool {
// finally tasks are never skipped. If this is a final task, return false
if facts.isFinalTask(t.PipelineTask.Name) {
return false
}

// it already has TaskRun associated with it - PipelineTask not skipped
if t.IsStarted() {
return false
Expand All @@ -170,7 +172,7 @@ func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) boo
}

// Check if the when expressions are false, based on the input's relationship to the values
if t.checkParentsDone(state, d) {
if t.checkParentsDone(facts) {
if len(t.PipelineTask.WhenExpressions) > 0 {
if !t.PipelineTask.WhenExpressions.HaveVariables() {
if !t.PipelineTask.WhenExpressions.AllowsExecution() {
Expand All @@ -181,19 +183,17 @@ func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) boo
}

// Skip the PipelineTask if pipeline is in stopping state
if isTaskInGraph(t.PipelineTask.Name, d) && state.IsStopping(d) {
if facts.IsStopping() {
return true
}

stateMap := state.ToMap()
stateMap := facts.State.ToMap()
// Recursively look at parent tasks to see if they have been skipped,
// if any of the parents have been skipped, skip as well
node := d.Nodes[t.PipelineTask.Name]
if isTaskInGraph(t.PipelineTask.Name, d) {
for _, p := range node.Prev {
if stateMap[p.Task.HashKey()].Skip(state, d) {
return true
}
node := facts.TasksGraph.Nodes[t.PipelineTask.Name]
for _, p := range node.Prev {
if stateMap[p.Task.HashKey()].Skip(facts) {
return true
}
}
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ func TestIsSkipped(t *testing.T) {

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
dag, err := DagFromState(tc.state)
d, err := DagFromState(tc.state)
if err != nil {
t.Fatalf("Could not get a dag from the TC state %#v: %v", tc.state, err)
}
Expand All @@ -844,7 +844,12 @@ func TestIsSkipped(t *testing.T) {
if rprt == nil {
t.Fatalf("Could not get task %s from the state: %v", tc.taskName, tc.state)
}
isSkipped := rprt.Skip(tc.state, dag)
facts := PipelineRunFacts{
State: tc.state,
TasksGraph: d,
FinalTasksGraph: &dag.Graph{},
}
isSkipped := rprt.Skip(&facts)
if d := cmp.Diff(isSkipped, tc.expected); d != "" {
t.Errorf("Didn't get expected isSkipped %s", diff.PrintWantGot(d))
}
Expand Down
Loading