diff --git a/pkg/reconciler/pipelinerun/affinity_assistant.go b/pkg/reconciler/pipelinerun/affinity_assistant.go index 9e2c1efe8b5..b83495e0883 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant.go @@ -19,12 +19,14 @@ package pipelinerun import ( "context" "crypto/sha256" + "errors" "fmt" "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/apis/pipeline/pod" v1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1" + "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" aa "github.com/tektoncd/pipeline/pkg/internal/affinityassistant" "github.com/tektoncd/pipeline/pkg/reconciler/volumeclaim" "github.com/tektoncd/pipeline/pkg/workspace" @@ -40,13 +42,18 @@ import ( ) const ( - // ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace indicates that a PipelineRun uses workspaces with PersistentVolumeClaim + // ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet indicates that a PipelineRun uses workspaces with PersistentVolumeClaim // as a volume source and expect an Assistant StatefulSet in AffinityAssistantPerWorkspace behavior, but couldn't create a StatefulSet. - ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace" + ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet = "ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet" featureFlagDisableAffinityAssistantKey = "disable-affinity-assistant" ) +var ( + ErrPvcCreationFailed = errors.New("PVC creation error") + ErrAffinityAssistantCreationFailed = errors.New("Affinity Assistant creation error") +) + // createOrUpdateAffinityAssistantsAndPVCs creates Affinity Assistant StatefulSets and PVCs based on AffinityAssistantBehavior. // This is done to achieve Node Affinity for taskruns in a pipelinerun, and make it possible for the taskruns to execute parallel while sharing volume. // If the AffinityAssistantBehavior is AffinityAssistantPerWorkspace, it creates an Affinity Assistant for @@ -54,7 +61,6 @@ const ( // If the AffinityAssistantBehavior is AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation, // it creates one Affinity Assistant for the pipelinerun. func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context, pr *v1.PipelineRun, aaBehavior aa.AffinityAssistantBehavior) error { - var errs []error var unschedulableNodes sets.Set[string] = nil var claimTemplates []corev1.PersistentVolumeClaim @@ -82,15 +88,16 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context // affinity assistant so that the OwnerReference of the PVCs are the pipelineruns, which is used to achieve PVC auto deletion at PipelineRun deletion time if (aaBehavior == aa.AffinityAssistantPerWorkspace || aaBehavior == aa.AffinityAssistantDisabled) && pr.HasVolumeClaimTemplate() { if err := c.pvcHandler.CreatePVCsForWorkspaces(ctx, pr.Spec.Workspaces, *kmeta.NewControllerRef(pr), pr.Namespace); err != nil { - return fmt.Errorf("failed to create PVC for PipelineRun %s: %w", pr.Name, err) + return fmt.Errorf("%w: %v", ErrPvcCreationFailed, err) //nolint:errorlint } } switch aaBehavior { case aa.AffinityAssistantPerWorkspace: for claim, workspaceName := range claimToWorkspace { aaName := GetAffinityAssistantName(workspaceName, pr.Name) - err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes) - errs = append(errs, err...) + if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{*claim}, unschedulableNodes); err != nil { + return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err) + } } for claimTemplate, workspaceName := range claimTemplatesToWorkspace { aaName := GetAffinityAssistantName(workspaceName, pr.Name) @@ -98,22 +105,22 @@ func (c *Reconciler) createOrUpdateAffinityAssistantsAndPVCs(ctx context.Context // In AffinityAssistantPerWorkspace mode, the reconciler has created PVCs (owned by pipelinerun) from pipelinerun's VolumeClaimTemplate at this point, // so the VolumeClaimTemplates are pass in as PVCs when creating affinity assistant StatefulSet for volume scheduling. // If passed in as VolumeClaimTemplates, the PVCs are owned by Affinity Assistant StatefulSet instead of the pipelinerun. - err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes) - errs = append(errs, err...) + if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, nil, []corev1.PersistentVolumeClaimVolumeSource{{ClaimName: claimTemplate.Name}}, unschedulableNodes); err != nil { + return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err) + } } case aa.AffinityAssistantPerPipelineRun, aa.AffinityAssistantPerPipelineRunWithIsolation: - if claims != nil || claimTemplates != nil { - aaName := GetAffinityAssistantName("", pr.Name) - // In AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes, the PVCs are created via StatefulSet for volume scheduling. - // PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time, - // so we don't need to worry the OwnerReference of the PVCs - err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes) - errs = append(errs, err...) + aaName := GetAffinityAssistantName("", pr.Name) + // In AffinityAssistantPerPipelineRun or AffinityAssistantPerPipelineRunWithIsolation modes, the PVCs are created via StatefulSet for volume scheduling. + // PVCs from pipelinerun's VolumeClaimTemplate are enforced to be deleted at pipelinerun completion time, + // so we don't need to worry the OwnerReference of the PVCs + if err := c.createOrUpdateAffinityAssistant(ctx, aaName, pr, claimTemplates, claims, unschedulableNodes); err != nil { + return fmt.Errorf("%w: %v", ErrAffinityAssistantCreationFailed, err) } case aa.AffinityAssistantDisabled: } - return errorutils.NewAggregate(errs) + return nil } // createOrUpdateAffinityAssistant creates an Affinity Assistant Statefulset with the provided affinityAssistantName and pipelinerun information. @@ -225,13 +232,29 @@ func (c *Reconciler) cleanupAffinityAssistantsAndPVCs(ctx context.Context, pr *v // getPersistentVolumeClaimNameWithAffinityAssistant returns the PersistentVolumeClaim name that is // created by the Affinity Assistant StatefulSet VolumeClaimTemplate when Affinity Assistant is enabled. // The PVCs created by StatefulSet VolumeClaimTemplates follow the format `--0` -// TODO(#6740)(WIP): use this function when adding end-to-end support for AffinityAssistantPerPipelineRun mode func getPersistentVolumeClaimNameWithAffinityAssistant(pipelineWorkspaceName, prName string, wb v1.WorkspaceBinding, owner metav1.OwnerReference) string { pvcName := volumeclaim.GetPVCNameWithoutAffinityAssistant(wb.VolumeClaimTemplate.Name, wb, owner) affinityAssistantName := GetAffinityAssistantName(pipelineWorkspaceName, prName) return fmt.Sprintf("%s-%s-0", pvcName, affinityAssistantName) } +// getAffinityAssistantAnnotationVal generates and returns the value for `pipeline.tekton.dev/affinity-assistant` annotation +// based on aaBehavior, pipelinePVCWorkspaceName and prName +func getAffinityAssistantAnnotationVal(aaBehavior affinityassistant.AffinityAssistantBehavior, pipelinePVCWorkspaceName string, prName string) string { + switch aaBehavior { + case affinityassistant.AffinityAssistantPerWorkspace: + if pipelinePVCWorkspaceName != "" { + return GetAffinityAssistantName(pipelinePVCWorkspaceName, prName) + } + case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation: + return GetAffinityAssistantName("", prName) + + case affinityassistant.AffinityAssistantDisabled: + } + + return "" +} + // GetAffinityAssistantName returns the Affinity Assistant name based on pipelineWorkspaceName and pipelineRunName func GetAffinityAssistantName(pipelineWorkspaceName string, pipelineRunName string) string { hashBytes := sha256.Sum256([]byte(pipelineWorkspaceName + pipelineRunName)) @@ -348,17 +371,6 @@ func affinityAssistantStatefulSet(aaBehavior aa.AffinityAssistantBehavior, name } } -// isAffinityAssistantDisabled returns a bool indicating whether an Affinity Assistant should -// be created for each PipelineRun that use workspaces with PersistentVolumeClaims -// as volume source. The default behaviour is to enable the Affinity Assistant to -// provide Node Affinity for TaskRuns that share a PVC workspace. -// -// TODO(#6740)(WIP): replace this function with GetAffinityAssistantBehavior -func (c *Reconciler) isAffinityAssistantDisabled(ctx context.Context) bool { - cfg := config.FromContextOrDefaults(ctx) - return cfg.FeatureFlags.DisableAffinityAssistant -} - // getAssistantAffinityMergedWithPodTemplateAffinity return the affinity that merged with PipelineRun PodTemplate affinity. func getAssistantAffinityMergedWithPodTemplateAffinity(pr *v1.PipelineRun, aaBehavior aa.AffinityAssistantBehavior) *corev1.Affinity { affinityAssistantsAffinity := &corev1.Affinity{} diff --git a/pkg/reconciler/pipelinerun/affinity_assistant_test.go b/pkg/reconciler/pipelinerun/affinity_assistant_test.go index 6dea45031a8..484cdb1f8ba 100644 --- a/pkg/reconciler/pipelinerun/affinity_assistant_test.go +++ b/pkg/reconciler/pipelinerun/affinity_assistant_test.go @@ -19,6 +19,7 @@ package pipelinerun import ( "context" "errors" + "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -49,7 +50,6 @@ import ( ) var podSpecFilter cmp.Option = cmpopts.IgnoreFields(corev1.PodSpec{}, "Containers", "Affinity") -var statefulSetSpecFilter cmp.Option = cmpopts.IgnoreFields(appsv1.StatefulSetSpec{}, "Replicas", "Selector") var podTemplateSpecFilter cmp.Option = cmpopts.IgnoreFields(corev1.PodTemplateSpec{}, "ObjectMeta") var workspacePVCName = "test-workspace-pvc" @@ -111,6 +111,7 @@ var testPRWithEmptyDir = &v1.PipelineRun{ // TestCreateAndDeleteOfAffinityAssistantPerPipelineRun tests to create and delete an Affinity Assistant // per pipelinerun for a given PipelineRun func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { + replicas := int32(1) tests := []struct { name string pr *v1.PipelineRun @@ -119,6 +120,14 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { name: "PersistentVolumeClaim Workspace type", pr: testPRWithPVC, expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithPVC.Name, + workspace.LabelInstance: "affinity-assistant-622aca4516", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Volumes: []corev1.Volume{{ @@ -134,6 +143,14 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { name: "VolumeClaimTemplate Workspace type", pr: testPRWithVolumeClaimTemplate, expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplate.Name, + workspace.LabelInstance: "affinity-assistant-426b306c50", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, }}, @@ -142,6 +159,14 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { name: "VolumeClaimTemplate and PersistentVolumeClaim Workspaces", pr: testPRWithVolumeClaimTemplateAndPVC, expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplateAndPVC.Name, + workspace.LabelInstance: "affinity-assistant-5bf44db4a8", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, VolumeClaimTemplates: []corev1.PersistentVolumeClaim{{ ObjectMeta: metav1.ObjectMeta{Name: "pvc-b9eea16dce"}, }}, @@ -157,16 +182,31 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { }, }, }, { - name: "other Workspace type", - pr: testPRWithEmptyDir, - expectStatefulSetSpec: nil, + name: "other Workspace type", + pr: testPRWithEmptyDir, + expectStatefulSetSpec: &appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithEmptyDir.Name, + workspace.LabelInstance: "affinity-assistant-c655a0c8a2", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, + }, }} for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - ctx := context.Background() + configMap := map[string]string{ + "disable-affinity-assistant": "true", + "coschedule": "pipelineruns", + } + kubeClientSet := fakek8s.NewSimpleClientset() + ctx := cfgtesting.SetFeatureFlags(context.Background(), t, configMap) c := Reconciler{ - KubeClientSet: fakek8s.NewSimpleClientset(), + KubeClientSet: kubeClientSet, + pvcHandler: volumeclaim.NewPVCHandler(kubeClientSet, zap.NewExample().Sugar()), } err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, tc.pr, aa.AffinityAssistantPerPipelineRun) @@ -178,7 +218,15 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { expectAAName := GetAffinityAssistantName("", tc.pr.Name) validateStatefulSetSpec(t, ctx, c, expectAAName, tc.expectStatefulSetSpec) - // TODO(#6740)(WIP): test cleanupAffinityAssistantsAndPVCs for coscheduling-pipelinerun mode when fully implemented + // clean up Affinity Assistant + c.cleanupAffinityAssistantsAndPVCs(ctx, tc.pr) + if err != nil { + t.Errorf("unexpected error from cleanupAffinityAssistants: %v", err) + } + _, err = c.KubeClientSet.AppsV1().StatefulSets(tc.pr.Namespace).Get(ctx, expectAAName, metav1.GetOptions{}) + if !apierrors.IsNotFound(err) { + t.Errorf("expected a NotFound response, got: %v", err) + } }) } } @@ -186,6 +234,7 @@ func TestCreateAndDeleteOfAffinityAssistantPerPipelineRun(t *testing.T) { // TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled tests to create and delete an Affinity Assistant // per workspace for a given PipelineRun func TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled(t *testing.T) { + replicas := int32(1) tests := []struct { name, expectedPVCName string pr *v1.PipelineRun @@ -196,6 +245,14 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled(t *testing.T) aaBehavior: aa.AffinityAssistantPerWorkspace, pr: testPRWithPVC, expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithPVC.Name, + workspace.LabelInstance: "affinity-assistant-ac9f8fc5ee", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Volumes: []corev1.Volume{{ @@ -213,6 +270,14 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled(t *testing.T) pr: testPRWithVolumeClaimTemplate, expectedPVCName: "pvc-b9eea16dce", expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplate.Name, + workspace.LabelInstance: "affinity-assistant-4cf1a1c468", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Volumes: []corev1.Volume{{ @@ -235,6 +300,14 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled(t *testing.T) pr: testPRWithVolumeClaimTemplateAndPVC, expectedPVCName: "pvc-b9eea16dce", expectStatefulSetSpec: []*appsv1.StatefulSetSpec{{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplateAndPVC.Name, + workspace.LabelInstance: "affinity-assistant-6c87e714a0", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Volumes: []corev1.Volume{{ @@ -245,6 +318,14 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled(t *testing.T) }}, }, }}, { + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + pipeline.PipelineRunLabelKey: testPRWithVolumeClaimTemplateAndPVC.Name, + workspace.LabelInstance: "affinity-assistant-6399c93362", + workspace.LabelComponent: workspace.ComponentNameAffinityAssistant, + }, + }, Template: corev1.PodTemplateSpec{ Spec: corev1.PodSpec{ Volumes: []corev1.Volume{{ @@ -314,6 +395,76 @@ func TestCreateAndDeleteOfAffinityAssistantPerWorkspaceOrDisabled(t *testing.T) } } +func TestCreateOrUpdateAffinityAssistantsAndPVCs_Failure(t *testing.T) { + testCases := []struct { + name, failureType string + aaBehavior aa.AffinityAssistantBehavior + expectedErr error + }{{ + name: "affinity assistant creation failed - per workspace", + failureType: "statefulset", + aaBehavior: aa.AffinityAssistantPerWorkspace, + expectedErr: fmt.Errorf("%w: [failed to create StatefulSet affinity-assistant-4cf1a1c468: error creating statefulsets]", ErrAffinityAssistantCreationFailed), + }, { + name: "affinity assistant creation failed - per pipelinerun", + failureType: "statefulset", + aaBehavior: aa.AffinityAssistantPerPipelineRun, + expectedErr: fmt.Errorf("%w: [failed to create StatefulSet affinity-assistant-426b306c50: error creating statefulsets]", ErrAffinityAssistantCreationFailed), + }, { + name: "pvc creation failed - per workspace", + failureType: "pvc", + aaBehavior: aa.AffinityAssistantPerWorkspace, + expectedErr: fmt.Errorf("%w: failed to create PVC pvc-b9eea16dce: error creating persistentvolumeclaims", ErrPvcCreationFailed), + }, { + name: "pvc creation failed - disabled", + failureType: "pvc", + aaBehavior: aa.AffinityAssistantDisabled, + expectedErr: fmt.Errorf("%w: failed to create PVC pvc-b9eea16dce: error creating persistentvolumeclaims", ErrPvcCreationFailed), + }} + + for _, tc := range testCases { + ctx := context.Background() + kubeClientSet := fakek8s.NewSimpleClientset() + c := Reconciler{ + KubeClientSet: kubeClientSet, + pvcHandler: volumeclaim.NewPVCHandler(kubeClientSet, zap.NewExample().Sugar()), + } + + switch tc.failureType { + case "pvc": + c.KubeClientSet.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "persistentvolumeclaims", + func(action testing2.Action) (handled bool, ret runtime.Object, err error) { + return true, &corev1.PersistentVolumeClaim{}, errors.New("error creating persistentvolumeclaims") + }) + case "statefulset": + c.KubeClientSet.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "statefulsets", + func(action testing2.Action) (handled bool, ret runtime.Object, err error) { + return true, &appsv1.StatefulSet{}, errors.New("error creating statefulsets") + }) + } + + err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, testPRWithVolumeClaimTemplate, tc.aaBehavior) + + if err == nil { + t.Errorf("expect error from createOrUpdateAffinityAssistantsAndPVCs but got nil") + } + + switch tc.failureType { + case "pvc": + if !errors.Is(err, ErrPvcCreationFailed) { + t.Errorf("expected err type mismatching, expecting %v but got: %v", ErrPvcCreationFailed, err) + } + case "statefulset": + if !errors.Is(err, ErrAffinityAssistantCreationFailed) { + t.Errorf("expected err type mismatching, expecting %v but got: %v", ErrAffinityAssistantCreationFailed, err) + } + } + if d := cmp.Diff(tc.expectedErr.Error(), err.Error()); d != "" { + t.Errorf("expected err mismatching: %v", diff.PrintWantGot(d)) + } + } +} + // TestCreateAffinityAssistantWhenNodeIsCordoned tests an existing Affinity Assistant can identify the node failure and // can migrate the affinity assistant pod to a healthy node so that the existing pipelineRun runs to competition func TestCreateOrUpdateAffinityAssistantWhenNodeIsCordoned(t *testing.T) { @@ -809,53 +960,6 @@ func TestThatCleanupIsAvoidedIfAssistantIsDisabled(t *testing.T) { } } -func TestDisableAffinityAssistant(t *testing.T) { - for _, tc := range []struct { - description string - configMap *corev1.ConfigMap - expected bool - }{{ - description: "Default behaviour: A missing disable-affinity-assistant flag should result in false", - configMap: &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, - Data: map[string]string{}, - }, - expected: false, - }, { - description: "Setting disable-affinity-assistant to false should result in false", - configMap: &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, - Data: map[string]string{ - featureFlagDisableAffinityAssistantKey: "false", - }, - }, - expected: false, - }, { - description: "Setting disable-affinity-assistant to true should result in true", - configMap: &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{Name: config.GetFeatureFlagsConfigName(), Namespace: system.Namespace()}, - Data: map[string]string{ - featureFlagDisableAffinityAssistantKey: "true", - }, - }, - expected: true, - }} { - t.Run(tc.description, func(t *testing.T) { - c := Reconciler{ - KubeClientSet: fakek8s.NewSimpleClientset( - tc.configMap, - ), - Images: pipeline.Images{}, - } - store := config.NewStore(logtesting.TestLogger(t)) - store.OnConfigChanged(tc.configMap) - if result := c.isAffinityAssistantDisabled(store.ToContext(context.Background())); result != tc.expected { - t.Errorf("Expected %t Received %t", tc.expected, result) - } - }) - } -} - func TestGetAssistantAffinityMergedWithPodTemplateAffinity(t *testing.T) { labelSelector := &metav1.LabelSelector{ MatchLabels: map[string]string{ @@ -1030,6 +1134,48 @@ spec: } } +func TestGetAffinityAssistantAnnotationVal(t *testing.T) { + tcs := []struct { + name string + aaBehavior aa.AffinityAssistantBehavior + wsName, prName, expectAffinityAssistantAnnotationVal string + }{{ + name: "per workspace", + aaBehavior: aa.AffinityAssistantPerWorkspace, + wsName: "my-ws", + prName: "my-pipelinerun", + expectAffinityAssistantAnnotationVal: "affinity-assistant-315f58d30d", + }, { + name: "per workspace - empty pipeline workspace name", + aaBehavior: aa.AffinityAssistantPerWorkspace, + prName: "my-pipelinerun", + }, { + name: "per pipelinerun", + aaBehavior: aa.AffinityAssistantPerPipelineRun, + wsName: "my-ws", + prName: "my-pipelinerun", + expectAffinityAssistantAnnotationVal: "affinity-assistant-0b79942a50", + }, { + name: "isolate pipelinerun", + aaBehavior: aa.AffinityAssistantPerPipelineRunWithIsolation, + wsName: "my-ws", + prName: "my-pipelinerun", + expectAffinityAssistantAnnotationVal: "affinity-assistant-0b79942a50", + }, { + name: "disabled", + aaBehavior: aa.AffinityAssistantDisabled, + wsName: "my-ws", + prName: "my-pipelinerun", + }} + + for _, tc := range tcs { + aaAnnotationVal := getAffinityAssistantAnnotationVal(tc.aaBehavior, tc.wsName, tc.prName) + if diff := cmp.Diff(tc.expectAffinityAssistantAnnotationVal, aaAnnotationVal); diff != "" { + t.Errorf("Affinity Assistant Annotation Val mismatch: %v", diff) + } + } +} + type Data struct { StatefulSets []*appsv1.StatefulSet Nodes []*corev1.Node @@ -1068,7 +1214,7 @@ func validateStatefulSetSpec(t *testing.T, ctx context.Context, c Reconciler, ex if err != nil { t.Fatalf("unexpected error when retrieving StatefulSet: %v", err) } - if d := cmp.Diff(expectStatefulSetSpec, &aa.Spec, statefulSetSpecFilter, podSpecFilter, podTemplateSpecFilter); d != "" { + if d := cmp.Diff(expectStatefulSetSpec, &aa.Spec, podSpecFilter, podTemplateSpecFilter); d != "" { t.Errorf("StatefulSetSpec diff: %s", diff.PrintWantGot(d)) } } else if !apierrors.IsNotFound(err) { diff --git a/pkg/reconciler/pipelinerun/pipelinerun.go b/pkg/reconciler/pipelinerun/pipelinerun.go index f022e6efd87..aa3829f278e 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun.go +++ b/pkg/reconciler/pipelinerun/pipelinerun.go @@ -617,26 +617,21 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel if err != nil { return controller.NewPermanentError(err) } - - switch aaBehavior { - case affinityassistant.AffinityAssistantPerWorkspace, affinityassistant.AffinityAssistantDisabled: - if err = c.createOrUpdateAffinityAssistantsAndPVCs(ctx, pr, aaBehavior); err != nil { - logger.Errorf("Failed to create PVC or affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) - if aaBehavior == affinityassistant.AffinityAssistantPerWorkspace { - pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSetPerWorkspace, - "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", - pr.Namespace, pr.Name, err) - } else { - pr.Status.MarkFailed(volumeclaim.ReasonCouldntCreateWorkspacePVC, - "Failed to create PVC for PipelineRun %s/%s Workspaces correctly: %s", - pr.Namespace, pr.Name, err) - } - - return controller.NewPermanentError(err) + if err := c.createOrUpdateAffinityAssistantsAndPVCs(ctx, pr, aaBehavior); err != nil { + switch { + case errors.Is(err, ErrPvcCreationFailed): + logger.Errorf("Failed to create PVC for PipelineRun %s: %v", pr.Name, err) + pr.Status.MarkFailed(volumeclaim.ReasonCouldntCreateWorkspacePVC, + "Failed to create PVC for PipelineRun %s/%s correctly: %s", + pr.Namespace, pr.Name, err) + case errors.Is(err, ErrAffinityAssistantCreationFailed): + logger.Errorf("Failed to create affinity assistant StatefulSet for PipelineRun %s: %v", pr.Name, err) + pr.Status.MarkFailed(ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, + "Failed to create StatefulSet for PipelineRun %s/%s correctly: %s", + pr.Namespace, pr.Name, err) + default: } - case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation: - // TODO(#6740)(WIP): implement end-to-end support for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation modes - return controller.NewPermanentError(fmt.Errorf("affinity assistant behavior: %v is not implemented", aaBehavior)) + return controller.NewPermanentError(err) } } @@ -883,8 +878,12 @@ func (c *Reconciler) createTaskRun(ctx context.Context, taskRunName string, para return nil, err } - if !c.isAffinityAssistantDisabled(ctx) && pipelinePVCWorkspaceName != "" { - tr.Annotations[workspace.AnnotationAffinityAssistantName] = GetAffinityAssistantName(pipelinePVCWorkspaceName, pr.Name) + aaBehavior, err := affinityassistant.GetAffinityAssistantBehavior(ctx) + if err != nil { + return nil, err + } + if aaAnnotationVal := getAffinityAssistantAnnotationVal(aaBehavior, pipelinePVCWorkspaceName, pr.Name); aaAnnotationVal != "" { + tr.Annotations[workspace.AnnotationAffinityAssistantName] = aaAnnotationVal } logger.Infof("Creating a new TaskRun object %s for pipeline task %s", taskRunName, rpt.PipelineTask.Name) @@ -1004,10 +1003,15 @@ func (c *Reconciler) createCustomRun(ctx context.Context, runName string, params }, } } + // Set the affinity assistant annotation in case the custom task creates TaskRuns or Pods // that can take advantage of it. - if !c.isAffinityAssistantDisabled(ctx) && pipelinePVCWorkspaceName != "" { - r.Annotations[workspace.AnnotationAffinityAssistantName] = GetAffinityAssistantName(pipelinePVCWorkspaceName, pr.Name) + aaBehavior, err := affinityassistant.GetAffinityAssistantBehavior(ctx) + if err != nil { + return nil, err + } + if aaAnnotationVal := getAffinityAssistantAnnotationVal(aaBehavior, pipelinePVCWorkspaceName, pr.Name); aaAnnotationVal != "" { + r.Annotations[workspace.AnnotationAffinityAssistantName] = aaAnnotationVal } logger.Infof("Creating a new CustomRun object %s", runName) @@ -1123,9 +1127,11 @@ func (c *Reconciler) taskWorkspaceByWorkspaceVolumeSource(ctx context.Context, p } binding.Name = taskWorkspaceName - // TODO(#6740)(WIP): get binding for AffinityAssistantPerPipelineRun and AffinityAssistantPerPipelineRunWithIsolation mode - if aaBehavior == affinityassistant.AffinityAssistantDisabled || aaBehavior == affinityassistant.AffinityAssistantPerWorkspace { + switch aaBehavior { + case affinityassistant.AffinityAssistantPerWorkspace, affinityassistant.AffinityAssistantDisabled: binding.PersistentVolumeClaim.ClaimName = volumeclaim.GetPVCNameWithoutAffinityAssistant(wb.VolumeClaimTemplate.Name, wb, owner) + case affinityassistant.AffinityAssistantPerPipelineRun, affinityassistant.AffinityAssistantPerPipelineRunWithIsolation: + binding.PersistentVolumeClaim.ClaimName = getPersistentVolumeClaimNameWithAffinityAssistant("", prName, wb, owner) } return binding diff --git a/pkg/reconciler/pipelinerun/pipelinerun_test.go b/pkg/reconciler/pipelinerun/pipelinerun_test.go index 26576e3152f..bc1ac4be410 100644 --- a/pkg/reconciler/pipelinerun/pipelinerun_test.go +++ b/pkg/reconciler/pipelinerun/pipelinerun_test.go @@ -65,7 +65,9 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation/field" fakek8s "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/typed/core/v1/fake" ktesting "k8s.io/client-go/testing" + testing2 "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" "knative.dev/pkg/apis" @@ -13278,6 +13280,87 @@ spec: } } +func TestHandleAffinityAssistantAndPVCCreationError(t *testing.T) { + prName := "affinity-assistant-creation-fail" + namespace := "default" + pr := parse.MustParseV1PipelineRun(t, fmt.Sprintf(` +metadata: + name: %s + namespace: %s +spec: + pipelineSpec: + tasks: + - name: hello-world + workspaces: + - name: my-ws + taskSpec: + steps: + - image: busybox + script: echo hello + workspaces: + - name: my-ws + volumeClaimTemplate: + metadata: + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 16Mi +`, prName, namespace)) + + tcs := []struct { + name, failureType, expectErrorReason string + }{{ + name: "pvc creation error", + failureType: "pvc", + expectErrorReason: volumeclaim.ReasonCouldntCreateWorkspacePVC, + }, { + name: "affinity assistant creation error", + failureType: "statefulset", + expectErrorReason: ReasonCouldntCreateOrUpdateAffinityAssistantStatefulSet, + }} + + for _, tc := range tcs { + d := test.Data{ + PipelineRuns: []*v1.PipelineRun{pr.DeepCopy()}, + } + testAssets, cancel := getPipelineRunController(t, d) + defer cancel() + c := testAssets.Controller + clients := testAssets.Clients + + // mock pvc creation err + switch tc.failureType { + case "pvc": + clients.Kube.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "persistentvolumeclaims", + func(action testing2.Action) (handled bool, ret runtime.Object, err error) { + return true, &corev1.PersistentVolumeClaim{}, errors.New("error creating persistentvolumeclaims") + }) + case "statefulset": + clients.Kube.CoreV1().(*fake.FakeCoreV1).PrependReactor("create", "statefulsets", + func(action testing2.Action) (handled bool, ret runtime.Object, err error) { + return true, &appsv1.StatefulSet{}, errors.New("error creating statefulsets") + }) + } + + // reconciler + err := c.Reconciler.Reconcile(testAssets.Ctx, fmt.Sprintf("%s/%s", namespace, prName)) + if !controller.IsPermanentError(err) { + t.Errorf("expected permanent error but got %s", err) + } + + // check pr status + reconciledPr, err := clients.Pipeline.TektonV1().PipelineRuns(namespace).Get(testAssets.Ctx, prName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("had error getting reconciled pipelinerun out of fake client: %s", err) + } + if diff := cmp.Diff(tc.expectErrorReason, reconciledPr.Status.GetCondition(apis.ConditionSucceeded).Reason); diff != "" { + t.Errorf("pipelinerun fail reason mismatch: %v", diff) + } + } +} + func TestHandleTaskRunCreationError(t *testing.T) { prName := "taskrun-creation-fails" namespace := "default" diff --git a/pkg/reconciler/taskrun/taskrun.go b/pkg/reconciler/taskrun/taskrun.go index 695e72d6d13..4fb2598b53a 100644 --- a/pkg/reconciler/taskrun/taskrun.go +++ b/pkg/reconciler/taskrun/taskrun.go @@ -418,7 +418,11 @@ func (c *Reconciler) prepare(ctx context.Context, tr *v1.TaskRun) (*v1.TaskSpec, return nil, nil, controller.NewPermanentError(err) } - if _, usesAssistant := tr.Annotations[workspace.AnnotationAffinityAssistantName]; usesAssistant { + aaBehavior, err := affinityassistant.GetAffinityAssistantBehavior(ctx) + if err != nil { + return nil, nil, controller.NewPermanentError(err) + } + if aaBehavior == affinityassistant.AffinityAssistantPerWorkspace { if err := workspace.ValidateOnlyOnePVCIsUsed(tr.Spec.Workspaces); err != nil { logger.Errorf("TaskRun %q workspaces incompatible with Affinity Assistant: %v", tr.Name, err) tr.Status.MarkResourceFailed(podconvert.ReasonFailedValidation, err) diff --git a/pkg/reconciler/taskrun/taskrun_test.go b/pkg/reconciler/taskrun/taskrun_test.go index de4a14c5836..9c4d7bf8092 100644 --- a/pkg/reconciler/taskrun/taskrun_test.go +++ b/pkg/reconciler/taskrun/taskrun_test.go @@ -3272,10 +3272,9 @@ spec: } } -// TestReconcileWithWorkspacesIncompatibleWithAffinityAssistant tests that a TaskRun used with an associated -// Affinity Assistant is validated and that the validation fails for a TaskRun that is incompatible with -// Affinity Assistant; e.g. using more than one PVC-backed workspace. -func TestReconcileWithWorkspacesIncompatibleWithAffinityAssistant(t *testing.T) { +// TestReconcileWithMultiplePVCWorkspaceWithAffinityAssistant tests the execution of a TaskRun binding two VolumeClaimTemplate +// as Workspace in AffinityAssistantPerWorkspaces mode and AffinityAssistantPerPipelineruns mode. +func TestReconcileWithMultiplePVCWorkspaceWithAffinityAssistant(t *testing.T) { taskWithTwoWorkspaces := parse.MustParseV1Task(t, ` metadata: name: test-task-two-workspaces @@ -3287,6 +3286,11 @@ spec: readOnly: true - description: another workspace name: ws2 + steps: + - command: + - /mycmd + image: foo + name: simple-step `) taskRun := parse.MustParseV1TaskRun(t, ` metadata: @@ -3308,34 +3312,63 @@ spec: name: pvc2 `) - d := test.Data{ - Tasks: []*v1.Task{taskWithTwoWorkspaces}, - TaskRuns: []*v1.TaskRun{taskRun}, - ClusterTasks: nil, - } - testAssets, cancel := getTaskRunController(t, d) - defer cancel() - clients := testAssets.Clients - createServiceAccount(t, testAssets, "default", "foo") - _ = testAssets.Controller.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)) + tcs := []struct { + name string + cfgMap map[string]string + expectFailureReason string + }{{ + name: "multiple PVC based Workspaces in per workspace coschedule mode - failure", + cfgMap: map[string]string{ + "disable-affinity-assistant": "false", + "coschedule": "workspaces", + }, + expectFailureReason: podconvert.ReasonFailedValidation, + }, { + name: "multiple PVC based Workspaces in per pipelinerun coschedule mode - success", + cfgMap: map[string]string{ + "disable-affinity-assistant": "true", + "coschedule": "pipelineruns", + }, + }} - _, err := clients.Pipeline.TektonV1().Tasks(taskRun.Namespace).Get(testAssets.Ctx, taskWithTwoWorkspaces.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("krux: %v", err) - } + for _, tc := range tcs { + d := test.Data{ + Tasks: []*v1.Task{taskWithTwoWorkspaces}, + TaskRuns: []*v1.TaskRun{taskRun}, + ClusterTasks: nil, + ConfigMaps: []*corev1.ConfigMap{{ + ObjectMeta: metav1.ObjectMeta{Namespace: system.Namespace(), Name: config.GetFeatureFlagsConfigName()}, + Data: tc.cfgMap, + }}, + } + testAssets, cancel := getTaskRunController(t, d) + defer cancel() + clients := testAssets.Clients + createServiceAccount(t, testAssets, "default", "foo") + _ = testAssets.Controller.Reconciler.Reconcile(testAssets.Ctx, getRunName(taskRun)) - ttt, err := clients.Pipeline.TektonV1().TaskRuns(taskRun.Namespace).Get(testAssets.Ctx, taskRun.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("expected TaskRun %s to exist but instead got error when getting it: %v", taskRun.Name, err) - } + _, err := clients.Pipeline.TektonV1().Tasks(taskRun.Namespace).Get(testAssets.Ctx, taskWithTwoWorkspaces.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("failed to get task: %v", err) + } - if len(ttt.Status.Conditions) != 1 { - t.Errorf("unexpected number of Conditions, expected 1 Condition") - } + ttt, err := clients.Pipeline.TektonV1().TaskRuns(taskRun.Namespace).Get(testAssets.Ctx, taskRun.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("expected TaskRun %s to exist but instead got error when getting it: %v", taskRun.Name, err) + } + + if len(ttt.Status.Conditions) != 1 { + t.Errorf("unexpected number of Conditions, expected 1 Condition") + } - for _, cond := range ttt.Status.Conditions { - if cond.Reason != podconvert.ReasonFailedValidation { - t.Errorf("unexpected Reason on the Condition, expected: %s, got: %s", podconvert.ReasonFailedValidation, cond.Reason) + if tc.expectFailureReason != "" { + for _, cond := range ttt.Status.Conditions { + if cond.Reason != tc.expectFailureReason { + t.Errorf("unexpected Reason on the Condition, expected: %s, got: %s", tc.expectFailureReason, cond.Reason) + } + } + } else if ttt.IsFailure() { + t.Errorf("Unexpected unsuccessful condition for TaskRun %q:\n%#v", taskRun.Name, ttt.Status.Conditions) } } } diff --git a/test/affinity_assistant_test.go b/test/affinity_assistant_test.go index 56da797f83b..4c23676031b 100644 --- a/test/affinity_assistant_test.go +++ b/test/affinity_assistant_test.go @@ -22,10 +22,14 @@ package test import ( "context" "fmt" + "strconv" "testing" + "github.com/tektoncd/pipeline/pkg/apis/config" "github.com/tektoncd/pipeline/test/parse" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "knative.dev/pkg/system" knativetest "knative.dev/pkg/test" ) @@ -94,17 +98,8 @@ spec: pvcName := pvcList.Items[0].Name // validate PipelineRun pods sharing the same PVC are scheduled to the same node - podFoo, err := c.KubeClient.CoreV1().Pods(namespace).Get(ctx, fmt.Sprintf("%v-foo-pod", prName), metav1.GetOptions{}) - if err != nil { - t.Fatalf("Failed to get pod: %v-foo-pod, err: %v", prName, err) - } - podBar, err := c.KubeClient.CoreV1().Pods(namespace).Get(ctx, fmt.Sprintf("%v-bar-pod", prName), metav1.GetOptions{}) - if err != nil { - t.Fatalf("Failed to get pod: %v-bar-pod, err: %v", prName, err) - } - if podFoo.Spec.NodeName != podBar.Spec.NodeName { - t.Errorf("pods are not scheduled to same node: %v and %v", podFoo.Spec.NodeName, podBar.Spec.NodeName) - } + podNames := []string{fmt.Sprintf("%v-foo-pod", prName), fmt.Sprintf("%v-bar-pod", prName)} + validatePodAffinity(t, ctx, podNames, namespace, c.KubeClient) // delete PipelineRun if err = c.V1PipelineRunClient.Delete(ctx, prName, metav1.DeleteOptions{}); err != nil { @@ -120,3 +115,127 @@ spec: t.Fatalf("expect PVC %s to be in bounded state but got %v", pvcName, pvc.Status.Phase) } } + +// TestAffinityAssistant_PerPipelineRun tests that mounting multiple PVC based workspaces to a pipeline task is allowed and +// all the pods are scheduled to the same node in AffinityAssistantPerPipelineRuns mode +func TestAffinityAssistant_PerPipelineRun(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + c, namespace := setup(ctx, t) + knativetest.CleanupOnInterrupt(func() { tearDown(ctx, t, c, namespace) }, t.Logf) + defer resetFeatureFlagAndCleanup(ctx, t, c, namespace) + + // update feature flag + configMapData := map[string]string{ + "coschedule": config.CoschedulePipelineRuns, + "disable-affinity-assistant": "true", + } + if err := updateConfigMap(ctx, c.KubeClient, system.Namespace(), config.GetFeatureFlagsConfigName(), configMapData); err != nil { + t.Fatal(err) + } + + prName := "my-pipelinerun" + pr := parse.MustParseV1PipelineRun(t, fmt.Sprintf(` +metadata: + name: %s + namespace: %s +spec: + pipelineSpec: + workspaces: + - name: my-workspace + - name: my-workspace2 + tasks: + - name: foo + workspaces: + - name: my-workspace + taskSpec: + steps: + - image: busybox + script: echo hello foo + - name: bar + workspaces: + - name: my-workspace2 + taskSpec: + steps: + - image: busybox + script: echo hello bar + - name: double-ws + workspaces: + - name: my-workspace + - name: my-workspace2 + taskSpec: + steps: + - image: busybox + script: echo double-ws + - name: no-ws + taskSpec: + steps: + - image: busybox + script: echo no-ws + workspaces: + - name: my-workspace + volumeClaimTemplate: + metadata: + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 16Mi + - name: my-workspace2 + volumeClaimTemplate: + metadata: + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 16Mi +`, prName, namespace)) + + // create PipelineRun + if _, err := c.V1PipelineRunClient.Create(ctx, pr, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create PipelineRun: %s", err) + } + + // wait for PipelineRun to finish + t.Logf("Waiting for PipelineRun in namespace %s to finish", namespace) + if err := WaitForPipelineRunState(ctx, c, prName, timeout, PipelineRunSucceed(prName), "PipelineRunSucceeded", v1Version); err != nil { + t.Errorf("Error waiting for PipelineRun to finish: %s", err) + } + + // validate PipelineRun pods sharing the same PVC are scheduled to the same node + podNames := []string{fmt.Sprintf("%v-foo-pod", prName), fmt.Sprintf("%v-bar-pod", prName), fmt.Sprintf("%v-double-ws-pod", prName), fmt.Sprintf("%v-no-ws-pod", prName)} + validatePodAffinity(t, ctx, podNames, namespace, c.KubeClient) +} + +// validatePodAffinity checks if all the given pods are scheduled to the same node +func validatePodAffinity(t *testing.T, ctx context.Context, podNames []string, namespace string, client kubernetes.Interface) { + t.Helper() + nodeName := "" + for _, podName := range podNames { + pod, err := client.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed to get pod: %v, err: %v", podName, err) + } + + if nodeName == "" { + nodeName = pod.Spec.NodeName + } else if pod.Spec.NodeName != nodeName { + t.Errorf("pods are not scheduled to the same node as expected %v vs %v", nodeName, pod.Spec.NodeName) + } + } +} + +func resetFeatureFlagAndCleanup(ctx context.Context, t *testing.T, c *clients, namespace string) { + t.Helper() + configMapData := map[string]string{ + "coschedule": config.DefaultCoschedule, + "disable-affinity-assistant": strconv.FormatBool(config.DefaultDisableAffinityAssistant), + } + if err := updateConfigMap(ctx, c.KubeClient, system.Namespace(), config.GetFeatureFlagsConfigName(), configMapData); err != nil { + t.Fatal(err) + } + tearDown(ctx, t, c, namespace) +}