Skip to content

Commit

Permalink
Create Run resources from PipelineTask references
Browse files Browse the repository at this point in the history
  • Loading branch information
imjasonh committed Aug 19, 2020
1 parent ce7591a commit 69808bc
Show file tree
Hide file tree
Showing 12 changed files with 344 additions and 133 deletions.
6 changes: 3 additions & 3 deletions config/200-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ rules:
# Controller needs cluster access to all of the CRDs that it is responsible for
# managing.
- apiGroups: ["tekton.dev"]
resources: ["tasks", "clustertasks", "taskruns", "pipelines", "pipelineruns", "pipelineresources", "conditions"]
resources: ["tasks", "clustertasks", "taskruns", "pipelines", "pipelineruns", "pipelineresources", "conditions", "runs"]
verbs: ["get", "list", "create", "update", "delete", "patch", "watch"]
- apiGroups: ["tekton.dev"]
resources: ["taskruns/finalizers", "pipelineruns/finalizers"]
resources: ["taskruns/finalizers", "pipelineruns/finalizers", "runs/finalizers"]
verbs: ["get", "list", "create", "update", "delete", "patch", "watch"]
- apiGroups: ["tekton.dev"]
resources: ["tasks/status", "clustertasks/status", "taskruns/status", "pipelines/status", "pipelineruns/status", "pipelineresources/status"]
resources: ["tasks/status", "clustertasks/status", "taskruns/status", "pipelines/status", "pipelineruns/status", "pipelineresources/status", "runs/status"]
verbs: ["get", "list", "create", "update", "delete", "patch", "watch"]
---
kind: ClusterRole
Expand Down
18 changes: 18 additions & 0 deletions examples/v1beta1/pipelineruns/pipelinerun-custom-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
name: sample-pipeline-custom-task
spec:
tasks:
- name: example-custom-task
taskRef:
apiVersion: example.dev/v0
kind: Example
---
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: sample-pipeline-run-custom-task
spec:
pipelineRef:
name: sample-pipeline-custom-task
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ module github.com/tektoncd/pipeline
go 1.13

