refactoring pipelinerunstate
Introducing PipelineRunFacts as a combination of PipelineRunState, the DAG graph
of tasks, and the graph of finally tasks. This simplifies function definitions:
the graphs no longer have to be passed as separate arguments alongside
PipelineRunState.
pritidesai committed Sep 18, 2020
1 parent d18e94f commit f412b11
Showing 5 changed files with 189 additions and 133 deletions.
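
The refactor centers on a new PipelineRunFacts type in the resources package. Below is a minimal sketch of that type as it appears in the hunks that follow; the ResolvedPipelineRunTask and PipelineRunState stand-ins are placeholders for the package's existing definitions, included only so the sketch is self-contained.

package resources

import "github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag"

// Stand-ins for types that already exist in this package; shown here only to
// make the sketch self-contained.
type ResolvedPipelineRunTask struct{ /* resolved PipelineTask, TaskRun, conditions, ... */ }
type PipelineRunState []*ResolvedPipelineRunTask

// PipelineRunFacts bundles the resolved pipeline run state with the graph of
// DAG tasks and the graph of finally tasks, so helpers such as
// GetPipelineConditionStatus, DAGExecutionQueue, GetSkippedTasks and
// GetFinalTasks (and ResolvedPipelineRunTask.Skip) no longer take the graphs
// as separate arguments.
type PipelineRunFacts struct {
	State           PipelineRunState
	TasksGraph      *dag.Graph
	FinalTasksGraph *dag.Graph
}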
47 changes: 23 additions & 24 deletions pkg/reconciler/pipelinerun/pipelinerun.go
@@ -427,7 +427,15 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
return controller.NewPermanentError(err)
}

for _, rprt := range pipelineRunState {
// Build PipelineRunFacts with a list of resolved pipeline tasks,
// dag tasks graph and final tasks graph
pipelineRunFacts := &resources.PipelineRunFacts{
State: pipelineRunState,
TasksGraph: d,
FinalTasksGraph: dfinally,
}

for _, rprt := range pipelineRunFacts.State {
err := taskrun.ValidateResolvedTaskResources(rprt.PipelineTask.Params, rprt.ResolvedTaskResources)
if err != nil {
logger.Errorf("Failed to validate pipelinerun %q with error %v", pr.Name, err)
@@ -436,7 +444,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
}

if pipelineRunState.IsBeforeFirstTaskRun() {
if pipelineRunFacts.State.IsBeforeFirstTaskRun() {
if pr.HasVolumeClaimTemplate() {
// create workspace PVC from template
if err = c.pvcHandler.CreatePersistentVolumeClaimsForWorkspaces(pr.Spec.Workspaces, pr.GetOwnerReference(), pr.Namespace); err != nil {
@@ -466,11 +474,11 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
return controller.NewPermanentError(err)
}

if err := c.runNextSchedulableTask(ctx, pr, d, dfinally, pipelineRunState, as); err != nil {
if err := c.runNextSchedulableTask(ctx, pr, pipelineRunFacts, as); err != nil {
return err
}

after := pipelineRunState.GetPipelineConditionStatus(pr, logger, d, dfinally)
after := pipelineRunFacts.GetPipelineConditionStatus(pr, logger)
switch after.Status {
case corev1.ConditionTrue:
pr.Status.MarkSucceeded(after.Reason, after.Message)
@@ -481,37 +489,28 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
// Read the condition the way it was set by the Mark* helpers
after = pr.Status.GetCondition(apis.ConditionSucceeded)
pr.Status.TaskRuns = pipelineRunState.GetTaskRunsStatus(pr)
pr.Status.SkippedTasks = pipelineRunState.GetSkippedTasks(pr, d)
pr.Status.TaskRuns = pipelineRunFacts.State.GetTaskRunsStatus(pr)
pr.Status.SkippedTasks = pipelineRunFacts.GetSkippedTasks()
logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
return nil
}

// runNextSchedulableTask gets the next schedulable Tasks from the dag based on the current
// pipeline run state, and starts them
// after all DAG tasks are done, it's responsible for scheduling final tasks and start executing them
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, d *dag.Graph, dfinally *dag.Graph, pipelineRunState resources.PipelineRunState, as artifacts.ArtifactStorageInterface) error {
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, pipelineRunFacts *resources.PipelineRunFacts, as artifacts.ArtifactStorageInterface) error {

logger := logging.FromContext(ctx)
recorder := controller.GetEventRecorder(ctx)

var nextRprts []*resources.ResolvedPipelineRunTask

// when pipeline run is stopping, do not schedule any new task and only
// wait for all running tasks to complete and report their status
if !pipelineRunState.IsStopping(d) {
// candidateTasks is initialized to DAG root nodes to start pipeline execution
// candidateTasks is derived based on successfully finished tasks and/or skipped tasks
candidateTasks, err := dag.GetSchedulable(d, pipelineRunState.SuccessfulOrSkippedDAGTasks(d)...)
if err != nil {
logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
return controller.NewPermanentError(err)
}
// nextRprts holds a list of pipeline tasks which should be executed next
nextRprts = pipelineRunState.GetNextTasks(candidateTasks)
// nextRprts holds a list of pipeline tasks which should be executed next
nextRprts, err := pipelineRunFacts.DAGExecutionQueue()
if err != nil {
logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
return controller.NewPermanentError(err)
}

resolvedResultRefs, err := resources.ResolveResultRefs(pipelineRunState, nextRprts)
resolvedResultRefs, err := resources.ResolveResultRefs(pipelineRunFacts.State, nextRprts)
if err != nil {
logger.Infof("Failed to resolve all task params for %q with error %v", pr.Name, err)
pr.Status.MarkFailed(ReasonFailedValidation, err.Error())
@@ -521,10 +520,10 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
resources.ApplyTaskResults(nextRprts, resolvedResultRefs)

// GetFinalTasks only returns tasks when a DAG is complete
nextRprts = append(nextRprts, pipelineRunState.GetFinalTasks(d, dfinally)...)
nextRprts = append(nextRprts, pipelineRunFacts.GetFinalTasks()...)

for _, rprt := range nextRprts {
if rprt == nil || rprt.Skip(pipelineRunState, d) {
if rprt == nil || rprt.Skip(pipelineRunFacts) {
continue
}
if rprt.ResolvedConditionChecks == nil || rprt.ResolvedConditionChecks.IsSuccess() {
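
For reference (not part of the diff), here is a plausible sketch of what the new DAGExecutionQueue helper encapsulates, reconstructed from the code the hunk above removes from runNextSchedulableTask. The actual method body lives in the resources package and is not shown in this excerpt, and the successfulOrSkippedDAGTasks helper name is an assumption.

// Plausible reconstruction of the scheduling logic that moved out of
// runNextSchedulableTask; names not visible in this diff are assumptions.
func (facts *PipelineRunFacts) DAGExecutionQueue() (PipelineRunState, error) {
	tasks := PipelineRunState{}
	// When the pipeline run is stopping, do not schedule any new task and only
	// wait for the running tasks to complete and report their status.
	if !facts.IsStopping() {
		// Candidate tasks start at the DAG root nodes and grow as tasks finish
		// successfully or are skipped.
		candidateTasks, err := dag.GetSchedulable(facts.TasksGraph, facts.successfulOrSkippedDAGTasks()...)
		if err != nil {
			return tasks, err
		}
		tasks = facts.State.GetNextTasks(candidateTasks)
	}
	return tasks, nil
}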
38 changes: 19 additions & 19 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
@@ -30,7 +30,6 @@ import (
"github.com/tektoncd/pipeline/pkg/contexts"
"github.com/tektoncd/pipeline/pkg/list"
"github.com/tektoncd/pipeline/pkg/names"
"github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
)

@@ -134,17 +133,15 @@ func (t ResolvedPipelineRunTask) IsStarted() bool {
return true
}

func (t *ResolvedPipelineRunTask) checkParentsDone(state PipelineRunState, d *dag.Graph) bool {
stateMap := state.ToMap()
func (t *ResolvedPipelineRunTask) checkParentsDone(facts *PipelineRunFacts) bool {
stateMap := facts.State.ToMap()
// check if parent tasks are done executing,
// if any of the parents is not yet scheduled or still running,
// wait for it to complete before evaluating when expressions
node := d.Nodes[t.PipelineTask.Name]
if isTaskInGraph(t.PipelineTask.Name, d) {
for _, p := range node.Prev {
if !stateMap[p.Task.HashKey()].IsDone() {
return false
}
node := facts.TasksGraph.Nodes[t.PipelineTask.Name]
for _, p := range node.Prev {
if !stateMap[p.Task.HashKey()].IsDone() {
return false
}
}
return true
@@ -156,7 +153,12 @@ func (t *ResolvedPipelineRunTask) checkParentsDone(state PipelineRunState, d *da
// (3) its parent task was skipped
// (4) Pipeline is in stopping state (one of the PipelineTasks failed)
// Note that this means Skip returns false if a conditionCheck is in progress
func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) bool {
func (t *ResolvedPipelineRunTask) Skip(facts *PipelineRunFacts) bool {
// finally tasks are never skipped. If this is a final task, return false
if facts.isFinalTask(t.PipelineTask.Name) {
return false
}

// it already has TaskRun associated with it - PipelineTask not skipped
if t.IsStarted() {
return false
@@ -170,7 +172,7 @@ func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) boo
}

// Check if the when expressions are false, based on the input's relationship to the values
if t.checkParentsDone(state, d) {
if t.checkParentsDone(facts) {
if len(t.PipelineTask.WhenExpressions) > 0 {
if !t.PipelineTask.WhenExpressions.HaveVariables() {
if !t.PipelineTask.WhenExpressions.AllowsExecution() {
@@ -181,19 +183,17 @@ func (t *ResolvedPipelineRunTask) Skip(state PipelineRunState, d *dag.Graph) boo
}

// Skip the PipelineTask if pipeline is in stopping state
if isTaskInGraph(t.PipelineTask.Name, d) && state.IsStopping(d) {
if facts.IsStopping() {
return true
}

stateMap := state.ToMap()
stateMap := facts.State.ToMap()
// Recursively look at parent tasks to see if they have been skipped,
// if any of the parents have been skipped, skip as well
node := d.Nodes[t.PipelineTask.Name]
if isTaskInGraph(t.PipelineTask.Name, d) {
for _, p := range node.Prev {
if stateMap[p.Task.HashKey()].Skip(state, d) {
return true
}
node := facts.TasksGraph.Nodes[t.PipelineTask.Name]
for _, p := range node.Prev {
if stateMap[p.Task.HashKey()].Skip(facts) {
return true
}
}
return false
pkg/reconciler/pipelinerun/resources/pipelinerunresolution_test.go
@@ -835,7 +835,7 @@ func TestIsSkipped(t *testing.T) {

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
dag, err := DagFromState(tc.state)
d, err := DagFromState(tc.state)
if err != nil {
t.Fatalf("Could not get a dag from the TC state %#v: %v", tc.state, err)
}
Expand All @@ -844,7 +844,12 @@ func TestIsSkipped(t *testing.T) {
if rprt == nil {
t.Fatalf("Could not get task %s from the state: %v", tc.taskName, tc.state)
}
isSkipped := rprt.Skip(tc.state, dag)
facts := PipelineRunFacts{
State: tc.state,
TasksGraph: d,
FinalTasksGraph: &dag.Graph{},
}
isSkipped := rprt.Skip(&facts)
if d := cmp.Diff(isSkipped, tc.expected); d != "" {
t.Errorf("Didn't get expected isSkipped %s", diff.PrintWantGot(d))
}