Handle dag in pipelineresolution
In the pipelinerun controller, today we follow this logic:

- build the dag from the spec, using the dag module
- resolve the pipelinerun state from the spec and status, using the
  resources module
- get a list of candidate next tasks from the dag module, passing part
  of the state
- get a list of next tasks from the resources module, using the list
  of candidates and the pipeline run status

The separation of concerns between the dag, resources and reconciler
modules feels a bit mixed up.

This resolves part of the issue by moving the invocation of the dag
build, as well as the computation of the candidate task list, into the
dag module, and by aggregating the dag and the pipeline state into a
new struct, ResolvedPipelineRun.
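
For illustration, here is a minimal, self-contained Go sketch of the shape this change introduces. The type and method names (ResolvedPipelineRun, State, IsStopping, GetNextTasks, GetFinalTasks) mirror the ones used in the diff below, but the struct fields and method bodies are simplified stand-ins, not the real tektoncd/pipeline implementation.

```go
// Sketch only: ResolvedPipelineRun bundles the DAG, the finally graph, and the
// per-task run state, so the reconciler no longer threads them around separately.
package main

import "fmt"

// Graph stands in for the dag.Graph built from the pipeline spec.
type Graph struct {
	Nodes []string
}

// ResolvedPipelineRunTask stands in for resources.ResolvedPipelineRunTask.
type ResolvedPipelineRunTask struct {
	Name string
	Done bool
}

// ResolvedPipelineRun aggregates the DAG, the finally DAG and the run state.
type ResolvedPipelineRun struct {
	dag     *Graph
	finally *Graph
	state   []*ResolvedPipelineRunTask
}

// State exposes the per-task state, as pipelineState.State() does in the diff.
func (r *ResolvedPipelineRun) State() []*ResolvedPipelineRunTask { return r.state }

// IsStopping reports whether the run is winding down; stubbed out here.
func (r *ResolvedPipelineRun) IsStopping() bool { return false }

// GetNextTasks returns schedulable DAG tasks; here, simply the not-yet-done ones.
func (r *ResolvedPipelineRun) GetNextTasks() ([]*ResolvedPipelineRunTask, error) {
	var next []*ResolvedPipelineRunTask
	for _, t := range r.state {
		if !t.Done {
			next = append(next, t)
		}
	}
	return next, nil
}

// GetFinalTasks returns finally tasks once the DAG is complete; stubbed empty here.
func (r *ResolvedPipelineRun) GetFinalTasks() []*ResolvedPipelineRunTask { return nil }

func main() {
	rpr := &ResolvedPipelineRun{
		dag:   &Graph{Nodes: []string{"dag-task-1"}},
		state: []*ResolvedPipelineRunTask{{Name: "dag-task-1"}},
	}
	if !rpr.IsStopping() {
		next, err := rpr.GetNextTasks()
		if err != nil {
			panic(err)
		}
		next = append(next, rpr.GetFinalTasks()...)
		for _, t := range next {
			fmt.Println("would schedule:", t.Name)
		}
	}
}
```

With this aggregation the reconciler talks to a single object instead of passing two dag.Graph values and the state slice through every helper, which is what the pipelinerun.go diff below shows.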
afrittoli committed Jun 28, 2020
1 parent f7d78a1 commit 911d8de
Showing 5 changed files with 508 additions and 507 deletions.
11 changes: 10 additions & 1 deletion pkg/apis/pipeline/v1beta1/pipeline_validation.go
@@ -173,7 +173,16 @@ func (ps *PipelineSpec) Validate(ctx context.Context) *apis.FieldError {

// Validate the pipeline task graph
if err := validateGraph(ps.Tasks); err != nil {
return apis.ErrInvalidValue(err.Error(), "spec.tasks")
ferr := apis.ErrInvalidValue(err.Error(), "spec.tasks")
ferr.Details = "Invalid Graph"
return ferr
}

// Validate the pipeline finally graph
if err := validateGraph(ps.Finally); err != nil {
ferr := apis.ErrInvalidValue(err.Error(), "spec.finally")
ferr.Details = "Invalid Graph"
return ferr
}

if err := validateParamResults(ps.Tasks); err != nil {
68 changes: 24 additions & 44 deletions pkg/reconciler/pipelinerun/pipelinerun.go
@@ -40,7 +40,6 @@ import (
"github.com/tektoncd/pipeline/pkg/contexts"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/events"
"github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag"
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
@@ -284,31 +283,15 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
pr.ObjectMeta.Annotations[key] = value
}

d, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Tasks))
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonInvalidGraph,
"PipelineRun %s/%s's Pipeline DAG is invalid: %s",
pr.Namespace, pr.Name, err)
return controller.NewPermanentError(err)
}

// build DAG with a list of final tasks, this DAG is used later to identify
// if a task in PipelineRunState is final task or not
// the finally section is optional and might not exist
// dfinally holds an empty Graph in the absence of finally clause
dfinally, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Finally))
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonInvalidGraph,
"PipelineRun %s's Pipeline DAG is invalid for finally clause: %s",
pr.Namespace, pr.Name, err)
return controller.NewPermanentError(err)
}

if err := pipelineSpec.Validate(ctx); err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonFailedValidation,
var reason = ReasonFailedValidation
if err.Details == "Invalid Graph" {
// When the validation error reports tasks or finally in the error path
// we can use a more specific validation error reason
reason = ReasonInvalidGraph
}
pr.Status.MarkFailed(reason,
"Pipeline %s/%s can't be Run; it has an invalid spec: %s",
pipelineMeta.Namespace, pipelineMeta.Name, err)
return controller.NewPermanentError(err)
@@ -372,7 +355,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
// pipelineState also holds a taskRun for each pipeline task after the taskRun is created
// pipelineState is instantiated and updated on every reconcile cycle
pipelineState, err := resources.ResolvePipelineRun(ctx,
*pr,
*pipelineSpec, *pr,
func(name string) (v1beta1.TaskInterface, error) {
return c.taskLister.Tasks(pr.Namespace).Get(name)
},
@@ -385,7 +368,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
func(name string) (*v1alpha1.Condition, error) {
return c.conditionLister.Conditions(pr.Namespace).Get(name)
},
append(pipelineSpec.Tasks, pipelineSpec.Finally...), providedResources,
providedResources,
)

if err != nil {
@@ -413,7 +396,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
return nil
}

for _, rprt := range pipelineState {
for _, rprt := range pipelineState.State() {
err := taskrun.ValidateResolvedTaskResources(rprt.PipelineTask.Params, rprt.ResolvedTaskResources)
if err != nil {
logger.Errorf("Failed to validate pipelinerun %q with error %v", pr.Name, err)
@@ -452,11 +435,11 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
return controller.NewPermanentError(err)
}

if err := c.runNextSchedulableTask(ctx, pr, d, dfinally, pipelineState, as); err != nil {
if err := c.runNextSchedulableTask(ctx, pr, *pipelineState, as); err != nil {
return err
}

after := resources.GetPipelineConditionStatus(pr, pipelineState, logger, d, dfinally)
after := resources.GetPipelineConditionStatus(*pr, *pipelineState, logger)
switch after.Status {
case corev1.ConditionTrue:
pr.Status.MarkSucceeded(after.Reason, after.Message)
@@ -467,39 +450,36 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
// Read the condition the way it was set by the Mark* helpers
after = pr.Status.GetCondition(apis.ConditionSucceeded)
pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState.State())
logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
return nil
}

// runNextSchedulableTask gets the next schedulable Tasks from the dag based on the current
// pipeline run state, and starts them
// after all DAG tasks are done, it's responsible for scheduling final tasks and start executing them
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, d *dag.Graph, dfinally *dag.Graph, pipelineState resources.PipelineRunState, as artifacts.ArtifactStorageInterface) error {

// runNextSchedulableTask gets the next schedulable Tasks based the pipeline run state and starts them
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, pipelineState resources.ResolvedPipelineRun, as artifacts.ArtifactStorageInterface) error {
logger := logging.FromContext(ctx)
recorder := controller.GetEventRecorder(ctx)

var nextRprts []*resources.ResolvedPipelineRunTask
var (
nextRprts []*resources.ResolvedPipelineRunTask
err error
)

// when pipeline run is stopping, do not schedule any new task and only
// wait for all running tasks to complete and report their status
if !pipelineState.IsStopping(d) {
// candidateTasks is initialized to DAG root nodes to start pipeline execution
// candidateTasks is derived based on successfully finished tasks and/or skipped tasks
candidateTasks, err := dag.GetSchedulable(d, pipelineState.SuccessfulOrSkippedDAGTasks(d)...)
if !pipelineState.IsStopping() {
// nextRprts holds a list of pipeline tasks which should be executed next
nextRprts, err = pipelineState.GetNextTasks()
if err != nil {
logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
return controller.NewPermanentError(err)
}
// nextRprts holds a list of pipeline tasks which should be executed next
nextRprts = pipelineState.GetNextTasks(candidateTasks)
}

// GetFinalTasks only returns tasks when a DAG is complete
nextRprts = append(nextRprts, pipelineState.GetFinalTasks(d, dfinally)...)
nextRprts = append(nextRprts, pipelineState.GetFinalTasks()...)

resolvedResultRefs, err := resources.ResolveResultRefs(pipelineState, nextRprts)
resolvedResultRefs, err := resources.ResolveResultRefs(pipelineState.State(), nextRprts)
if err != nil {
logger.Infof("Failed to resolve all task params for %q with error %v", pr.Name, err)
pr.Status.MarkFailed(ReasonFailedValidation, err.Error())
16 changes: 9 additions & 7 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
@@ -451,8 +451,8 @@ func TestReconcile_InvalidPipelineRuns(t *testing.T) {
))),
tb.PipelineRun("pipeline-invalid-final-graph", tb.PipelineRunNamespace("foo"), tb.PipelineRunSpec("", tb.PipelineRunPipelineSpec(
tb.PipelineTask("dag-task-1", "taskName"),
tb.FinalPipelineTask("final-task-1", "taskName"),
tb.FinalPipelineTask("final-task-1", "taskName")))),
tb.FinalPipelineTask("final-task-1", "taskName", tb.RunAfter("dag-task-2")),
))),
}

d := test.Data{
Expand Down Expand Up @@ -1741,13 +1741,15 @@ func TestReconcileWithConditionChecks(t *testing.T) {
expectedConditionChecks[index] = makeExpectedTr(condition.Name, ccNames[condition.Name], condition.Labels, condition.Annotations)
}

// Check that the expected TaskRun was created
condCheck0 := clients.Pipeline.Actions()[1].(ktesting.CreateAction).GetObject().(*v1beta1.TaskRun)
condCheck1 := clients.Pipeline.Actions()[2].(ktesting.CreateAction).GetObject().(*v1beta1.TaskRun)
if condCheck0 == nil || condCheck1 == nil {
t.Errorf("Expected two ConditionCheck TaskRuns to be created, but it wasn't.")
// Check that the expected TaskRun were created
actions := clients.Pipeline.Actions()
if !actions[1].Matches("create", "taskruns") || !actions[2].Matches("create", "taskruns") {
t.Fatalf("Expected two ConditionCheck TaskRuns to be created, got instead %d actions: %#v", len(actions), actions)
}

condCheck0 := actions[1].(ktesting.CreateAction).GetObject().(*v1beta1.TaskRun)
condCheck1 := actions[2].(ktesting.CreateAction).GetObject().(*v1beta1.TaskRun)

actual := []*v1beta1.TaskRun{condCheck0, condCheck1}
if d := cmp.Diff(actual, expectedConditionChecks); d != "" {
t.Errorf("expected to see 2 ConditionCheck TaskRuns created. Diff %s", diff.PrintWantGot(d))