Skip to content

Commit

Permalink
pipelinerun resolution checking if taskrun exists
Browse files Browse the repository at this point in the history
Pipelinerun resolution now checking if taskrun already created so it
doesn't generate a new random suffix to it

updated e2e tests to work with variable taskrun names

Signed-off-by: Nader Ziada <[email protected]>
  • Loading branch information
nader-ziada authored and knative-prow-robot committed Feb 14, 2019
1 parent 7abd245 commit 0502dbe
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 22 deletions.
10 changes: 5 additions & 5 deletions pkg/reconciler/v1alpha1/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
p = resources.ApplyParameters(p, pr)

pipelineState, err := resources.ResolvePipelineRun(
pr.Name,
*pr,
func(name string) (v1alpha1.TaskInterface, error) {
return c.taskLister.Tasks(pr.Namespace).Get(name)
},
Expand Down Expand Up @@ -293,6 +293,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
return nil
}
}

err = resources.ResolveTaskRuns(c.taskRunLister.TaskRuns(pr.Namespace).Get, pipelineState)
if err != nil {
return fmt.Errorf("error getting TaskRuns for Pipeline %s: %s", p.Name, err)
Expand All @@ -303,7 +304,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
return cancelPipelineRun(pr, pipelineState, c.PipelineClientSet)
}

serviceAccount := pr.Spec.ServiceAccount
rprt := resources.GetNextTask(pr.Name, pipelineState, c.Logger)

var as artifacts.ArtifactStorageInterface
Expand All @@ -314,7 +314,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er

if rprt != nil {
c.Logger.Infof("Creating a new TaskRun object %s", rprt.TaskRunName)
rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, serviceAccount, as.StorageBasePath(pr))
rprt.TaskRun, err = c.createTaskRun(c.Logger, rprt, pr, as.StorageBasePath(pr))
if err != nil {
c.Recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %s", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
Expand Down Expand Up @@ -342,7 +342,7 @@ func updateTaskRunsStatus(pr *v1alpha1.PipelineRun, pipelineState []*resources.R
}
}

