Skip to content

Commit

Permalink
If no resources are linked, don't make a PVC 🗑️
Browse files Browse the repository at this point in the history
When ouputs of a Task are linked to the inputs of another we create a
PVC (or use a bucket) to share the data between executing Tasks. However
we were doing this even when no Resources were linked! Now we only do it
when we need to.

Co-authored-by: Christie Wilson <[email protected]>
Fixes: tektoncd#937
  • Loading branch information
dlorenc and bobcatfish committed Nov 8, 2019
1 parent 3882f07 commit 30abfa0
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 105 deletions.
7 changes: 7 additions & 0 deletions pkg/apis/pipeline/v1alpha1/resource_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ import (
// additional metatdata should be provided for it.
type PipelineResourceType string

var (
AllowedOutputResources = map[PipelineResourceType]bool{
PipelineResourceTypeStorage: true,
PipelineResourceTypeGit: true,
}
)

const (
// PipelineResourceTypeGit indicates that this source is a GitHub repo.
PipelineResourceTypeGit PipelineResourceType = "git"
Expand Down
129 changes: 43 additions & 86 deletions pkg/artifacts/artifact_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,37 +58,50 @@ var (
return x.Cmp(y) == 0
})

tasksWithFrom = []v1alpha1.PipelineTask{
{
Name: "task1",
TaskRef: v1alpha1.TaskRef{
Name: "task",
},
Resources: &v1alpha1.PipelineTaskResources{
Outputs: []v1alpha1.PipelineTaskOutputResource{{
Name: "output",
Resource: "resource",
}},
},
},
{
Name: "task2",
TaskRef: v1alpha1.TaskRef{
Name: "task",
pipelineWithtasksWithFrom = v1alpha1.Pipeline{
Spec: v1alpha1.PipelineSpec{
Resources: []v1alpha1.PipelineDeclaredResource{
{
Name: "input1",
Type: "git",
},
{
Name: "output",
Type: "git",
},
},
Resources: &v1alpha1.PipelineTaskResources{
Inputs: []v1alpha1.PipelineTaskInputResource{{
Name: "input1",
Resource: "resource",
From: []string{"task1"},
}},
Tasks: []v1alpha1.PipelineTask{
{
Name: "task1",
TaskRef: v1alpha1.TaskRef{
Name: "task",
},
Resources: &v1alpha1.PipelineTaskResources{
Outputs: []v1alpha1.PipelineTaskOutputResource{{
Name: "output",
Resource: "resource",
}},
},
},
{
Name: "task2",
TaskRef: v1alpha1.TaskRef{
Name: "task",
},
Resources: &v1alpha1.PipelineTaskResources{
Inputs: []v1alpha1.PipelineTaskInputResource{{
Name: "input1",
Resource: "resource",
From: []string{"task1"},
}},
},
},
},
},
}
)


func GetPersistentVolumeClaim(pipelinerun *v1alpha1.PipelineRun, size, storageClassName string) *corev1.PersistentVolumeClaim {
func GetPersistentVolumeClaim(size string, storageClassName *string) *corev1.PersistentVolumeClaim {
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: "pipelineruntest-pvc", Namespace: pipelinerun.Namespace, OwnerReferences: pipelinerun.GetOwnerReference()},
Spec: corev1.PersistentVolumeClaimSpec{
Expand Down Expand Up @@ -183,16 +196,6 @@ func TestConfigMapNeedsPVC(t *testing.T) {
}

func TestInitializeArtifactStorageWithConfigMap(t *testing.T) {
pipelinerun := &v1alpha1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
Name: "pipelineruntest",
},
Spec: v1alpha1.PipelineRunSpec{
PipelineRef: v1alpha1.PipelineRef{
Name: "pipelineWithFrom",
},
},
}
logger := logtesting.TestLogger(t)
for _, c := range []struct {
desc string
Expand All @@ -212,7 +215,6 @@ func TestInitializeArtifactStorageWithConfigMap(t *testing.T) {
},
expectedArtifactStorage: &v1alpha1.ArtifactPVC{
Name: "pipelineruntest",
<<<<<<< HEAD
PersistentVolumeClaim: GetPersistentVolumeClaim("10Gi", defaultStorageClass),
ShellImage: "busybox",
},
Expand All @@ -228,14 +230,10 @@ func TestInitializeArtifactStorageWithConfigMap(t *testing.T) {
PvcStorageClassNameKey: customStorageClass,
},
},
pipelinerun: pipelinerun,
expectedArtifactStorage: &v1alpha1.ArtifactPVC{
Name: "pipelineruntest",
PersistentVolumeClaim: GetPersistentVolumeClaim("5Gi", &customStorageClass),
ShellImage: "busybox",
=======
PersistentVolumeClaim: GetPersistentVolumeClaim(pipelinerun, "10Gi"),
>>>>>>> 1d99c320... If no resources are linked, don't make a PVC 🗑️
},
storagetype: "pvc",
}, {
Expand Down Expand Up @@ -277,12 +275,8 @@ func TestInitializeArtifactStorageWithConfigMap(t *testing.T) {
},
expectedArtifactStorage: &v1alpha1.ArtifactPVC{
Name: "pipelineruntest",
<<<<<<< HEAD
PersistentVolumeClaim: persistentVolumeClaim,
ShellImage: "busybox",
=======
PersistentVolumeClaim: GetPersistentVolumeClaim(pipelinerun, DefaultPvcSize),
>>>>>>> 1d99c320... If no resources are linked, don't make a PVC 🗑️
},
storagetype: "pvc",
}, {
Expand All @@ -299,12 +293,8 @@ func TestInitializeArtifactStorageWithConfigMap(t *testing.T) {
},
expectedArtifactStorage: &v1alpha1.ArtifactPVC{
Name: "pipelineruntest",
<<<<<<< HEAD
PersistentVolumeClaim: persistentVolumeClaim,
ShellImage: "busybox",
=======
PersistentVolumeClaim: GetPersistentVolumeClaim(pipelinerun, DefaultPvcSize),
>>>>>>> 1d99c320... If no resources are linked, don't make a PVC 🗑️
},
storagetype: "pvc",
}, {
Expand All @@ -317,12 +307,8 @@ func TestInitializeArtifactStorageWithConfigMap(t *testing.T) {
},
expectedArtifactStorage: &v1alpha1.ArtifactPVC{
Name: "pipelineruntest",
<<<<<<< HEAD
PersistentVolumeClaim: persistentVolumeClaim,
ShellImage: "busybox",
=======
PersistentVolumeClaim: GetPersistentVolumeClaim(pipelinerun, DefaultPvcSize),
>>>>>>> 1d99c320... If no resources are linked, don't make a PVC 🗑️
},
storagetype: "pvc",
}, {
Expand Down Expand Up @@ -356,7 +342,6 @@ func TestInitializeArtifactStorageWithConfigMap(t *testing.T) {
v1alpha1.BucketServiceAccountFieldName: "BOTO_CONFIG",
},
},
pipelinerun: pipelinerun,
expectedArtifactStorage: &v1alpha1.ArtifactBucket{
Location: "s3://fake-bucket",
ShellImage: "busybox",
Expand All @@ -371,11 +356,7 @@ func TestInitializeArtifactStorageWithConfigMap(t *testing.T) {
}} {
t.Run(c.desc, func(t *testing.T) {
fakekubeclient := fakek8s.NewSimpleClientset(c.configMap)
<<<<<<< HEAD
artifactStorage, err := InitializeArtifactStorage(images, c.pipelinerun, fakekubeclient, logger)
=======
artifactStorage, err := InitializeArtifactStorage(pipelinerun, tasksWithFrom, fakekubeclient, logger)
>>>>>>> 1d99c320... If no resources are linked, don't make a PVC 🗑️
artifactStorage, err := InitializeArtifactStorage(images, pipelinerun, &pipelineWithtasksWithFrom.Spec, fakekubeclient, logger)
if err != nil {
t.Fatalf("Somehow had error initializing artifact storage run out of fake client: %s", err)
}
Expand All @@ -395,17 +376,10 @@ func TestInitializeArtifactStorageWithConfigMap(t *testing.T) {
t.Fatalf("Error cleaning up artifact storage: %s", err)
}
if diff := cmp.Diff(artifactStorage.GetType(), c.storagetype); diff != "" {
<<<<<<< HEAD
t.Fatalf("-want +got: %s", diff)
}
if diff := cmp.Diff(artifactStorage, c.expectedArtifactStorage, quantityComparer); diff != "" {
t.Fatalf("-want +got: %s", diff)
=======
t.Fatalf(diff)
}
if diff := cmp.Diff(artifactStorage, c.expectedArtifactStorage, quantityComparer); diff != "" {
t.Fatalf(diff)
>>>>>>> 1d99c320... If no resources are linked, don't make a PVC 🗑️
}
})
}
Expand Down Expand Up @@ -463,7 +437,7 @@ func TestInitializeArtifactStorageNoStorageNeeded(t *testing.T) {
Namespace: "namespace",
},
Spec: v1alpha1.PipelineRunSpec{
PipelineRef: v1alpha1.PipelineRef{
PipelineRef: &v1alpha1.PipelineRef{
Name: "pipeline",
},
},
Expand Down Expand Up @@ -511,7 +485,7 @@ func TestInitializeArtifactStorageNoStorageNeeded(t *testing.T) {
}} {
t.Run(c.desc, func(t *testing.T) {
fakekubeclient := fakek8s.NewSimpleClientset(c.configMap)
artifactStorage, err := InitializeArtifactStorage(pipelinerun, pipeline.Spec.Tasks, fakekubeclient, logger)
artifactStorage, err := InitializeArtifactStorage(images, pipelinerun, &pipeline.Spec, fakekubeclient, logger)
if err != nil {
t.Fatalf("Somehow had error initializing artifact storage run out of fake client: %s", err)
}
Expand Down Expand Up @@ -568,13 +542,8 @@ func TestCleanupArtifactStorage(t *testing.T) {
},
}} {
t.Run(c.desc, func(t *testing.T) {
<<<<<<< HEAD
fakekubeclient := fakek8s.NewSimpleClientset(c.configMap, GetPVCSpec(c.pipelinerun, persistentVolumeClaim.Spec.Resources.Requests["storage"], defaultStorageClass))
_, err := fakekubeclient.CoreV1().PersistentVolumeClaims(c.pipelinerun.Namespace).Get(GetPVCName(c.pipelinerun), metav1.GetOptions{})
=======
fakekubeclient := fakek8s.NewSimpleClientset(c.configMap, GetPVCSpec(pipelinerun, GetPersistentVolumeClaim(pipelinerun, DefaultPvcSize).Spec.Resources.Requests["storage"]))
fakekubeclient := fakek8s.NewSimpleClientset(c.configMap, GetPVCSpec(pipelinerun, persistentVolumeClaim.Spec.Resources.Requests["storage"], defaultStorageClass))
_, err := fakekubeclient.CoreV1().PersistentVolumeClaims(pipelinerun.Namespace).Get(GetPVCName(pipelinerun), metav1.GetOptions{})
>>>>>>> 1d99c320... If no resources are linked, don't make a PVC 🗑️
if err != nil {
t.Fatalf("Error getting expected PVC %s for PipelineRun %s: %s", GetPVCName(pipelinerun), pipelinerun.Name, err)
}
Expand All @@ -600,31 +569,19 @@ func TestInitializeArtifactStorageWithoutConfigMap(t *testing.T) {
logger := logtesting.TestLogger(t)
fakekubeclient := fakek8s.NewSimpleClientset()

<<<<<<< HEAD
pvc, err := InitializeArtifactStorage(images, pipelinerun, fakekubeclient, logger)
=======
pvc, err := InitializeArtifactStorage(pipelinerun, tasksWithFrom, fakekubeclient, logger)
>>>>>>> 1d99c320... If no resources are linked, don't make a PVC 🗑️
pvc, err := InitializeArtifactStorage(images, pipelinerun, &pipelineWithtasksWithFrom.Spec, fakekubeclient, logger)
if err != nil {
t.Fatalf("Somehow had error initializing artifact storage run out of fake client: %s", err)
}

expectedArtifactPVC := &v1alpha1.ArtifactPVC{
Name: "pipelineruntest",
<<<<<<< HEAD
PersistentVolumeClaim: persistentVolumeClaim,
ShellImage: "busybox",
}

if diff := cmp.Diff(pvc, expectedArtifactPVC, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" {
t.Fatalf("-want +got: %s", diff)
=======
PersistentVolumeClaim: GetPersistentVolumeClaim(pipelinerun, DefaultPvcSize),
}

if diff := cmp.Diff(pvc, expectedArtifactPVC, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" {
t.Fatal(diff)
>>>>>>> 1d99c320... If no resources are linked, don't make a PVC 🗑️
}
}

Expand Down
14 changes: 10 additions & 4 deletions pkg/artifacts/artifacts_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,19 @@ func (a *ArtifactStorageNone) StorageBasePath(pr *v1alpha1.PipelineRun) string {

// InitializeArtifactStorage will check if there is there is a
// bucket configured, create a PVC or return nil if no storage is required.
func InitializeArtifactStorage(images pipeline.Images, pr *v1alpha1.PipelineRun, ts []v1alpha1.PipelineTask, c kubernetes.Interface, logger *zap.SugaredLogger) (ArtifactStorageInterface, error) {
func InitializeArtifactStorage(images pipeline.Images, pr *v1alpha1.PipelineRun, ps *v1alpha1.PipelineSpec, c kubernetes.Interface, logger *zap.SugaredLogger) (ArtifactStorageInterface, error) {
// Artifact storage is only required if a pipeline has tasks that take inputs from previous tasks.
needStorage := false
for _, t := range ts {
possibleOutputs := map[string]struct{}{}
for _, r := range ps.Resources {
if _, ok := v1alpha1.AllowedOutputResources[r.Type]; ok {
possibleOutputs[r.Name] = struct{}{}
}
}
for _, t := range ps.Tasks {
if t.Resources != nil {
for _, i := range t.Resources.Inputs {
if len(i.From) != 0 {
for _, o := range t.Resources.Outputs {
if _, ok := possibleOutputs[o.Name]; ok {
needStorage = true
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er

var as artifacts.ArtifactStorageInterface

if as, err = artifacts.InitializeArtifactStorage(c.Images, pr, pipelineSpec.Tasks, c.KubeClientSet, c.Logger); err != nil {
if as, err = artifacts.InitializeArtifactStorage(c.Images, pr, pipelineSpec, c.KubeClientSet, c.Logger); err != nil {
c.Logger.Infof("PipelineRun failed to initialize artifact storage %s", pr.Name)
return err
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,10 +847,8 @@ func TestReconcileWithoutPVC(t *testing.T) {
Tasks: ts,
}

// create fake recorder for testing
fr := record.NewFakeRecorder(2)

testAssets := getPipelineRunController(t, d, fr)
testAssets, cancel := getPipelineRunController(t, d)
defer cancel()
c := testAssets.Controller
clients := testAssets.Clients

Expand Down Expand Up @@ -879,6 +877,7 @@ func TestReconcileWithoutPVC(t *testing.T) {
t.Errorf("Expected PipelineRun to be running, but condition status is %s", reconciledRun.Status.GetCondition(apis.ConditionSucceeded))
}
}

func TestReconcileCancelledPipelineRun(t *testing.T) {
ps := []*v1alpha1.Pipeline{tb.Pipeline("test-pipeline", "foo", tb.PipelineSpec(
tb.PipelineTask("hello-world-1", "hello-world", tb.Retries(1)),
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/taskrun/resources/input_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func AddInputResource(
dPath := destinationPath(input.Name, input.TargetPath)
// if taskrun is fetching resource from previous task then execute copy step instead of fetching new copy
// to the desired destination directory, as long as the resource exports output to be copied
if allowedOutputResources[resource.GetType()] && taskRun.HasPipelineRunOwnerReference() {
if v1alpha1.AllowedOutputResources[resource.GetType()] && taskRun.HasPipelineRunOwnerReference() {
for _, path := range boundResource.Paths {
cpSteps := as.GetCopyFromStorageToSteps(boundResource.Name, path, dPath)
if as.GetType() == v1alpha1.ArtifactStoragePVCType {
Expand Down
15 changes: 6 additions & 9 deletions pkg/reconciler/taskrun/resources/output_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ import (

var (
outputDir = "/workspace/output/"

// allowedOutputResource checks if an output resource type produces
// an output that should be copied to the PVC
allowedOutputResources = map[v1alpha1.PipelineResourceType]bool{
v1alpha1.PipelineResourceTypeStorage: true,
v1alpha1.PipelineResourceTypeGit: true,
}
)

// AddOutputResources reads the output resources and adds the corresponding container steps
Expand Down Expand Up @@ -94,10 +87,12 @@ func AddOutputResources(
mkdirSteps := []v1alpha1.Step{v1alpha1.CreateDirStep(images.ShellImage, boundResource.Name, sourcePath)}
taskSpec.Steps = append(mkdirSteps, taskSpec.Steps...)

if allowedOutputResources[resource.GetType()] && taskRun.HasPipelineRunOwnerReference() {
needsPvc := false
if v1alpha1.AllowedOutputResources[resource.GetType()] && taskRun.HasPipelineRunOwnerReference() {
var newSteps []v1alpha1.Step
for _, dPath := range boundResource.Paths {
newSteps = append(newSteps, as.GetCopyToStorageFromSteps(resource.GetName(), sourcePath, dPath)...)
needsPvc = true
}
taskSpec.Steps = append(taskSpec.Steps, newSteps...)
taskSpec.Volumes = append(taskSpec.Volumes, as.GetSecretsVolumes()...)
Expand All @@ -124,7 +119,9 @@ func AddOutputResources(
return taskSpec, nil
}
}
taskSpec.Volumes = append(taskSpec.Volumes, GetPVCVolume(pvcName))
if needsPvc {
taskSpec.Volumes = append(taskSpec.Volumes, GetPVCVolume(pvcName))
}
}
}
return taskSpec, nil
Expand Down

0 comments on commit 30abfa0

Please sign in to comment.