Skip to content

Commit

Permalink
TEP-0111 - Propagating workspaces
Browse files Browse the repository at this point in the history
This POC illustrates propagating workspaces defined at the `pipelinerun` stage and directly referred at the `spec`. There is no need to specify it in the `pipelinespec` followed by `tasks` and `taskspec`.
  • Loading branch information
chitrangpatel committed Jul 5, 2022
1 parent 380dbd0 commit 2000cb1
Show file tree
Hide file tree
Showing 11 changed files with 768 additions and 53 deletions.
515 changes: 515 additions & 0 deletions docs/pipelineruns.md

Large diffs are not rendered by default.

76 changes: 76 additions & 0 deletions examples/v1beta1/pipelineruns/alpha/propagating-workspaces.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: sensitive-recipe-storage
data:
brownies: |
1. Heat oven to 325 degrees F
2. Melt 1/2 cup butter w/ 1/2 cup cocoa, stirring smooth.
3. Remove from heat, allow to cool for a few minutes.
4. Transfer to bowl.
5. Whisk in 2 eggs, one at a time.
6. Stir in vanilla.
7. Separately combine 1 cup sugar, 1/4 cup flour, 1 cup chopped
walnuts and pinch of salt
8. Combine mixtures.
9. Bake in greased pan for 30 minutes. Watch carefully for
appropriate level of gooeyness.
---
apiVersion: v1
kind: Secret
metadata:
name: secret-password
type: Opaque
data:
password: aHVudGVyMg==
---
apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
generateName: recipe-time-
spec:
params:
- name: filename
value: recipe.txt
workspaces:
- name: shared-data
volumeClaimTemplate:
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 16Mi
volumeMode: Filesystem
- name: recipe-store
configMap:
name: sensitive-recipe-storage
items:
- key: brownies
path: recipe.txt
- name: password-vault
secret:
secretName: secret-password
pipelineSpec:
tasks:
- name: fetch-secure-data
taskSpec:
steps:
- name: fetch-and-write-secure
image: ubuntu
script: |
if [ "hunter2" = "$(cat $(workspaces.password-vault.path)/password)" ]; then
cp $(workspaces.recipe-store.path)/recipe.txt $(workspaces.shared-data.path)
echo "success!!!"
else
echo "wrong password!"
exit 1
fi
- name: print-the-recipe
runAfter:
- fetch-secure-data
taskSpec:
steps:
- name: print-secrets
image: ubuntu
script: cat $(workspaces.shared-data.path)/$(params.filename)
8 changes: 6 additions & 2 deletions pkg/apis/pipeline/v1beta1/pipeline_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strings"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/validate"
"github.com/tektoncd/pipeline/pkg/list"
"github.com/tektoncd/pipeline/pkg/reconciler/pipeline/dag"
Expand All @@ -45,6 +46,7 @@ func (p *Pipeline) Validate(ctx context.Context) *apis.FieldError {
// Validate checks that taskNames in the Pipeline are valid and that the graph
// of Tasks expressed in the Pipeline makes sense.
func (ps *PipelineSpec) Validate(ctx context.Context) (errs *apis.FieldError) {
cfg := config.FromContextOrDefaults(ctx)
if equality.Semantic.DeepEqual(ps, &PipelineSpec{}) {
errs = errs.Also(apis.ErrGeneric("expected at least one, got none", "description", "params", "resources", "tasks", "workspaces"))
}
Expand All @@ -66,8 +68,10 @@ func (ps *PipelineSpec) Validate(ctx context.Context) (errs *apis.FieldError) {
errs = errs.Also(validateExecutionStatusVariables(ps.Tasks, ps.Finally))
// Validate the pipeline's workspaces.
errs = errs.Also(validatePipelineWorkspacesDeclarations(ps.Workspaces))
errs = errs.Also(validatePipelineWorkspacesUsage(ps.Workspaces, ps.Tasks).ViaField("tasks"))
errs = errs.Also(validatePipelineWorkspacesUsage(ps.Workspaces, ps.Finally).ViaField("finally"))
if cfg.FeatureFlags.EnableAPIFields != config.AlphaAPIFields {
errs = errs.Also(validatePipelineWorkspacesUsage(ps.Workspaces, ps.Tasks).ViaField("tasks"))
errs = errs.Also(validatePipelineWorkspacesUsage(ps.Workspaces, ps.Finally).ViaField("finally"))
}
// Validate the pipeline's results
errs = errs.Also(validatePipelineResults(ps.Results))
errs = errs.Also(validateTasksAndFinallySection(ps))
Expand Down
57 changes: 49 additions & 8 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
tresources "github.com/tektoncd/pipeline/pkg/reconciler/taskrun/resources"
"github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim"
"github.com/tektoncd/pipeline/pkg/remote"
"github.com/tektoncd/pipeline/pkg/substitution"
"github.com/tektoncd/pipeline/pkg/workspace"
resolution "github.com/tektoncd/resolution/pkg/resource"
"go.uber.org/zap"
Expand Down Expand Up @@ -829,7 +830,7 @@ func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, para

var pipelinePVCWorkspaceName string
var err error
tr.Spec.Workspaces, pipelinePVCWorkspaceName, err = getTaskrunWorkspaces(pr, rpt)
tr.Spec.Workspaces, pipelinePVCWorkspaceName, err = getTaskrunWorkspaces(ctx, pr, rpt)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -897,7 +898,7 @@ func (c *Reconciler) createRun(ctx context.Context, runName string, params []v1b
}
var pipelinePVCWorkspaceName string
var err error
r.Spec.Workspaces, pipelinePVCWorkspaceName, err = getTaskrunWorkspaces(pr, rpt)
r.Spec.Workspaces, pipelinePVCWorkspaceName, err = getTaskrunWorkspaces(ctx, pr, rpt)
if err != nil {
return nil, err
}
Expand All @@ -912,16 +913,56 @@ func (c *Reconciler) createRun(ctx context.Context, runName string, params []v1b
return c.PipelineClientSet.TektonV1alpha1().Runs(pr.Namespace).Create(ctx, r, metav1.CreateOptions{})
}

func getTaskrunWorkspaces(pr *v1beta1.PipelineRun, rpt *resources.ResolvedPipelineTask) ([]v1beta1.WorkspaceBinding, string, error) {
func getTaskrunWorkspaces(ctx context.Context, pr *v1beta1.PipelineRun, rprt *resources.ResolvedPipelineTask) ([]v1beta1.WorkspaceBinding, string, error) {
var workspaces []v1beta1.WorkspaceBinding
var pipelinePVCWorkspaceName string
pipelineRunWorkspaces := make(map[string]v1beta1.WorkspaceBinding)
for _, binding := range pr.Spec.Workspaces {
pipelineRunWorkspaces[binding.Name] = binding
}
for _, ws := range rpt.PipelineTask.Workspaces {
taskWorkspaceName, pipelineTaskSubPath, pipelineWorkspaceName := ws.Name, ws.SubPath, ws.Workspace

if config.FromContextOrDefaults(ctx).FeatureFlags.EnableAPIFields == "alpha" {
// Propagate required workspaces from pipelineRun to the pipelineTasks
if rprt.PipelineTask.TaskSpec != nil {
workspacesUsedInSteps := []string{}
ts := rprt.PipelineTask.TaskSpec.TaskSpec
for _, step := range ts.Steps {
workspacesUsedInScript, _, errString := substitution.ExtractVariablesFromString(step.Script, "workspaces")
workspacesUsedInSteps = append(workspacesUsedInSteps, workspacesUsedInScript...)
if errString != "" {
return nil, "", fmt.Errorf("Error while extracting workspace from Script: %s", errString)
}
for _, arg := range step.Args {
workspacesUsedInArg, _, errString := substitution.ExtractVariablesFromString(arg, "workspaces")
if errString != "" {
return nil, "", fmt.Errorf("Error while extracting workspace from Script: %s", errString)
}
workspacesUsedInSteps = append(workspacesUsedInSteps, workspacesUsedInArg...)
}
}

ptw := []string{}
for _, ws := range rprt.PipelineTask.Workspaces {
ptw = append(ptw, ws.Name)
}

for _, wspace := range workspacesUsedInSteps {
skip := false
for _, w := range ptw {
if w == wspace {
skip = true
break
}
}
if !skip {
rprt.PipelineTask.Workspaces = append(rprt.PipelineTask.Workspaces, v1beta1.WorkspacePipelineTaskBinding{Name: wspace})
}
}
}
}

for _, ws := range rprt.PipelineTask.Workspaces {
taskWorkspaceName, pipelineTaskSubPath, pipelineWorkspaceName := ws.Name, ws.SubPath, ws.Workspace
pipelineWorkspace := pipelineWorkspaceName

if pipelineWorkspaceName == "" {
Expand All @@ -935,16 +976,16 @@ func getTaskrunWorkspaces(pr *v1beta1.PipelineRun, rpt *resources.ResolvedPipeli
workspaces = append(workspaces, taskWorkspaceByWorkspaceVolumeSource(b, taskWorkspaceName, pipelineTaskSubPath, *kmeta.NewControllerRef(pr)))
} else {
workspaceIsOptional := false
if rpt.ResolvedTaskResources != nil && rpt.ResolvedTaskResources.TaskSpec != nil {
for _, taskWorkspaceDeclaration := range rpt.ResolvedTaskResources.TaskSpec.Workspaces {
if rprt.ResolvedTaskResources != nil && rprt.ResolvedTaskResources.TaskSpec != nil {
for _, taskWorkspaceDeclaration := range rprt.ResolvedTaskResources.TaskSpec.Workspaces {
if taskWorkspaceDeclaration.Name == taskWorkspaceName && taskWorkspaceDeclaration.Optional {
workspaceIsOptional = true
break
}
}
}
if !workspaceIsOptional {
return nil, "", fmt.Errorf("expected workspace %q to be provided by pipelinerun for pipeline task %q", pipelineWorkspace, rpt.PipelineTask.Name)
return nil, "", fmt.Errorf("expected workspace %q to be provided by pipelinerun for pipeline task %q", pipelineWorkspace, rprt.PipelineTask.Name)
}
}

Expand Down
17 changes: 15 additions & 2 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7262,6 +7262,7 @@ func checkPipelineRunConditionStatusAndReason(t *testing.T, reconciledRun *v1bet
}

func TestGetTaskrunWorkspaces_Failure(t *testing.T) {
ctx := getContextBasedOnFeatureFlag("alpha")
tests := []struct {
name string
pr *v1beta1.PipelineRun
Expand Down Expand Up @@ -7309,7 +7310,7 @@ spec:
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, _, err := getTaskrunWorkspaces(tt.pr, tt.rprt)
_, _, err := getTaskrunWorkspaces(ctx, tt.pr, tt.rprt)

if err == nil {
t.Errorf("Pipeline.getTaskrunWorkspaces() did not return error for invalid workspace")
Expand All @@ -7322,6 +7323,7 @@ spec:
}

func TestGetTaskrunWorkspaces_Success(t *testing.T) {
ctx := getContextBasedOnFeatureFlag("alpha")
tests := []struct {
name string
pr *v1beta1.PipelineRun
Expand Down Expand Up @@ -7365,7 +7367,7 @@ spec:
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, _, err := getTaskrunWorkspaces(tt.pr, tt.rprt)
_, _, err := getTaskrunWorkspaces(ctx, tt.pr, tt.rprt)

if err != nil {
t.Errorf("Pipeline.getTaskrunWorkspaces() returned error for valid pipeline: %v", err)
Expand Down Expand Up @@ -9138,3 +9140,14 @@ spec:
func lessTaskResourceBindings(i, j v1beta1.TaskResourceBinding) bool {
return i.Name < j.Name
}

func getContextBasedOnFeatureFlag(featureFlag string) context.Context {
featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{
"enable-api-fields": featureFlag,
})
cfg := &config.Config{
FeatureFlags: featureFlags,
}

return config.ToContext(context.Background(), cfg)
}
51 changes: 38 additions & 13 deletions pkg/reconciler/taskrun/taskrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (c *Reconciler) prepare(ctx context.Context, tr *v1beta1.TaskRun) (*v1beta1
return nil, nil, controller.NewPermanentError(err)
}

if err := workspace.ValidateBindings(taskSpec.Workspaces, tr.Spec.Workspaces); err != nil {
if err := workspace.ValidateBindings(ctx, taskSpec.Workspaces, tr.Spec.Workspaces); err != nil {
logger.Errorf("TaskRun %q workspaces are invalid: %v", tr.Name, err)
tr.Status.MarkResourceFailed(podconvert.ReasonFailedValidation, err)
return nil, nil, controller.NewPermanentError(err)
Expand Down Expand Up @@ -417,13 +417,20 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, rtr *re
defer c.durationAndCountMetrics(ctx, tr)
logger := logging.FromContext(ctx)
recorder := controller.GetEventRecorder(ctx)
var err error

// Get the randomized volume names assigned to workspace bindings
workspaceVolumes := workspace.CreateVolumes(tr.Spec.Workspaces)

ts := updateTaskSpecParamsContextsResults(tr, rtr)
ts, err := updateTaskSpecParamsContextsResults(ctx, tr, rtr, workspaceVolumes)
if err != nil {
logger.Errorf("Error updating task spec parameters, contexts, results and workspaces: %s", err)
return err
}
tr.Status.TaskSpec = ts

// Get the TaskRun's Pod if it should have one. Otherwise, create the Pod.
var pod *corev1.Pod
var err error

if tr.Status.PodName != "" {
pod, err = c.podLister.Pods(tr.Namespace).Get(tr.Status.PodName)
Expand Down Expand Up @@ -469,7 +476,7 @@ func (c *Reconciler) reconcile(ctx context.Context, tr *v1beta1.TaskRun, rtr *re
// This is used by createPod below. Changes to the Spec are not updated.
tr.Spec.Workspaces = taskRunWorkspaces
}
pod, err = c.createPod(ctx, ts, tr, rtr)
pod, err = c.createPod(ctx, ts, tr, rtr, workspaceVolumes)
if err != nil {
newErr := c.handlePodCreationError(tr, err)
logger.Errorf("Failed to create task run pod for taskrun %q: %v", tr.Name, newErr)
Expand Down Expand Up @@ -651,7 +658,7 @@ func (c *Reconciler) failTaskRun(ctx context.Context, tr *v1beta1.TaskRun, reaso

// createPod creates a Pod based on the Task's configuration, with pvcName as a volumeMount
// TODO(dibyom): Refactor resource setup/substitution logic to its own function in the resources package
func (c *Reconciler) createPod(ctx context.Context, ts *v1beta1.TaskSpec, tr *v1beta1.TaskRun, rtr *resources.ResolvedTaskResources) (*corev1.Pod, error) {
func (c *Reconciler) createPod(ctx context.Context, ts *v1beta1.TaskSpec, tr *v1beta1.TaskRun, rtr *resources.ResolvedTaskResources, workspaceVolumes map[string]corev1.Volume) (*corev1.Pod, error) {
logger := logging.FromContext(ctx)
inputResources, err := resourceImplBinding(rtr.Inputs, c.Images)
if err != nil {
Expand Down Expand Up @@ -687,18 +694,13 @@ func (c *Reconciler) createPod(ctx context.Context, ts *v1beta1.TaskSpec, tr *v1
ts = resources.ApplyResources(ts, inputResources, "inputs")
ts = resources.ApplyResources(ts, outputResources, "outputs")

// Get the randomized volume names assigned to workspace bindings
workspaceVolumes := workspace.CreateVolumes(tr.Spec.Workspaces)

// Apply workspace resource substitution
ts = resources.ApplyWorkspaces(ctx, ts, ts.Workspaces, tr.Spec.Workspaces, workspaceVolumes)

if validateErr := ts.Validate(ctx); validateErr != nil {
logger.Errorf("Failed to create a pod for taskrun: %s due to task validation error %v", tr.Name, validateErr)
return nil, validateErr
}

ts, err = workspace.Apply(ctx, *ts, tr.Spec.Workspaces, workspaceVolumes)

if err != nil {
logger.Errorf("Failed to create a pod for taskrun: %s due to workspace error %v", tr.Name, err)
return nil, err
Expand Down Expand Up @@ -740,7 +742,7 @@ func (c *Reconciler) createPod(ctx context.Context, ts *v1beta1.TaskSpec, tr *v1
return pod, err
}

func updateTaskSpecParamsContextsResults(tr *v1beta1.TaskRun, rtr *resources.ResolvedTaskResources) *v1beta1.TaskSpec {
func updateTaskSpecParamsContextsResults(ctx context.Context, tr *v1beta1.TaskRun, rtr *resources.ResolvedTaskResources, workspaceVolumes map[string]corev1.Volume) (*v1beta1.TaskSpec, error) {
ts := rtr.TaskSpec.DeepCopy()
var defaults []v1beta1.ParamSpec
if len(ts.Params) > 0 {
Expand All @@ -758,7 +760,30 @@ func updateTaskSpecParamsContextsResults(tr *v1beta1.TaskRun, rtr *resources.Res
// Apply step exitCode path substitution
ts = resources.ApplyStepExitCodePath(ts)

return ts
// Apply workspace resource substitution
if config.FromContextOrDefaults(ctx).FeatureFlags.EnableAPIFields == "alpha" {
// propagate workspaces from taskrun to task.
twn := []string{}
for _, tw := range ts.Workspaces {
twn = append(twn, tw.Name)
}

for _, trw := range tr.Spec.Workspaces {
skip := false
for _, tw := range twn {
if tw == trw.Name {
skip = true
break
}
}
if !skip {
ts.Workspaces = append(ts.Workspaces, v1beta1.WorkspaceDeclaration{Name: trw.Name})
}
}
}
ts = resources.ApplyWorkspaces(ctx, ts, ts.Workspaces, tr.Spec.Workspaces, workspaceVolumes)

return ts, nil
}

func isExceededResourceQuotaError(err error) bool {
Expand Down
Loading

0 comments on commit 2000cb1

Please sign in to comment.