func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.ResolvedPipelineRunTask, pr *v1alpha1.PipelineRun, sa, storageBasePath string) (*v1alpha1.TaskRun, error) {
func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.ResolvedPipelineRunTask, pr *v1alpha1.PipelineRun, storageBasePath string) (*v1alpha1.TaskRun, error) {
var taskRunTimeout = &metav1.Duration{Duration: 0 * time.Second}
if pr.Spec.Timeout != nil {
pTimeoutTime := pr.Status.StartTime.Add(pr.Spec.Timeout.Duration)
Expand Down Expand Up @@ -381,7 +381,7 @@ func (c *Reconciler) createTaskRun(logger *zap.SugaredLogger, rprt *resources.Re
Inputs: v1alpha1.TaskRunInputs{
Params: rprt.PipelineTask.Params,
},
ServiceAccount: sa,
ServiceAccount: pr.Spec.ServiceAccount,
Timeout: taskRunTimeout,
NodeSelector: pr.Spec.NodeSelector,
Affinity: pr.Spec.Affinity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package resources

import (
"fmt"
"strings"
"time"

"github.com/knative/build-pipeline/pkg/apis/pipeline/v1alpha1"
Expand Down Expand Up @@ -163,14 +164,22 @@ func (e *ResourceNotFoundError) Error() string {
// will return an error, otherwise it returns a list of all of the Tasks retrieved.
// It will retrieve the Resources needed for the TaskRun as well using getResource and the mapping
// of providedResources.
func ResolvePipelineRun(prName string, getTask resources.GetTask, getClusterTask resources.GetClusterTask, getResource resources.GetResource, tasks []v1alpha1.PipelineTask, providedResources map[string]v1alpha1.PipelineResourceRef) ([]*ResolvedPipelineRunTask, error) {
func ResolvePipelineRun(
pipelineRun v1alpha1.PipelineRun,
getTask resources.GetTask,
getClusterTask resources.GetClusterTask,
getResource resources.GetResource,
tasks []v1alpha1.PipelineTask,
providedResources map[string]v1alpha1.PipelineResourceRef,
) ([]*ResolvedPipelineRunTask, error) {

state := []*ResolvedPipelineRunTask{}
for i := range tasks {
pt := tasks[i]

rprt := ResolvedPipelineRunTask{
PipelineTask: &pt,
TaskRunName: getTaskRunName(prName, &pt),
TaskRunName: getTaskRunName(pipelineRun.Status.TaskRuns, pipelineRun.Name, &pt),
}

// Find the Task that this task in the Pipeline this PipelineTask is using
Expand Down Expand Up @@ -226,8 +235,15 @@ func ResolveTaskRuns(getTaskRun GetTaskRun, state []*ResolvedPipelineRunTask) er
}

// getTaskRunName should return a uniquie name for a `TaskRun`.
func getTaskRunName(prName string, pt *v1alpha1.PipelineTask) string {
func getTaskRunName(taskRunsStatus map[string]v1alpha1.TaskRunStatus, prName string, pt *v1alpha1.PipelineTask) string {
base := fmt.Sprintf("%s-%s", prName, pt.Name)

for k := range taskRunsStatus {
if strings.HasPrefix(k, base) {
return k
}
}

return names.SimpleNameGenerator.GenerateName(base)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,18 @@ func TestResolvePipelineRun(t *testing.T) {
Type: v1alpha1.PipelineResourceTypeGit,
},
}

pr := v1alpha1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerun",
},
}
// The Task "task" doesn't actually take any inputs or outputs, but validating
// that is not done as part of Run resolution
getTask := func(name string) (v1alpha1.TaskInterface, error) { return task, nil }
getClusterTask := func(name string) (v1alpha1.TaskInterface, error) { return nil, nil }
getResource := func(name string) (*v1alpha1.PipelineResource, error) { return r, nil }

pipelineState, err := ResolvePipelineRun("pipelinerun", getTask, getClusterTask, getResource, p.Spec.Tasks, providedResources)
pipelineState, err := ResolvePipelineRun(pr, getTask, getClusterTask, getResource, p.Spec.Tasks, providedResources)
if err != nil {
t.Fatalf("Error getting tasks for fake pipeline %s: %s", p.ObjectMeta.Name, err)
}
Expand Down Expand Up @@ -415,8 +419,12 @@ func TestResolvePipelineRun_PipelineTaskHasNoResources(t *testing.T) {
getTask := func(name string) (v1alpha1.TaskInterface, error) { return task, nil }
getClusterTask := func(name string) (v1alpha1.TaskInterface, error) { return clustertask, nil }
getResource := func(name string) (*v1alpha1.PipelineResource, error) { return nil, fmt.Errorf("should not get called") }

pipelineState, err := ResolvePipelineRun("pipelinerun", getTask, getClusterTask, getResource, pts, providedResources)
pr := v1alpha1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerun",
},
}
pipelineState, err := ResolvePipelineRun(pr, getTask, getClusterTask, getResource, pts, providedResources)
if err != nil {
t.Fatalf("Did not expect error when resolving PipelineRun without Resources: %v", err)
}
Expand Down Expand Up @@ -452,8 +460,12 @@ func TestResolvePipelineRun_TaskDoesntExist(t *testing.T) {
return nil, errors.NewNotFound(v1alpha1.Resource("clustertask"), name)
}
getResource := func(name string) (*v1alpha1.PipelineResource, error) { return nil, fmt.Errorf("should not get called") }

_, err := ResolvePipelineRun("pipelinerun", getTask, getClusterTask, getResource, pts, providedResources)
pr := v1alpha1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerun",
},
}
_, err := ResolvePipelineRun(pr, getTask, getClusterTask, getResource, pts, providedResources)
switch err := err.(type) {
case nil:
t.Fatalf("Expected error getting non-existent Tasks for Pipeline %s but got none", p.Name)
Expand Down Expand Up @@ -494,7 +506,12 @@ func TestResolvePipelineRun_ResourceBindingsDontExist(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := ResolvePipelineRun("pipelinerun", getTask, getClusterTask, getResource, tt.p.Spec.Tasks, providedResources)
pr := v1alpha1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerun",
},
}
_, err := ResolvePipelineRun(pr, getTask, getClusterTask, getResource, tt.p.Spec.Tasks, providedResources)
if err == nil {
t.Fatalf("Expected error when bindings are in incorrect state for Pipeline %s but got none", p.Name)
}
Expand Down Expand Up @@ -538,7 +555,12 @@ func TestResolvePipelineRun_ResourcesDontExist(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := ResolvePipelineRun("pipelinerun", getTask, getClusterTask, getResource, tt.p.Spec.Tasks, providedResources)
pr := v1alpha1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerun",
},
}
_, err := ResolvePipelineRun(pr, getTask, getClusterTask, getResource, tt.p.Spec.Tasks, providedResources)
switch err := err.(type) {
case nil:
t.Fatalf("Expected error getting non-existent Resources for Pipeline %s but got none", p.Name)
Expand Down Expand Up @@ -859,3 +881,67 @@ func TestValidateFrom_Invalid(t *testing.T) {
})
}
}

