Handle dag in pipelineresolution #2821

Closed
11 changes: 10 additions & 1 deletion pkg/apis/pipeline/v1beta1/pipeline_validation.go
@@ -173,7 +173,16 @@ func (ps *PipelineSpec) Validate(ctx context.Context) *apis.FieldError {

// Validate the pipeline task graph
if err := validateGraph(ps.Tasks); err != nil {
return apis.ErrInvalidValue(err.Error(), "spec.tasks")
ferr := apis.ErrInvalidValue(err.Error(), "spec.tasks")
ferr.Details = "Invalid Graph"
return ferr
}

// Validate the pipeline finally graph
if err := validateGraph(ps.Finally); err != nil {
Member:
This check should not be added here. The finally section is a list of tasks, not a graph, from the user's perspective. Tasks are validated here to not include runAfter. This check will result in confusion. I would rather have the check done explicitly for runAfter, the way it's implemented.
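The explicit runAfter check the reviewer suggests could look roughly like this (a self-contained Go sketch; the PipelineTask type is trimmed down and the helper name is hypothetical, not Tekton's actual code):

```go
package main

import "fmt"

// PipelineTask is a trimmed-down stand-in for Tekton's v1beta1.PipelineTask;
// only the fields needed for this sketch are included.
type PipelineTask struct {
	Name     string
	RunAfter []string
}

// validateFinalTasksNoRunAfter is a hypothetical helper sketching the
// suggestion: reject runAfter on finally tasks explicitly, instead of
// running full graph validation on the finally list.
func validateFinalTasksNoRunAfter(finalTasks []PipelineTask) error {
	for _, t := range finalTasks {
		if len(t.RunAfter) != 0 {
			return fmt.Errorf("no runAfter allowed for final task %q", t.Name)
		}
	}
	return nil
}

func main() {
	finally := []PipelineTask{
		{Name: "cleanup"},
		{Name: "notify", RunAfter: []string{"cleanup"}},
	}
	if err := validateFinalTasksNoRunAfter(finally); err != nil {
		fmt.Println(err) // the second task illegally declares runAfter
	}
}
```

An explicit check like this also yields an error message that names the offending task, rather than a generic invalid-graph error.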

Member Author:

The only reason I added the check here was to be consistent with the current behaviour. We have a unit test right now that enforces emitting an InvalidDagError to the user in case the finally DAG is not valid, and the only way to keep that with the new code organization was to add it here.

I'd be happy to drop this check and the unit test, and just bubble up any validation error as it's reported by the dag module directly.

Member:

yes please, will add it back when we change finally to be a graph instead of a list, thanks 🙏

ferr := apis.ErrInvalidValue(err.Error(), "spec.finally")
ferr.Details = "Invalid Graph"
return ferr
}

if err := validateParamResults(ps.Tasks); err != nil {
68 changes: 24 additions & 44 deletions pkg/reconciler/pipelinerun/pipelinerun.go
@@ -40,7 +40,6 @@ import (
"github.com/tektoncd/pipeline/pkg/contexts"
"github.com/tektoncd/pipeline/pkg/reconciler"
"github.com/tektoncd/pipeline/pkg/reconciler/events"
"github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag"
"github.com/tektoncd/pipeline/pkg/reconciler/pipelinerun/resources"
"github.com/tektoncd/pipeline/pkg/reconciler/taskrun"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
@@ -284,31 +283,15 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
pr.ObjectMeta.Annotations[key] = value
}

d, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Tasks))
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonInvalidGraph,
"PipelineRun %s/%s's Pipeline DAG is invalid: %s",
pr.Namespace, pr.Name, err)
return controller.NewPermanentError(err)
}

// build DAG with a list of final tasks, this DAG is used later to identify
// if a task in PipelineRunState is final task or not
// the finally section is optional and might not exist
// dfinally holds an empty Graph in the absence of finally clause
dfinally, err := dag.Build(v1beta1.PipelineTaskList(pipelineSpec.Finally))
if err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonInvalidGraph,
"PipelineRun %s's Pipeline DAG is invalid for finally clause: %s",
pr.Namespace, pr.Name, err)
return controller.NewPermanentError(err)
}

if err := pipelineSpec.Validate(ctx); err != nil {
// This Run has failed, so we need to mark it as failed and stop reconciling it
pr.Status.MarkFailed(ReasonFailedValidation,
var reason = ReasonFailedValidation
if err.Details == "Invalid Graph" {
Member Author:

Because the graph is created in pipelinerun resources, validation happens first. In this context it was not clear that the error was an invalid-graph one, so I added some extra context to the error to be able to catch it and use the specific reason.
The alternatives are:

  • skip validation here, but that's only a temporary solution
  • move validation into ResolvePipelineRun, which might make sense, but not in this PR
  • simply return a validation error and rely on the error stack to provide more insight to the user

I wonder if we really need to run validation here, but I guess we will once tasks / pipelines may come from a container registry or git URL.

Member (@pritidesai, Jun 30, 2020):

@afrittoli pipelineSpec is being validated here for many different checks/conditions in addition to invalidGraph. I think we do need to run validation here; I don't see validation happening elsewhere, am I missing something? 🤔

Member:

We could certainly move the check on invalid graph to ResolvePipelineRun.

Also, such a reason could be generated where the error is created in pipeline_validation.go, instead of being calculated here. It's a great idea to set the reason to the specific validation error in addition to returning the error.

Member Author:

> @afrittoli pipelineSpec is being validated here for many different checks/conditions in addition to invalidGraph. I think we do need to run validation here, I don't see validation happening elsewhere, am I missing something? 🤔

Validation is done by the webhook when resources are submitted to etcd.
However, it will not be performed for resources that are not stored in etcd, like tasks or pipelines coming from an external source (git/registry). I'm not sure about embedded specs; I'd expect them to go through the same validation as normal resources.

Member Author:

> We could certainly move the check on invalid graph to ResolvePipelineRun.

That's the way I implemented it originally, but because we validate the spec here, we don't really catch the error in ResolvePipelineRun. The only way would be to move the whole validation of the spec into ResolvePipelineRun, which is something we could do eventually, but I didn't want to do it here as well.

> Also, such reason could be generated where the error is getting created in pipeline_validation.go instead of calculating here. Its a great idea to set the reason to specific validation error in addition to returning error.

// When the validation error reports tasks or finally in the error path
// we can use a more specific validation error reason
reason = ReasonInvalidGraph
}
pr.Status.MarkFailed(reason,
"Pipeline %s/%s can't be Run; it has an invalid spec: %s",
pipelineMeta.Namespace, pipelineMeta.Name, err)
return controller.NewPermanentError(err)
@@ -372,7 +355,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
// pipelineState also holds a taskRun for each pipeline task after the taskRun is created
// pipelineState is instantiated and updated on every reconcile cycle
pipelineState, err := resources.ResolvePipelineRun(ctx,
*pr,
*pipelineSpec, *pr,
func(name string) (v1beta1.TaskInterface, error) {
return c.taskLister.Tasks(pr.Namespace).Get(name)
},
@@ -385,7 +368,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
func(name string) (*v1alpha1.Condition, error) {
return c.conditionLister.Conditions(pr.Namespace).Get(name)
},
append(pipelineSpec.Tasks, pipelineSpec.Finally...), providedResources,
providedResources,
)

if err != nil {
@@ -413,7 +396,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
return nil
}

for _, rprt := range pipelineState {
for _, rprt := range pipelineState.State() {
err := taskrun.ValidateResolvedTaskResources(rprt.PipelineTask.Params, rprt.ResolvedTaskResources)
Member:
This check, ValidateResolvedTaskResources, can be moved to ResolvePipelineRun().

Member Author:

Yeah, it sounds like something we could do. Perhaps in a different PR?

Member:

yup definitely, not everything has to be part of a single PR

if err != nil {
logger.Errorf("Failed to validate pipelinerun %q with error %v", pr.Name, err)
@@ -452,11 +435,11 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
return controller.NewPermanentError(err)
}

if err := c.runNextSchedulableTask(ctx, pr, d, dfinally, pipelineState, as); err != nil {
if err := c.runNextSchedulableTask(ctx, pr, *pipelineState, as); err != nil {
return err
}

after := resources.GetPipelineConditionStatus(pr, pipelineState, logger, d, dfinally)
after := resources.GetPipelineConditionStatus(*pr, *pipelineState, logger)
switch after.Status {
case corev1.ConditionTrue:
pr.Status.MarkSucceeded(after.Reason, after.Message)
@@ -467,39 +450,36 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}
// Read the condition the way it was set by the Mark* helpers
after = pr.Status.GetCondition(apis.ConditionSucceeded)
pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState.State())
logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
return nil
}

// runNextSchedulableTask gets the next schedulable Tasks from the dag based on the current
// pipeline run state, and starts them
// after all DAG tasks are done, it's responsible for scheduling final tasks and start executing them
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, d *dag.Graph, dfinally *dag.Graph, pipelineState resources.PipelineRunState, as artifacts.ArtifactStorageInterface) error {

// runNextSchedulableTask gets the next schedulable Tasks based the pipeline run state and starts them
func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.PipelineRun, pipelineState resources.ResolvedPipelineRun, as artifacts.ArtifactStorageInterface) error {
logger := logging.FromContext(ctx)
recorder := controller.GetEventRecorder(ctx)

var nextRprts []*resources.ResolvedPipelineRunTask
var (
nextRprts []*resources.ResolvedPipelineRunTask
err error
)

// when pipeline run is stopping, do not schedule any new task and only
// wait for all running tasks to complete and report their status
if !pipelineState.IsStopping(d) {
// candidateTasks is initialized to DAG root nodes to start pipeline execution
// candidateTasks is derived based on successfully finished tasks and/or skipped tasks
candidateTasks, err := dag.GetSchedulable(d, pipelineState.SuccessfulOrSkippedDAGTasks(d)...)
if !pipelineState.IsStopping() {
Member:

this check can be moved into GetNextTasks

Member Author:

I think this check is already redundant now, but I need to try it out, and I was planning to do so in a next PR, along with giving a single NextTasks method for the pipeline controller to use.

Member:

sounds good, thanks 👏
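Folding the stopping check into GetNextTasks, as discussed in this thread, might look like the following sketch (the type and field names here are assumptions for illustration, not Tekton's real API):

```go
package main

import "fmt"

// ResolvedPipelineRunTask is a minimal stand-in for the type in
// pkg/reconciler/pipelinerun/resources; only a name is kept here.
type ResolvedPipelineRunTask struct{ Name string }

// pipelineRunState sketches the idea of letting GetNextTasks own the
// "is the run stopping?" check, so callers don't have to guard it.
type pipelineRunState struct {
	stopping   bool
	candidates []*ResolvedPipelineRunTask
}

// GetNextTasks returns the tasks to schedule next. When the PipelineRun
// is stopping, it schedules nothing new, letting running tasks finish
// and report their status.
func (s *pipelineRunState) GetNextTasks() []*ResolvedPipelineRunTask {
	if s.stopping {
		return nil
	}
	return s.candidates
}

func main() {
	s := &pipelineRunState{
		stopping:   true,
		candidates: []*ResolvedPipelineRunTask{{Name: "build"}},
	}
	fmt.Println(len(s.GetNextTasks())) // nothing is scheduled while stopping
}
```

Centralizing the guard this way would give the pipeline controller a single NextTasks entry point, as the author proposes above.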

// nextRprts holds a list of pipeline tasks which should be executed next
nextRprts, err = pipelineState.GetNextTasks()
if err != nil {
logger.Errorf("Error getting potential next tasks for valid pipelinerun %s: %v", pr.Name, err)
return controller.NewPermanentError(err)
}
// nextRprts holds a list of pipeline tasks which should be executed next
nextRprts = pipelineState.GetNextTasks(candidateTasks)
}

// GetFinalTasks only returns tasks when a DAG is complete
nextRprts = append(nextRprts, pipelineState.GetFinalTasks(d, dfinally)...)
nextRprts = append(nextRprts, pipelineState.GetFinalTasks()...)

resolvedResultRefs, err := resources.ResolveResultRefs(pipelineState, nextRprts)
resolvedResultRefs, err := resources.ResolveResultRefs(pipelineState.State(), nextRprts)
if err != nil {
logger.Infof("Failed to resolve all task params for %q with error %v", pr.Name, err)
pr.Status.MarkFailed(ReasonFailedValidation, err.Error())
16 changes: 9 additions & 7 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
@@ -451,8 +451,8 @@ func TestReconcile_InvalidPipelineRuns(t *testing.T) {
))),
tb.PipelineRun("pipeline-invalid-final-graph", tb.PipelineRunNamespace("foo"), tb.PipelineRunSpec("", tb.PipelineRunPipelineSpec(
tb.PipelineTask("dag-task-1", "taskName"),
tb.FinalPipelineTask("final-task-1", "taskName"),
tb.FinalPipelineTask("final-task-1", "taskName")))),
Member Author:

This error does not trigger an invalid dag when going through validation; it fails first on validatePipelineTaskName.

tb.FinalPipelineTask("final-task-1", "taskName", tb.RunAfter("dag-task-2")),
))),
}

d := test.Data{
Expand Down Expand Up @@ -1741,13 +1741,15 @@ func TestReconcileWithConditionChecks(t *testing.T) {
expectedConditionChecks[index] = makeExpectedTr(condition.Name, ccNames[condition.Name], condition.Labels, condition.Annotations)
}

// Check that the expected TaskRun was created
condCheck0 := clients.Pipeline.Actions()[1].(ktesting.CreateAction).GetObject().(*v1beta1.TaskRun)
condCheck1 := clients.Pipeline.Actions()[2].(ktesting.CreateAction).GetObject().(*v1beta1.TaskRun)
if condCheck0 == nil || condCheck1 == nil {
t.Errorf("Expected two ConditionCheck TaskRuns to be created, but it wasn't.")
// Check that the expected TaskRuns were created
actions := clients.Pipeline.Actions()
if !actions[1].Matches("create", "taskruns") || !actions[2].Matches("create", "taskruns") {
Member Author:

Doing a cast without checking first leads to an unmanaged failure when the test fails.
My code changes initially made this test fail, and the error was not very informative.
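The pattern behind this test fix — verify what an action is before type-asserting it — can be sketched like this (the action types here are illustrative stand-ins for client-go's ktesting actions, not the real API):

```go
package main

import "fmt"

// Action mimics the interface shape of k8s.io/client-go/testing actions.
type Action interface {
	Matches(verb, resource string) bool
}

type createAction struct{ resource string }

func (a createAction) Matches(verb, resource string) bool {
	return verb == "create" && resource == a.resource
}

type getAction struct{ resource string }

func (a getAction) Matches(verb, resource string) bool {
	return verb == "get" && resource == a.resource
}

// checkedCast verifies the action kind before asserting its concrete type,
// so a mismatch yields a readable failure message instead of a panic on a
// failed type assertion.
func checkedCast(a Action) (createAction, error) {
	if !a.Matches("create", "taskruns") {
		return createAction{}, fmt.Errorf("expected a create-taskruns action, got %#v", a)
	}
	return a.(createAction), nil
}

func main() {
	if _, err := checkedCast(getAction{resource: "taskruns"}); err != nil {
		fmt.Println(err) // informative failure instead of a panic
	}
}
```

In the real test, the `Matches` guard feeds a `t.Fatalf` that reports all recorded actions, which is far easier to debug than a failed interface conversion panic.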

t.Fatalf("Expected two ConditionCheck TaskRuns to be created, got instead %d actions: %#v", len(actions), actions)
}

condCheck0 := actions[1].(ktesting.CreateAction).GetObject().(*v1beta1.TaskRun)
condCheck1 := actions[2].(ktesting.CreateAction).GetObject().(*v1beta1.TaskRun)

actual := []*v1beta1.TaskRun{condCheck0, condCheck1}
if d := cmp.Diff(actual, expectedConditionChecks); d != "" {
t.Errorf("expected to see 2 ConditionCheck TaskRuns created. Diff %s", diff.PrintWantGot(d))