require (
cloud.google.com/go/storage v1.8.0
contrib.go.opencensus.io/exporter/stackdriver v0.13.1 // indirect
github.com/GoogleCloudPlatform/cloud-builders/gcs-fetcher v0.0.0-20191203181535-308b93ad1f39
github.com/cloudevents/sdk-go/v2 v2.1.0
github.com/docker/cli v0.0.0-20200210162036-a4bedce16568 // indirect
github.com/ghodss/yaml v1.0.0
github.com/google/go-cmp v0.4.1
github.com/google/go-containerregistry v0.1.1
github.com/google/ko v0.5.2 // indirect
github.com/google/uuid v1.1.1
github.com/grpc-ecosystem/grpc-gateway v1.12.2 // indirect
github.com/hashicorp/go-multierror v1.1.0
Expand All @@ -24,6 +25,7 @@ require (
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/text v0.3.3 // indirect
gomodules.xyz/jsonpatch/v2 v2.1.0
google.golang.org/api v0.25.0
k8s.io/api v0.17.6
k8s.io/apimachinery v0.17.6
k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible
Expand Down
32 changes: 32 additions & 0 deletions go.sum

Large diffs are not rendered by default.

28 changes: 21 additions & 7 deletions pkg/apis/pipeline/v1beta1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,19 +222,23 @@ func validatePipelineTasks(ctx context.Context, tasks []PipelineTask, finalTasks
taskNames := sets.NewString()
var err *apis.FieldError
for i, t := range tasks {
if err = validatePipelineTaskName(ctx, "spec.tasks", i, t, taskNames); err != nil {
if err = validatePipelineTask(ctx, "spec.tasks", i, t, taskNames); err != nil {
return err
}
}
for i, t := range finalTasks {
if err = validatePipelineTaskName(ctx, "spec.finally", i, t, taskNames); err != nil {
if err = validatePipelineTask(ctx, "spec.finally", i, t, taskNames); err != nil {
return err
}
}
return nil
}

func validatePipelineTaskName(ctx context.Context, prefix string, i int, t PipelineTask, taskNames sets.String) *apis.FieldError {
func validatePipelineTask(ctx context.Context, prefix string, i int, t PipelineTask, taskNames sets.String) *apis.FieldError {
hasTaskRef := t.TaskRef != nil
hasTaskSpec := t.TaskSpec != nil
isCustomTask := hasTaskRef && t.TaskRef.APIVersion != ""

if errs := validation.IsDNS1123Label(t.Name); len(errs) > 0 {
return &apis.FieldError{
Message: fmt.Sprintf("invalid value %q", t.Name),
Expand All @@ -244,20 +248,22 @@ func validatePipelineTaskName(ctx context.Context, prefix string, i int, t Pipel
}
}
// can't have both taskRef and taskSpec at the same time
if (t.TaskRef != nil && t.TaskRef.Name != "") && t.TaskSpec != nil {
if hasTaskRef && hasTaskSpec {
return apis.ErrMultipleOneOf(fmt.Sprintf(prefix+"[%d].taskRef", i), fmt.Sprintf(prefix+"[%d].taskSpec", i))
}
// Check that one of TaskRef and TaskSpec is present
if (t.TaskRef == nil || (t.TaskRef != nil && t.TaskRef.Name == "")) && t.TaskSpec == nil {
if !hasTaskRef && !hasTaskSpec {
return apis.ErrMissingOneOf(fmt.Sprintf(prefix+"[%d].taskRef", i), fmt.Sprintf(prefix+"[%d].taskSpec", i))
}
// Validate TaskSpec if it's present
if t.TaskSpec != nil {
if hasTaskSpec {
if err := t.TaskSpec.Validate(ctx); err != nil {
return err
}
}
if t.TaskRef != nil && t.TaskRef.Name != "" {

// Custom Task refs are allowed to have no name.
if hasTaskRef && t.TaskRef.Name != "" {
// Task names are appended to the container name, which must exist and
// must be a valid k8s name
if errSlice := validation.IsQualifiedName(t.Name); len(errSlice) != 0 {
Expand All @@ -272,6 +278,14 @@ func validatePipelineTaskName(ctx context.Context, prefix string, i int, t Pipel
}
taskNames[t.Name] = struct{}{}
}

if hasTaskRef && !isCustomTask && t.TaskRef.Name == "" {
return apis.ErrInvalidValue(t.TaskRef, "taskRef must specify name")
}
if isCustomTask && t.TaskRef.Kind == "" {
return apis.ErrInvalidValue(t.TaskRef, "custom task ref must specify apiVersion and kind")
}

return nil
}

Expand Down
28 changes: 28 additions & 0 deletions pkg/apis/pipeline/v1beta1/pipeline_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ func TestPipeline_Validate_Success(t *testing.T) {
Tasks: []PipelineTask{{Name: "foo", TaskRef: &TaskRef{Name: "foo-task"}}},
},
},
}, {
name: "pipelinetask custom task references",
p: &Pipeline{
Spec: PipelineSpec{
Tasks: []PipelineTask{{Name: "foo", TaskRef: &TaskRef{APIVersion: "example.dev/v0", Kind: "Example", Name: ""}}},
},
},
}, {
name: "valid pipeline with params, resources, workspaces, task results, and pipeline results",
p: &Pipeline{
Expand Down Expand Up @@ -129,6 +136,27 @@ func TestPipeline_Validate_Failure(t *testing.T) {
p: &Pipeline{
ObjectMeta: metav1.ObjectMeta{Name: "pipeline"},
},
}, {
name: "pipelinetask without name",
p: &Pipeline{
Spec: PipelineSpec{
Tasks: []PipelineTask{{Name: "", TaskRef: &TaskRef{Name: "valid-task"}}},
},
},
}, {
name: "pipelinetask taskRef without name",
p: &Pipeline{
Spec: PipelineSpec{
Tasks: []PipelineTask{{Name: "foo", TaskRef: &TaskRef{Name: ""}}},
},
},
}, {
name: "pipelinetask custom task taskRef without name",
p: &Pipeline{
Spec: PipelineSpec{
Tasks: []PipelineTask{{Name: "foo", TaskRef: &TaskRef{APIVersion: "example.dev/v0", Kind: "", Name: ""}}},
},
},
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
24 changes: 17 additions & 7 deletions pkg/apis/pipeline/v1beta1/pipelinerun_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,11 @@ import (
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
)

var (
groupVersionKind = schema.GroupVersionKind{
Group: SchemeGroupVersion.Group,
Version: SchemeGroupVersion.Version,
Kind: pipeline.PipelineRunControllerName,
}
)
var groupVersionKind = schema.GroupVersionKind{
Group: SchemeGroupVersion.Group,
Version: SchemeGroupVersion.Version,
Kind: pipeline.PipelineRunControllerName,
}

// +genclient
// +genreconciler:krshapedlogic=false
Expand Down Expand Up @@ -325,6 +323,10 @@ type PipelineRunStatusFields struct {
// +optional
TaskRuns map[string]*PipelineRunTaskRunStatus `json:"taskRuns,omitempty"`

// map of PipelineRunRunStatus with the run name as the key
// +optional
Runs map[string]*PipelineRunRunStatus `json:"runs,omitempty"`

// PipelineResults are the list of results written out by the pipeline task's containers
// +optional
PipelineResults []PipelineRunResult `json:"pipelineResults,omitempty"`
Expand Down Expand Up @@ -354,6 +356,14 @@ type PipelineRunTaskRunStatus struct {
ConditionChecks map[string]*PipelineRunConditionCheckStatus `json:"conditionChecks,omitempty"`
}

// PipelineRunRunStatus contains the name of the PipelineTask for this Run and the Run's Status
type PipelineRunRunStatus struct {
// PipelineTaskName is the name of the PipelineTask.
PipelineTaskName string `json:"pipelineTaskName,omitempty"`

// TODO(jasonhall): Add v1alpha1.RunStatus here, without introducing an import cycle.
}

// PipelineRunConditionCheckStatus returns the condition check status
type PipelineRunConditionCheckStatus struct {
// ConditionName is the name of the Condition
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
pipelineclient "github.com/tektoncd/pipeline/pkg/client/injection/client"
conditioninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/condition"
runinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1alpha1/run"
clustertaskinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/clustertask"
pipelineinformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/pipeline"
pipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/pipelinerun"
Expand All @@ -49,6 +50,7 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
kubeclientset := kubeclient.Get(ctx)
pipelineclientset := pipelineclient.Get(ctx)
taskRunInformer := taskruninformer.Get(ctx)
runInformer := runinformer.Get(ctx)
taskInformer := taskinformer.Get(ctx)
clusterTaskInformer := clustertaskinformer.Get(ctx)
pipelineRunInformer := pipelineruninformer.Get(ctx)
Expand All @@ -70,6 +72,7 @@ func NewController(namespace string, images pipeline.Images) func(context.Contex
taskLister: taskInformer.Lister(),
clusterTaskLister: clusterTaskInformer.Lister(),
taskRunLister: taskRunInformer.Lister(),
runLister: runInformer.Lister(),
resourceLister: resourceInformer.Lister(),
conditionLister: conditionInformer.Lister(),
timeoutHandler: timeoutHandler,
Expand Down
88 changes: 77 additions & 11 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ type Reconciler struct {
pipelineRunLister listers.PipelineRunLister
pipelineLister listers.PipelineLister
taskRunLister listers.TaskRunLister
runLister listersv1alpha1.RunLister
taskLister listers.TaskLister
clusterTaskLister listers.ClusterTaskLister
resourceLister resourcelisters.PipelineResourceLister
Expand Down Expand Up @@ -398,6 +399,9 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
func(name string) (*v1beta1.TaskRun, error) {
return c.taskRunLister.TaskRuns(pr.Namespace).Get(name)
},
func(name string) (*v1alpha1.Run, error) {
return c.runLister.Runs(pr.Namespace).Get(name)
},
func(name string) (v1beta1.TaskInterface, error) {
return c.clusterTaskLister.Get(name)
},
Expand Down Expand Up @@ -427,11 +431,12 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
}

for _, rprt := range pipelineState {
err := taskrun.ValidateResolvedTaskResources(rprt.PipelineTask.Params, rprt.ResolvedTaskResources)
if err != nil {
logger.Errorf("Failed to validate pipelinerun %q with error %v", pr.Name, err)
pr.Status.MarkFailed(ReasonFailedValidation, err.Error())
return controller.NewPermanentError(err)
if !rprt.IsCustomTask() {
if err := taskrun.ValidateResolvedTaskResources(rprt.PipelineTask.Params, rprt.ResolvedTaskResources); err != nil {
logger.Errorf("Failed to validate pipelinerun %q with error %v", pr.Name, err)
pr.Status.MarkFailed(ReasonFailedValidation, err.Error())
return controller.NewPermanentError(err)
}
}
}

Expand Down Expand Up @@ -481,6 +486,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1beta1.PipelineRun) err
// Read the condition the way it was set by the Mark* helpers
after = pr.Status.GetCondition(apis.ConditionSucceeded)
pr.Status.TaskRuns = getTaskRunsStatus(pr, pipelineState)
pr.Status.Runs = getRunsStatus(pr, pipelineState)
logger.Infof("PipelineRun %s status is being set to %s", pr.Name, after)
return nil
}
Expand Down Expand Up @@ -525,10 +531,18 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1beta1.Pip
continue
}
if rprt.ResolvedConditionChecks == nil || rprt.ResolvedConditionChecks.IsSuccess() {
rprt.TaskRun, err = c.createTaskRun(ctx, rprt, pr, as.StorageBasePath(pr))
if err != nil {
recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
if !rprt.IsCustomTask() {
rprt.TaskRun, err = c.createTaskRun(ctx, rprt, pr, as.StorageBasePath(pr))
if err != nil {
recorder.Eventf(pr, corev1.EventTypeWarning, "TaskRunCreationFailed", "Failed to create TaskRun %q: %v", rprt.TaskRunName, err)
return fmt.Errorf("error creating TaskRun called %s for PipelineTask %s from PipelineRun %s: %w", rprt.TaskRunName, rprt.PipelineTask.Name, pr.Name, err)
}
} else {
rprt.Run, err = c.createRun(ctx, rprt, pr)
if err != nil {
recorder.Eventf(pr, corev1.EventTypeWarning, "RunCreationFailed", "Failed to create Run %q: %v", rprt.RunName, err)
return fmt.Errorf("error creating Run called %s for PipelineTask %s from PipelineRun %s: %w", rprt.RunName, rprt.PipelineTask.Name, pr.Name, err)
}
}
} else if !rprt.ResolvedConditionChecks.HasStarted() {
for _, rcc := range rprt.ResolvedConditionChecks {
Expand Down Expand Up @@ -564,9 +578,42 @@ func getPipelineRunResults(pipelineSpec *v1beta1.PipelineSpec, resolvedResultRef
return results
}

func getRunsStatus(pr *v1beta1.PipelineRun, state []*resources.ResolvedPipelineRunTask) map[string]*v1beta1.PipelineRunRunStatus {
status := map[string]*v1beta1.PipelineRunRunStatus{}
for _, rprt := range state {
if !rprt.IsCustomTask() {
continue
}
if rprt.Run == nil && rprt.ResolvedConditionChecks == nil {
continue
}

var prrs *v1beta1.PipelineRunRunStatus
if rprt.Run != nil {
prrs = pr.Status.Runs[rprt.RunName]
}

if prrs == nil {
prrs = &v1beta1.PipelineRunRunStatus{
PipelineTaskName: rprt.PipelineTask.Name,
}
}

if rprt.Run != nil {
// TODO(jasonhall): Support RunStatus.
//prrs.Status = &rprt.Run.Status
}
status[rprt.RunName] = prrs
}
return status
}

func getTaskRunsStatus(pr *v1beta1.PipelineRun, state []*resources.ResolvedPipelineRunTask) map[string]*v1beta1.PipelineRunTaskRunStatus {
status := make(map[string]*v1beta1.PipelineRunTaskRunStatus)
status := map[string]*v1beta1.PipelineRunTaskRunStatus{}
for _, rprt := range state {
if rprt.IsCustomTask() {
continue
}
if rprt.TaskRun == nil && rprt.ResolvedConditionChecks == nil {
continue
}
Expand All @@ -575,6 +622,7 @@ func getTaskRunsStatus(pr *v1beta1.PipelineRun, state []*resources.ResolvedPipel
if rprt.TaskRun != nil {
prtrs = pr.Status.TaskRuns[rprt.TaskRun.Name]
}

if prtrs == nil {
prtrs = &v1beta1.PipelineRunTaskRunStatus{
PipelineTaskName: rprt.PipelineTask.Name,
Expand Down Expand Up @@ -659,7 +707,8 @@ func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.Resolved
ServiceAccountName: serviceAccountName,
Timeout: getTaskRunTimeout(pr, rprt),
PodTemplate: podTemplate,
}}
},
}

if rprt.ResolvedTaskResources.TaskName != "" {
tr.Spec.TaskRef = &v1beta1.TaskRef{
Expand Down Expand Up @@ -696,6 +745,23 @@ func (c *Reconciler) createTaskRun(ctx context.Context, rprt *resources.Resolved
return c.PipelineClientSet.TektonV1beta1().TaskRuns(pr.Namespace).Create(tr)
}

func (c *Reconciler) createRun(ctx context.Context, rprt *resources.ResolvedPipelineRunTask, pr *v1beta1.PipelineRun) (*v1alpha1.Run, error) {
r := &v1alpha1.Run{
ObjectMeta: metav1.ObjectMeta{
Name: rprt.RunName,
Namespace: pr.Namespace,
OwnerReferences: []metav1.OwnerReference{pr.GetOwnerReference()},
Labels: getTaskrunLabels(pr, rprt.PipelineTask.Name),
Annotations: getTaskrunAnnotations(pr),
},
Spec: v1alpha1.RunSpec{
Ref: rprt.PipelineTask.TaskRef,
Params: rprt.PipelineTask.Params,
},
}
return c.PipelineClientSet.TektonV1alpha1().Runs(pr.Namespace).Create(r)
}

// taskWorkspaceByWorkspaceVolumeSource is returning the WorkspaceBinding with the TaskRun specified name.
// If the volume source is a volumeClaimTemplate, the template is applied and passed to TaskRun as a persistentVolumeClaim
func taskWorkspaceByWorkspaceVolumeSource(wb v1beta1.WorkspaceBinding, taskWorkspaceName string, pipelineTaskSubPath string, owner metav1.OwnerReference) v1beta1.WorkspaceBinding {
Expand Down
Loading

0 comments on commit 69808bc

Please sign in to comment.