func TestResolvePipelineRun_withExistingTaskRuns(t *testing.T) {
names.TestingSeed()

p := tb.Pipeline("pipelines", "namespace", tb.PipelineSpec(
tb.PipelineDeclaredResource("git-resource", "git"),
tb.PipelineTask("mytask1", "task",
tb.PipelineTaskInputResource("input1", "git-resource"),
),
))
providedResources := map[string]v1alpha1.PipelineResourceRef{
"git-resource": {
Name: "someresource",
},
}

r := &v1alpha1.PipelineResource{
ObjectMeta: metav1.ObjectMeta{
Name: "someresource",
},
Spec: v1alpha1.PipelineResourceSpec{
Type: v1alpha1.PipelineResourceTypeGit,
},
}
taskrunStatus := map[string]v1alpha1.TaskRunStatus{}
taskrunStatus["pipelinerun-mytask1-9l9zj"] = v1alpha1.TaskRunStatus{}

pr := v1alpha1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelinerun",
},
Status: v1alpha1.PipelineRunStatus{
TaskRuns: taskrunStatus,
},
}

// The Task "task" doesn't actually take any inputs or outputs, but validating
// that is not done as part of Run resolution
getTask := func(name string) (v1alpha1.TaskInterface, error) { return task, nil }
getClusterTask := func(name string) (v1alpha1.TaskInterface, error) { return nil, nil }
getResource := func(name string) (*v1alpha1.PipelineResource, error) { return r, nil }

pipelineState, err := ResolvePipelineRun(pr, getTask, getClusterTask, getResource, p.Spec.Tasks, providedResources)
if err != nil {
t.Fatalf("Error getting tasks for fake pipeline %s: %s", p.ObjectMeta.Name, err)
}
expectedState := []*ResolvedPipelineRunTask{{
PipelineTask: &p.Spec.Tasks[0],
TaskRunName: "pipelinerun-mytask1-9l9zj",
TaskRun: nil,
ResolvedTaskResources: &resources.ResolvedTaskResources{
TaskName: task.Name,
TaskSpec: &task.Spec,
Inputs: map[string]*v1alpha1.PipelineResource{
"input1": r,
},
Outputs: map[string]*v1alpha1.PipelineResource{},
},
}}

if d := cmp.Diff(pipelineState, expectedState, cmpopts.IgnoreUnexported(v1alpha1.TaskRunSpec{})); d != "" {
t.Fatalf("Expected to get current pipeline state %v, but actual differed: %s", expectedState, d)
}
}
65 changes: 65 additions & 0 deletions test/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ func TestTaskRunPipelineRunCancel(t *testing.T) {
t.Fatalf("Failed to create Task `banana`: %s", err)
}

logger.Infof("Creating Pipeline in namespace %s", namespace)
pipeline := tb.Pipeline("tomatoes", namespace,
tb.PipelineSpec(tb.PipelineTask("foo", "banana")),
)

logger.Infof("Creating PipelineRun in namespace %s", namespace)
pipelineRun := tb.PipelineRun("pear", namespace, tb.PipelineRunSpec(pipeline.Name))
if _, err := c.PipelineClient.Create(pipeline); err != nil {
t.Fatalf("Failed to create Pipeline `%s`: %s", "tomatoes", err)
Expand All @@ -74,6 +77,38 @@ func TestTaskRunPipelineRunCancel(t *testing.T) {
t.Fatalf("Error waiting for PipelineRun %s to be running: %s", "pear", err)
}

taskrunList, err := c.TaskRunClient.List(metav1.ListOptions{LabelSelector: "pipeline.knative.dev/pipelineRun=pear"})
if err != nil {
t.Fatalf("Error listing TaskRuns for PipelineRun %s: %s", "pear", err)
}

logger.Infof("Waiting for TaskRuns from PipelineRun %s in namespace %s to be running", "pear", namespace)
errChan := make(chan error, len(taskrunList.Items))
defer close(errChan)

for _, taskrunItem := range taskrunList.Items {
go func() {
err := WaitForTaskRunState(c, taskrunItem.Name, func(tr *v1alpha1.TaskRun) (bool, error) {
c := tr.Status.GetCondition(duckv1alpha1.ConditionSucceeded)
if c != nil {
if c.Status == corev1.ConditionTrue || c.Status == corev1.ConditionFalse {
return true, fmt.Errorf("taskRun %s already finished!", taskrunItem.Name)
} else if c.Status == corev1.ConditionUnknown && (c.Reason == "Running" || c.Reason == "Pending") {
return true, nil
}
}
return false, nil
}, "TaskRunRunning")
errChan <- err
}()

}
for i := 1; i <= len(taskrunList.Items); i++ {
if <-errChan != nil {
t.Errorf("Error waiting for TaskRun %s to be running: %s", taskrunList.Items[i-1].Name, err)
}
}

pr, err := c.PipelineRunClient.Get("pear", metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get PipelineRun `%s`: %s", "pear", err)
Expand Down Expand Up @@ -101,4 +136,34 @@ func TestTaskRunPipelineRunCancel(t *testing.T) {
}, "PipelineRunCancelled"); err != nil {
t.Errorf("Error waiting for PipelineRun `pear` to finish: %s", err)
}

logger.Infof("Waiting for TaskRuns in PipelineRun %s in namespace %s to be cancelled", "pear", namespace)
errChan2 := make(chan error, len(taskrunList.Items))
defer close(errChan2)
for _, taskrunItem := range taskrunList.Items {
go func() {
err := WaitForTaskRunState(c, taskrunItem.Name, func(tr *v1alpha1.TaskRun) (bool, error) {
c := tr.Status.GetCondition(duckv1alpha1.ConditionSucceeded)
if c != nil {
if c.Status == corev1.ConditionFalse {
if c.Reason == "TaskRunCancelled" {
return true, nil
}
return true, fmt.Errorf("taskRun %s completed with the wrong reason: %s", taskrunItem.Name, c.Reason)
} else if c.Status == corev1.ConditionTrue {
return true, fmt.Errorf("taskRun %s completed successfully, should have been cancelled", taskrunItem.Name)
}
}
return false, nil
}, "TaskRunCancelled")
errChan2 <- err
}()

}
for i := 1; i <= len(taskrunList.Items); i++ {
if <-errChan2 != nil {
t.Errorf("Error waiting for TaskRun %s to be finish: %s", taskrunList.Items[i-1].Name, err)
}
}

}
3 changes: 2 additions & 1 deletion test/helm_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func getHelmDeployPipeline(namespace string) *v1alpha1.Pipeline {
tb.PipelineTask("helm-deploy", helmDeployTaskName,
tb.PipelineTaskInputResource("gitsource", "git-repo"),
tb.PipelineTaskParam("pathToHelmCharts", "/workspace/gitsource/test/gohelloworld/gohelloworld-chart"),
tb.PipelineTaskParam("chartname", "gohelloworld"),
tb.PipelineTaskParam("chartname", "${params.chartname}"),
tb.PipelineTaskParam("image", imageName),
),
))
Expand All @@ -208,6 +208,7 @@ func getHelmDeployPipeline(namespace string) *v1alpha1.Pipeline {
func getHelmDeployPipelineRun(namespace string) *v1alpha1.PipelineRun {
return tb.PipelineRun(helmDeployPipelineRunName, namespace, tb.PipelineRunSpec(
helmDeployPipelineName,
tb.PipelineRunParam("chartname", "gohelloworld"),
tb.PipelineRunResourceBinding("git-repo", tb.PipelineResourceBindingRef(sourceResourceName)),
))
}
Expand Down
15 changes: 10 additions & 5 deletions test/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/knative/build-pipeline/pkg/apis/pipeline"
tb "github.com/knative/build-pipeline/test/builder"
"github.com/knative/build-pipeline/test/names"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
knativetest "github.com/knative/pkg/test"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -135,9 +134,6 @@ func TestPipelineRun(t *testing.T) {
knativetest.CleanupOnInterrupt(func() { tearDown(t, logger, c, namespace) }, logger)
defer tearDown(t, logger, c, namespace)

// to use fixed seed for testing name after the namespace is created
names.TestingSeed()

logger.Infof("Setting up test resources for %q test in namespace %s", td.name, namespace)
td.testSetup(t, c, namespace, i)

Expand All @@ -153,10 +149,19 @@ func TestPipelineRun(t *testing.T) {
}

logger.Infof("Making sure the expected TaskRuns %s were created", td.expectedTaskRuns)

actualTaskrunList, err := c.TaskRunClient.List(metav1.ListOptions{LabelSelector: fmt.Sprintf("pipeline.knative.dev/pipelineRun=%s", prName)})
if err != nil {
t.Fatalf("Error listing TaskRuns for PipelineRun %s: %s", prName, err)
}
expectedTaskRunNames := []string{}
for _, runName := range td.expectedTaskRuns {
taskRunName := strings.Join([]string{prName, runName}, "-")
// check the actual task name starting with prName+runName with a random suffix
for _, actualTaskRunItem := range actualTaskrunList.Items {
if strings.HasPrefix(actualTaskRunItem.Name, taskRunName) {
taskRunName = actualTaskRunItem.Name
}
}
expectedTaskRunNames = append(expectedTaskRunNames, taskRunName)
r, err := c.TaskRunClient.Get(taskRunName, metav1.GetOptions{})
if err != nil {
Expand Down
Loading

0 comments on commit 0502dbe

Please sign in to comment.