diff --git a/CHANGELOG.md b/CHANGELOG.md index d65ce3ea82..1ef4c67f5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,11 +16,16 @@ to trigger updates for downstream dependencies like `Deployments`) are recommended to explicitly specify `immutable: true`. +- A warning is now emitted if an object has finalizers which might be blocking + deletion. (https://github.com/pulumi/pulumi-kubernetes/issues/1418) + ### Fixed - The `immutable` field is now respected for `ConfigMaps` when the provider is configured with `enableConfigMapMutable`. (https://github.com/pulumi/pulumi-kubernetes/issues/3181) +- Fixed a panic that could occur during deletion. (https://github.com/pulumi/pulumi-kubernetes/issues/3157) + ## 4.17.1 (August 16, 2024) ### Fixed diff --git a/provider/pkg/await/await.go b/provider/pkg/await/await.go index e0f9e97bdc..38ed2165f2 100644 --- a/provider/pkg/await/await.go +++ b/provider/pkg/await/await.go @@ -21,9 +21,12 @@ import ( "fmt" "os" "strings" + "time" fluxssa "github.com/fluxcd/pkg/ssa" "github.com/jonboulle/clockwork" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/internal" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/cluster" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/host" @@ -42,10 +45,8 @@ import ( apivalidation "k8s.io/apimachinery/pkg/api/validation" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/dynamic" k8sopenapi "k8s.io/kubectl/pkg/util/openapi" @@ -81,6 +82,9 @@ type ProviderConfig struct { // explicit awaiters (for testing purposes) awaiters map[string]awaitSpec + // explicit condition (for testing) + condition condition.Satisfier 
+ clock clockwork.Clock } @@ -791,129 +795,54 @@ func Deletion(c DeleteConfig) error { return err } - timeout := metadata.TimeoutDuration(c.Timeout, c.Inputs) - var timeoutSeconds int64 = 300 - if timeout != nil { - timeoutSeconds = int64(timeout.Seconds()) - } - listOpts := metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("metadata.name", c.Name).String(), - TimeoutSeconds: &timeoutSeconds, - } - - // Set up a watcher for the selected resource. - watcher, err := client.Watch(c.Context, listOpts) - if err != nil { - return nilIfGVKDeleted(err) - } - // delete the specified resource (using foreground cascading delete by default). deletePolicy := metadata.DeletionPropagation(c.Inputs) deleteOpts := metav1.DeleteOptions{ PropagationPolicy: &deletePolicy, } + err = client.Delete(c.Context, c.Name, deleteOpts) if err != nil { return nilIfGVKDeleted(err) } - // Wait until delete resolves as success or error. Note that the conditional is set up to log only - // if we don't have an entry for the resource type; in the event that we do, but the await logic - // is blank, simply do nothing instead of logging. - var waitErr error - id := fmt.Sprintf("%s/%s", c.Outputs.GetAPIVersion(), c.Outputs.GetKind()) - a := awaiters - if c.awaiters != nil { - a = c.awaiters + // Apply a timeout to the operation. 
+ timeout := 10 * time.Minute + if t := metadata.TimeoutDuration(c.Timeout, c.Inputs); t != nil { + timeout = *t } - if awaiter, exists := a[id]; exists && awaiter.awaitDeletion != nil { - if metadata.SkipAwaitLogic(c.Inputs) { - logger.V(1).Infof("Skipping await logic for %v", c.Name) - } else { - timeout := metadata.TimeoutDuration(c.Timeout, c.Inputs) - waitErr = awaiter.awaitDeletion(deleteAwaitConfig{ - awaitConfig: awaitConfig{ - ctx: c.Context, - urn: c.URN, - initialAPIVersion: c.InitialAPIVersion, - clientSet: c.ClientSet, - currentOutputs: c.Outputs, - logger: c.DedupLogger, - timeout: timeout, - clusterVersion: c.ClusterVersion, - clock: c.clock, - }, - clientForResource: client, - }) - if waitErr != nil { - return waitErr - } - _ = clearStatus(c.Context, c.Host, c.URN) - } - } else { - for { - select { - case event, ok := <-watcher.ResultChan(): - if !ok { - deleted, obj := checkIfResourceDeleted(c.Context, c.Name, client) - if deleted { - _ = clearStatus(c.Context, c.Host, c.URN) - return nil - } + ctx, cancel := context.WithTimeout(c.Context, timeout) + defer cancel() - return &timeoutError{ - object: obj, - subErrors: []string{ - fmt.Sprintf("Timed out waiting for deletion of %s %q", id, c.Name), - }, - } - } + // Setup our Informer factory. + source := condition.NewDynamicSource(ctx, c.ClientSet, c.Outputs.GetNamespace()) - switch event.Type { - case watch.Deleted: - _ = clearStatus(c.Context, c.Host, c.URN) - return nil - case watch.Error: - deleted, obj := checkIfResourceDeleted(c.Context, c.Name, client) - if deleted { - _ = clearStatus(c.Context, c.Host, c.URN) - return nil - } - return &initializationError{ - object: obj, - subErrors: []string{apierrors.FromObject(event.Object).Error()}, - } - } - case <-c.Context.Done(): // Handle user cancellation during watch for deletion. 
- watcher.Stop() - logger.V(3).Infof("Received error deleting object %q: %#v", id, err) - deleted, obj := checkIfResourceDeleted(c.Context, c.Name, client) - if deleted { - _ = clearStatus(c.Context, c.Host, c.URN) - return nil - } - - return &cancellationError{ - object: obj, - } - } - } + // Determine the condition to wait for. + deleted, err := metadata.DeletedCondition(ctx, source, c.ClientSet, c.DedupLogger, c.Inputs, c.Outputs) + if err != nil { + return err + } + if c.condition != nil { + deleted = c.condition } - return nil -} + awaiter, err := internal.NewAwaiter( + internal.WithCondition(deleted), + internal.WithNamespace(c.Outputs.GetNamespace()), + internal.WithLogger(c.DedupLogger), + ) + if err != nil { + return err + } -// checkIfResourceDeleted attempts to get a k8s resource, and returns true if the resource is not found (was deleted). -// Return the resource if it still exists. -func checkIfResourceDeleted( - ctx context.Context, name string, client dynamic.ResourceInterface, -) (bool, *unstructured.Unstructured) { - obj, err := client.Get(ctx, name, metav1.GetOptions{}) - if err != nil && is404(err) { // In case of 404, the resource no longer exists, so return success. - return true, nil + // Wait until the delete condition resolves. + err = awaiter.Await(ctx) + if err != nil { + return err } + _ = clearStatus(c.Context, c.Host, c.URN) - return false, obj + return nil } // clearStatus will clear the `Info` column of the CLI of all statuses and messages. 
diff --git a/provider/pkg/await/await_test.go b/provider/pkg/await/await_test.go index ef34e3549b..40f4d61c55 100644 --- a/provider/pkg/await/await_test.go +++ b/provider/pkg/await/await_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/internal" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients/fake" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/cluster" @@ -653,7 +655,7 @@ func TestDeletion(t *testing.T) { // reactions suppressDeletion := func(t *testing.T, ctx testCtx, action kubetesting.Action) (bool, runtime.Object, error) { - return true, nil, nil + return true, ctx.config.Outputs, nil } cancelAwait := func(t *testing.T, ctx testCtx, action kubetesting.Action) (bool, runtime.Object, error) { @@ -663,23 +665,16 @@ func TestDeletion(t *testing.T) { // awaiters - awaitNoop := func(t *testing.T, ctx testCtx) deletionAwaiter { - return func(dac deleteAwaitConfig) error { - return nil - } + awaitNoop := func(t *testing.T, ctx testCtx) condition.Satisfier { + return condition.NewImmediate(ctx.config.DedupLogger, ctx.config.Inputs) } - awaitError := func(t *testing.T, ctx testCtx) deletionAwaiter { - return func(dac deleteAwaitConfig) error { - return serviceUnavailableErr - } + awaitError := func(t *testing.T, ctx testCtx) condition.Satisfier { + return condition.NewFailure(serviceUnavailableErr) } - awaitUnexpected := func(t *testing.T, ctx testCtx) deletionAwaiter { - return func(dac deleteAwaitConfig) error { - require.Fail(t, "Unexpected call to awaiter") - return nil - } + awaitUnexpected := func(t *testing.T, ctx testCtx) condition.Satisfier { + return condition.NewFailure(fmt.Errorf("unexpected call to await")) } // expectations @@ -705,13 +700,13 @@ func TestDeletion(t *testing.T) { } tests := []struct { - name string - client client - args args - 
expect []expectF - reaction []reactionF - watcher *watch.RaceFreeFakeWatcher - awaiter func(t *testing.T, ctx testCtx) deletionAwaiter + name string + client client + args args + expect []expectF + reaction []reactionF + watcher *watch.RaceFreeFakeWatcher + condition func(*testing.T, testCtx) condition.Satisfier }{ { name: "ServiceUnavailable", @@ -738,8 +733,8 @@ func TestDeletion(t *testing.T) { inputs: validPodUnstructured, outputs: validPodUnstructured, }, - awaiter: awaitNoop, - expect: []expectF{succeeded(), deleted("default", "foo")}, + condition: awaitNoop, + expect: []expectF{succeeded(), deleted("default", "foo")}, }, { name: "NonNamespaced", @@ -750,8 +745,8 @@ func TestDeletion(t *testing.T) { inputs: validClusterRoleUnstructured, outputs: validClusterRoleUnstructured, }, - awaiter: awaitNoop, - expect: []expectF{succeeded(), deleted("default", "foo")}, + condition: awaitNoop, + expect: []expectF{succeeded(), deleted("default", "foo")}, }, { name: "Gone", @@ -762,8 +757,8 @@ func TestDeletion(t *testing.T) { inputs: validPodUnstructured, outputs: validPodUnstructured, }, - awaiter: awaitUnexpected, - expect: []expectF{succeeded()}, + condition: awaitUnexpected, + expect: []expectF{succeeded()}, }, { name: "SkipAwait", @@ -775,7 +770,6 @@ func TestDeletion(t *testing.T) { outputs: validPodUnstructured, }, reaction: []reactionF{suppressDeletion}, // suppress deletion to safeguard that the built-in watcher is not used. 
- awaiter: awaitUnexpected, expect: []expectF{succeeded()}, }, { @@ -787,8 +781,8 @@ func TestDeletion(t *testing.T) { inputs: validPodUnstructured, outputs: validPodUnstructured, }, - awaiter: awaitError, - expect: []expectF{failed(serviceUnavailableErr)}, + condition: awaitError, + expect: []expectF{failed(serviceUnavailableErr)}, }, { name: "Deleted", @@ -799,64 +793,8 @@ func TestDeletion(t *testing.T) { inputs: validPodUnstructured, outputs: validPodUnstructured, }, - awaiter: nil, - expect: []expectF{succeeded(), deleted("default", "foo")}, - }, - { - name: "WatchTimeout", - args: args{ - resType: tokens.Type("kubernetes:core/v1:Pod"), - name: "foo", - objects: []runtime.Object{validPodUnstructured}, - inputs: validPodUnstructured, - outputs: validPodUnstructured, - }, - reaction: []reactionF{suppressDeletion}, - awaiter: nil, - watcher: withWatchClosed(watch.NewRaceFreeFake()), - expect: []expectF{failed(&timeoutError{})}, - }, - { - name: "WatchTimeoutWithRecovery", - args: args{ - resType: tokens.Type("kubernetes:core/v1:Pod"), - name: "foo", - objects: []runtime.Object{validPodUnstructured}, - inputs: validPodUnstructured, - outputs: validPodUnstructured, - }, - reaction: nil, - awaiter: nil, - watcher: withWatchClosed(watch.NewRaceFreeFake()), - expect: []expectF{succeeded()}, - }, - { - name: "WatchError", - args: args{ - resType: tokens.Type("kubernetes:core/v1:Pod"), - name: "foo", - objects: []runtime.Object{validPodUnstructured}, - inputs: validPodUnstructured, - outputs: validPodUnstructured, - }, - reaction: []reactionF{suppressDeletion}, - awaiter: nil, - watcher: withWatchError(watch.NewRaceFreeFake(), apierrors.NewTimeoutError("test", 30)), - expect: []expectF{failed(&initializationError{})}, - }, - { - name: "WatchErrorWithRecovery", - args: args{ - resType: tokens.Type("kubernetes:core/v1:Pod"), - name: "foo", - objects: []runtime.Object{validPodUnstructured}, - inputs: validPodUnstructured, - outputs: validPodUnstructured, - }, - reaction: 
nil, - awaiter: nil, - watcher: withWatchError(watch.NewRaceFreeFake(), apierrors.NewTimeoutError("test", 30)), - expect: []expectF{succeeded()}, + condition: awaitNoop, + expect: []expectF{succeeded(), deleted("default", "foo")}, }, { name: "Cancel", @@ -868,7 +806,6 @@ func TestDeletion(t *testing.T) { outputs: validPodUnstructured, }, reaction: []reactionF{cancelAwait, suppressDeletion}, - awaiter: nil, expect: []expectF{failed(&cancellationError{})}, }, { @@ -880,9 +817,9 @@ func TestDeletion(t *testing.T) { inputs: validPodUnstructured, outputs: validPodUnstructured, }, - reaction: []reactionF{cancelAwait}, - awaiter: nil, - expect: []expectF{succeeded()}, + reaction: []reactionF{cancelAwait}, + condition: awaitNoop, + expect: []expectF{succeeded()}, }, } @@ -945,11 +882,8 @@ func TestDeletion(t *testing.T) { return true, tt.watcher, nil }) } - if tt.awaiter != nil { - id := fmt.Sprintf("%s/%s", tt.args.inputs.GetAPIVersion(), tt.args.inputs.GetKind()) - config.awaiters[id] = awaitSpec{ - awaitDeletion: tt.awaiter(t, testCtx), - } + if tt.condition != nil { + config.condition = tt.condition(t, testCtx) } err = Deletion(config) for _, e := range tt.expect { @@ -1026,6 +960,16 @@ func Test_Watcher_Interface_Timeout(t *testing.T) { assert.Equal(t, "Timeout occurred polling for ''", err.Error()) } +func TestAwaiterInterfaceTimeout(t *testing.T) { + awaiter, err := internal.NewAwaiter(internal.WithCondition(condition.NewNever(nil))) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + err = awaiter.Await(ctx) + _, isPartialErr := err.(PartialError) + assert.True(t, isPartialErr, "Timed out watcher should emit `await.PartialError`") +} + // -------------------------------------------------------------------------- // Helpers @@ -1047,17 +991,6 @@ func withGenerateName(obj *unstructured.Unstructured) *unstructured.Unstructured return copy } -func withWatchError(watcher *watch.RaceFreeFakeWatcher, err *apierrors.StatusError) 
*watch.RaceFreeFakeWatcher { - obj := err.Status() - watcher.Error(&obj) - return watcher -} - -func withWatchClosed(watcher *watch.RaceFreeFakeWatcher) *watch.RaceFreeFakeWatcher { - watcher.Stop() - return watcher -} - // -------------------------------------------------------------------------- // Mock implementations of Kubernetes client stuff. diff --git a/provider/pkg/await/awaiters.go b/provider/pkg/await/awaiters.go index 8cb9629b86..7de2bfd4d6 100644 --- a/provider/pkg/await/awaiters.go +++ b/provider/pkg/await/awaiters.go @@ -34,7 +34,6 @@ import ( storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/client-go/dynamic" ) // awaitConfig specifies on which conditions we are to consider a resource created and fully @@ -62,15 +61,9 @@ func (cac *awaitConfig) Clock() clockwork.Clock { return clockwork.NewRealClock() } -type deleteAwaitConfig struct { - awaitConfig - clientForResource dynamic.ResourceInterface -} - type ( - awaiter func(awaitConfig) error - readAwaiter func(awaitConfig) error - deletionAwaiter func(deleteAwaitConfig) error + awaiter func(awaitConfig) error + readAwaiter func(awaitConfig) error ) func (cac *awaitConfig) getTimeout(defaultSeconds int) time.Duration { @@ -133,9 +126,8 @@ const ( ) type awaitSpec struct { - await awaiter - awaitRead readAwaiter - awaitDeletion deletionAwaiter + await awaiter + awaitRead readAwaiter } var deploymentAwaiter = awaitSpec{ @@ -145,7 +137,6 @@ var deploymentAwaiter = awaitSpec{ awaitRead: func(c awaitConfig) error { return makeDeploymentInitAwaiter(c).Read() }, - awaitDeletion: untilAppsDeploymentDeleted, } var ingressAwaiter = awaitSpec{ @@ -160,7 +151,6 @@ var jobAwaiter = awaitSpec{ awaitRead: func(c awaitConfig) error { return makeJobInitAwaiter(c).Read() }, - awaitDeletion: untilBatchV1JobDeleted, } var statefulsetAwaiter = awaitSpec{ @@ -170,7 +160,6 @@ var statefulsetAwaiter = awaitSpec{ awaitRead: func(c 
awaitConfig) error { return makeStatefulSetInitAwaiter(c).Read() }, - awaitDeletion: untilAppsStatefulSetDeleted, } var daemonsetAwaiter = awaitSpec{ @@ -180,9 +169,6 @@ var daemonsetAwaiter = awaitSpec{ awaitRead: func(c awaitConfig) error { return newDaemonSetAwaiter(c).Read() }, - awaitDeletion: func(c deleteAwaitConfig) error { - return newDaemonSetAwaiter(c.awaitConfig).Delete() - }, } // NOTE: Some GVKs below are blank so that we can distinguish between resource types that we know @@ -202,9 +188,6 @@ var awaiters = map[string]awaitSpec{ batchV1Job: jobAwaiter, coreV1ConfigMap: { /* NONE */ }, coreV1LimitRange: { /* NONE */ }, - coreV1Namespace: { - awaitDeletion: untilCoreV1NamespaceDeleted, - }, coreV1PersistentVolume: { await: untilCoreV1PersistentVolumeInitialized, }, @@ -212,13 +195,11 @@ var awaiters = map[string]awaitSpec{ await: untilCoreV1PersistentVolumeClaimReady, }, coreV1Pod: { - await: awaitPodInit, - awaitRead: awaitPodRead, - awaitDeletion: untilCoreV1PodDeleted, + await: awaitPodInit, + awaitRead: awaitPodRead, }, coreV1ReplicationController: { - await: untilCoreV1ReplicationControllerInitialized, - awaitDeletion: untilCoreV1ReplicationControllerDeleted, + await: untilCoreV1ReplicationControllerInitialized, }, coreV1ResourceQuota: { await: untilCoreV1ResourceQuotaInitialized, @@ -264,156 +245,6 @@ var awaiters = map[string]awaitSpec{ // -------------------------------------------------------------------------- -// -------------------------------------------------------------------------- - -// apps/v1/Deployment, apps/v1beta1/Deployment, apps/v1beta2/Deployment, -// extensions/v1beta1/Deployment - -// -------------------------------------------------------------------------- - -func deploymentSpecReplicas(deployment *unstructured.Unstructured) (any, bool) { - return openapi.Pluck(deployment.Object, "spec", "replicas") -} - -func untilAppsDeploymentDeleted(config deleteAwaitConfig) error { - // - // TODO(hausdorff): Should we scale pods to 0 
and then delete instead? Kubernetes should allow us - // to check the status after deletion, but there is some possibility if there is a long-ish - // transient network partition (or something) that it could be successfully deleted and GC'd - // before we get to check it, which I think would require manual intervention. - // - statusReplicas := func(deployment *unstructured.Unstructured) (any, bool) { - return openapi.Pluck(deployment.Object, "status", "replicas") - } - - deploymentMissing := func(d *unstructured.Unstructured, err error) error { - if is404(err) { - return nil - } else if err != nil { - logger.V(3).Infof("Received error deleting deployment '%s': %#v", d.GetName(), err) - return err - } - - currReplicas, _ := statusReplicas(d) - specReplicas, _ := deploymentSpecReplicas(d) - - return watcher.RetryableError( - fmt.Errorf("deployment %q still exists (%d / %d replicas exist)", d.GetName(), - currReplicas, specReplicas)) - } - - // Wait until all replicas are gone. 10 minutes should be enough for ~10k replicas. - timeout := config.getTimeout(600) - err := watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). 
- RetryUntil(deploymentMissing, timeout) - if err != nil { - return err - } - - logger.V(3).Infof("Deployment '%s' deleted", config.currentOutputs.GetName()) - - return nil -} - -// -------------------------------------------------------------------------- - -// apps/v1/StatefulSet, apps/v1beta1/StatefulSet, apps/v1beta2/StatefulSet, - -// -------------------------------------------------------------------------- - -func untilAppsStatefulSetDeleted(config deleteAwaitConfig) error { - specReplicas := func(statefulset *unstructured.Unstructured) (any, bool) { - return openapi.Pluck(statefulset.Object, "spec", "replicas") - } - statusReplicas := func(statefulset *unstructured.Unstructured) (any, bool) { - return openapi.Pluck(statefulset.Object, "status", "replicas") - } - - statefulsetmissing := func(d *unstructured.Unstructured, err error) error { - if is404(err) { - return nil - } else if err != nil { - logger.V(3).Infof("Received error deleting StatefulSet %q: %#v", d.GetName(), err) - return err - } - - currReplicas, _ := statusReplicas(d) - specReplicas, _ := specReplicas(d) - - return watcher.RetryableError( - fmt.Errorf("StatefulSet %q still exists (%d / %d replicas exist)", d.GetName(), - currReplicas, specReplicas)) - } - - // Wait until all replicas are gone. 10 minutes should be enough for ~10k replicas. - timeout := config.getTimeout(600) - err := watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). 
- RetryUntil(statefulsetmissing, timeout) - if err != nil { - return err - } - - logger.V(3).Infof("StatefulSet %q deleted", config.currentOutputs.GetName()) - - return nil -} - -// -------------------------------------------------------------------------- - -// batch/v1/Job - -// -------------------------------------------------------------------------- - -func untilBatchV1JobDeleted(config deleteAwaitConfig) error { - jobMissingOrKilled := func(pod *unstructured.Unstructured, err error) error { - if is404(err) { - return nil - } else if err != nil { - return err - } - - e := fmt.Errorf("job %q still exists", pod.GetName()) - return watcher.RetryableError(e) - } - - timeout := config.getTimeout(300) - return watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). - RetryUntil(jobMissingOrKilled, timeout) -} - -// -------------------------------------------------------------------------- - -// core/v1/Namespace - -// -------------------------------------------------------------------------- - -func untilCoreV1NamespaceDeleted(config deleteAwaitConfig) error { - namespaceMissingOrKilled := func(ns *unstructured.Unstructured, err error) error { - if is404(err) { - return nil - } else if err != nil { - logger.V(3).Infof("Received error deleting namespace %q: %#v", - ns.GetName(), err) - return err - } - - statusPhase, _, _ := unstructured.NestedString(ns.Object, "status", "phase") - logger.V(3).Infof("Namespace %q status received: %#v", ns.GetName(), statusPhase) - if statusPhase == "" { - return nil - } - - return watcher.RetryableError(fmt.Errorf("namespace %q still exists (%v)", - ns.GetName(), statusPhase)) - } - - timeout := config.getTimeout(300) - return watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). 
- RetryUntil(namespaceMissingOrKilled, timeout) -} - -// -------------------------------------------------------------------------- - // core/v1/PersistentVolume // -------------------------------------------------------------------------- @@ -504,32 +335,6 @@ func pvcBindMode( // -------------------------------------------------------------------------- -// core/v1/Pod - -// -------------------------------------------------------------------------- - -// TODO(lblackstone): unify the function signatures across awaiters -func untilCoreV1PodDeleted(config deleteAwaitConfig) error { - podMissingOrKilled := func(pod *unstructured.Unstructured, err error) error { - if is404(err) { - return nil - } else if err != nil { - return err - } - - statusPhase, _ := openapi.Pluck(pod.Object, "status", "phase") - logger.V(3).Infof("Current state of pod %q: %#v", pod.GetName(), statusPhase) - e := fmt.Errorf("pod %q still exists (%v)", pod.GetName(), statusPhase) - return watcher.RetryableError(e) - } - - timeout := config.getTimeout(300) - return watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). - RetryUntil(podMissingOrKilled, timeout) -} - -// -------------------------------------------------------------------------- - // core/v1/ReplicationController // -------------------------------------------------------------------------- @@ -571,46 +376,6 @@ func untilCoreV1ReplicationControllerInitialized(c awaitConfig) error { return nil } -func untilCoreV1ReplicationControllerDeleted(config deleteAwaitConfig) error { - // - // TODO(hausdorff): Should we scale pods to 0 and then delete instead? Kubernetes should allow us - // to check the status after deletion, but there is some possibility if there is a long-ish - // transient network partition (or something) that it could be successfully deleted and GC'd - // before we get to check it, which I think would require manual intervention. 
- // - statusReplicas := func(rc *unstructured.Unstructured) (any, bool) { - return openapi.Pluck(rc.Object, "status", "replicas") - } - - rcMissing := func(rc *unstructured.Unstructured, err error) error { - if is404(err) { - return nil - } else if err != nil { - logger.V(3).Infof("Received error deleting ReplicationController %q: %#v", rc.GetName(), err) - return err - } - - currReplicas, _ := statusReplicas(rc) - specReplicas, _ := deploymentSpecReplicas(rc) - - return watcher.RetryableError( - fmt.Errorf("ReplicationController %q still exists (%d / %d replicas exist)", - rc.GetName(), currReplicas, specReplicas)) - } - - // Wait until all replicas are gone. 10 minutes should be enough for ~10k replicas. - timeout := config.getTimeout(600) - err := watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). - RetryUntil(rcMissing, timeout) - if err != nil { - return err - } - - logger.V(3).Infof("ReplicationController %q deleted", config.currentOutputs.GetName()) - - return nil -} - // -------------------------------------------------------------------------- // core/v1/ResourceQuota diff --git a/provider/pkg/await/condition/condition.go b/provider/pkg/await/condition/condition.go new file mode 100644 index 0000000000..0c57e009b2 --- /dev/null +++ b/provider/pkg/await/condition/condition.go @@ -0,0 +1,65 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package condition + +import ( + "context" + "fmt" + "io" + + "github.com/pulumi/pulumi/sdk/v3/go/common/diag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// Satisfier is an Observer which evaluates the observed object against some +// criteria. +type Satisfier interface { + Observer + + // Satisfied returns true when the criteria is met. + Satisfied() (bool, error) + + // Object returns the last-known state of the object being observed. + Object() *unstructured.Unstructured +} + +// logger allows injecting custom log behavior. +type logger interface { + Log(diag.Severity, string) + LogStatus(diag.Severity, string) +} + +// logbuf logs messages to an io.Writer. +type logbuf struct{ w io.Writer } + +func (l logbuf) Log(sev diag.Severity, msg string) { + fmt.Fprintln(l.w, sev, msg) +} + +func (l logbuf) LogStatus(sev diag.Severity, msg string) { + l.Log(sev, msg) +} + +// objectGetter allows injecting custom client behavior for fetching objects +// from the cluster. +type objectGetter interface { + Get( + ctx context.Context, + name string, + options metav1.GetOptions, + subresources ...string, + ) (*unstructured.Unstructured, error) +} diff --git a/provider/pkg/await/condition/deleted.go b/provider/pkg/await/condition/deleted.go new file mode 100644 index 0000000000..0af8d6c9fa --- /dev/null +++ b/provider/pkg/await/condition/deleted.go @@ -0,0 +1,154 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package condition + +import ( + "context" + "fmt" + "strings" + "sync" + "sync/atomic" + + "github.com/pulumi/pulumi/sdk/v3/go/common/diag" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" +) + +var _ Satisfier = (*Deleted)(nil) + +// Deleted condition succeeds when GET on a resource 404s or when a Deleted +// event is received for the resource. +type Deleted struct { + observer *ObjectObserver + ctx context.Context + logger logger + deleted atomic.Bool + getter objectGetter +} + +// NewDeleted constructs a new Deleted condition. +func NewDeleted( + ctx context.Context, + source Source, + getter objectGetter, + logger logger, + obj *unstructured.Unstructured, +) (*Deleted, error) { + dc := &Deleted{ + ctx: ctx, + observer: NewObjectObserver(ctx, source, obj), + logger: logger, + getter: getter, + } + return dc, nil +} + +// Range establishes an Informer and confirms the object still exists. +// If a Deleted event isn't Observed by the time the underlying Observer is +// exhausted, we attempt a final lookup on the cluster to be absolutely sure it +// still exists. +func (dc *Deleted) Range(yield func(watch.Event) bool) { + // Start listening to events before we check if the resource has already + // been deleted. This avoids races where the object is deleted in-between + // the 404 check and this watch. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + dc.observer.Range(yield) + }() + + dc.refreshClusterState() + if dc.deleted.Load() { + // Already deleted, nothing more to do. Our informer will get cleaned up + // when its context is canceled. + return + } + + wg.Wait() + + if dc.deleted.Load() { + // Nothing more to do. 
+ return + } + + // Attempt one last lookup if the object still exists. (This is legacy + // behavior that might be unnecessary since we're using Informers instead of + // Watches now.) + dc.refreshClusterState() + if dc.deleted.Load() { + return + } + + // Let the user know we might be blocked if the object has finalizers. + // https://github.com/pulumi/pulumi-kubernetes/issues/1418 + finalizers := dc.Object().GetFinalizers() + dc.logger.Log(diag.Warning, + fmt.Sprintf("finalizers might be preventing deletion (%s)", strings.Join(finalizers, ", ")), + ) +} + +// Observe watches for Deleted events. +func (dc *Deleted) Observe(e watch.Event) error { + if e.Type == watch.Deleted { + dc.deleted.Store(true) + } + return dc.observer.Observe(e) +} + +// Satisfied returns true if a Deleted event was Observed. Otherwise a status +// message will be logged, if available. +func (dc *Deleted) Satisfied() (bool, error) { + if dc.deleted.Load() { + return true, nil + } + + uns := dc.Object() + r, _ := status.Compute(uns) + if r.Message != "" { + dc.logger.LogStatus(diag.Info, r.Message) + } + + return false, nil +} + +// Object returns the last-known state of the object we're deleting. +func (dc *Deleted) Object() *unstructured.Unstructured { + return dc.observer.Object() +} + +// refreshClusterState performs a GET against the cluster and updates state to +// reflect whether the object still exists or not. +func (dc *Deleted) refreshClusterState() { + // Our context might be closed, but we still want to issue this request + // even if we're shutting down. + ctx := context.WithoutCancel(dc.ctx) + _, err := dc.getter.Get(ctx, dc.Object().GetName(), metav1.GetOptions{}) + if err == nil { + // Still exists. 
+ dc.deleted.Store(false) + return + } + if k8serrors.IsNotFound(err) { + dc.deleted.Store(true) + } else { + dc.logger.LogStatus(diag.Warning, + "unexpected error while checking cluster state: "+err.Error(), + ) + } +} diff --git a/provider/pkg/await/condition/deleted_test.go b/provider/pkg/await/condition/deleted_test.go new file mode 100644 index 0000000000..ddc3fa4583 --- /dev/null +++ b/provider/pkg/await/condition/deleted_test.go @@ -0,0 +1,194 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package condition + +import ( + "context" + "os" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" +) + +var pod = &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]any{ + "name": "foo", + "namespace": "default", + }, + "spec": map[string]any{ + "containers": []any{ + map[string]any{ + "name": "foo", + "image": "nginx", + }, + }, + }, + }, +} + +type get404 struct{} + +func (get404) Get(ctx context.Context, name string, opts metav1.GetOptions, sub ...string) (*unstructured.Unstructured, error) { + return nil, k8serrors.NewNotFound(schema.GroupResource{}, name) +} + +type get503 struct{} + +func (get503) Get(ctx context.Context, name string, opts metav1.GetOptions, sub ...string) (*unstructured.Unstructured, error) { + return nil, k8serrors.NewServiceUnavailable("boom") +} + +type get200 struct{ obj *unstructured.Unstructured } + +func (g *get200) Get(context.Context, string, metav1.GetOptions, ...string) (*unstructured.Unstructured, error) { + return g.obj, nil +} + +type getsequence struct { + getters []objectGetter + idx int +} + +func (g *getsequence) Get(ctx context.Context, name string, opts metav1.GetOptions, sub ...string) (*unstructured.Unstructured, error) { + defer func() { g.idx++ }() + return g.getters[g.idx].Get(ctx, name, opts, sub...) 
+} + +func TestDeleted(t *testing.T) { + stdout := logbuf{os.Stdout} + + t.Run("already deleted", func(t *testing.T) { + ctx := context.Background() + getter := get404{} + + cond, err := NewDeleted(ctx, Static(nil), getter, stdout, pod) + assert.NoError(t, err) + + cond.Range(nil) + + done, err := cond.Satisfied() + assert.NoError(t, err) + assert.True(t, done) + }) + + t.Run("deleted during watch", func(t *testing.T) { + ctx := context.Background() + + getter := &get200{pod} + source := Static(make(chan watch.Event, 1)) + + cond, err := NewDeleted(ctx, source, getter, stdout, pod) + assert.NoError(t, err) + + seen := make(chan struct{}) + go cond.Range(func(e watch.Event) bool { + err := cond.Observe(e) + assert.NoError(t, err) + seen <- struct{}{} + return true + }) + + source <- watch.Event{Type: watch.Modified, Object: pod} + <-seen + done, err := cond.Satisfied() + assert.NoError(t, err) + assert.False(t, done) + + source <- watch.Event{Type: watch.Deleted, Object: pod} + <-seen + done, err = cond.Satisfied() + assert.NoError(t, err) + assert.True(t, done) + }) + + t.Run("times out", func(t *testing.T) { + getter := &get200{pod} + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + cond, err := NewDeleted(ctx, Static(nil), getter, stdout, pod) + assert.NoError(t, err) + + cond.Range(nil) + + done, err := cond.Satisfied() + assert.NoError(t, err) + assert.False(t, done) + }) + + t.Run("times out with finalizers", func(t *testing.T) { + getter := &get200{pod} + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + podWithFinalizers := pod.DeepCopy() + podWithFinalizers.SetFinalizers([]string{"stuck"}) + + buf := &strings.Builder{} + cond, err := NewDeleted(ctx, Static(nil), getter, logbuf{buf}, podWithFinalizers) + assert.NoError(t, err) + + cond.Range(nil) + + assert.Contains(t, buf.String(), "finalizers might be preventing deletion") + }) + + // TODO: It's questionable whether we still need to test this behavior. 
I + // suspect this stems from earlier error handling code around our watch + // logic, which is largely obviated by our use of informers now. In other + // words, we needed this when we weren't handling the sort of watch errors + // Informers handle automatically. + t.Run("times out with recovery", func(t *testing.T) { + getter := &getsequence{[]objectGetter{&get200{pod}, get404{}}, 0} + + ctx, cancel := context.WithCancel(context.Background()) + cond, err := NewDeleted(ctx, Static(nil), getter, stdout, pod) + assert.NoError(t, err) + + cancel() + cond.Range(nil) + + done, err := cond.Satisfied() + assert.NoError(t, err) + assert.True(t, done) + }) + + t.Run("unexpected error", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + buf := &strings.Builder{} + cond, err := NewDeleted(ctx, Static(nil), get503{}, logbuf{buf}, pod) + assert.NoError(t, err) + + cancel() + cond.Range(nil) + + done, err := cond.Satisfied() + assert.NoError(t, err) + assert.False(t, done) + assert.Contains(t, buf.String(), "boom") + }) +} diff --git a/provider/pkg/await/condition/doc.go b/provider/pkg/await/condition/doc.go new file mode 100644 index 0000000000..dd5e42b1dd --- /dev/null +++ b/provider/pkg/await/condition/doc.go @@ -0,0 +1,17 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package condition contains interfaces and primitives for use with our await +// logic. 
+package condition
diff --git a/provider/pkg/await/condition/immediate.go b/provider/pkg/await/condition/immediate.go
new file mode 100644
index 0000000000..a1bf35ae29
--- /dev/null
+++ b/provider/pkg/await/condition/immediate.go
@@ -0,0 +1,171 @@
+// Copyright 2024, Pulumi Corporation.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package condition
+
+import (
+	"context"
+	"sync/atomic"
+
+	"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/watch"
+)
+
+var (
+	_ Satisfier = Immediate{}
+	_ Satisfier = (*Never)(nil)
+	_ Satisfier = (*On)(nil)
+	_ Satisfier = (*Stopped)(nil)
+	_ Satisfier = (*Failure)(nil)
+)
+
+// Immediate is a no-op condition which is always satisfied. This is primarily
+// used for skip-await behavior and testing.
+type Immediate struct {
+	logger logger
+	obj    *unstructured.Unstructured
+}
+
+// NewImmediate creates a new Immediate condition.
+func NewImmediate(logger logger, obj *unstructured.Unstructured) Immediate {
+	return Immediate{logger: logger, obj: obj}
+}
+
+// Range is a no-op for Immediate conditions.
+func (Immediate) Range(func(watch.Event) bool) {}
+
+// Satisfied always returns true, logging a skip notice when a logger is set.
+func (i Immediate) Satisfied() (bool, error) {
+	if i.logger != nil {
+		i.logger.LogStatus(diag.Info, "Skipping await logic")
+	}
+	return true, nil
+}
+
+// Object returns the object the condition was constructed with.
+func (i Immediate) Object() *unstructured.Unstructured {
+	return i.obj
+}
+
+// Observe is a no-op for Immediate conditions.
+func (Immediate) Observe(watch.Event) error { return nil }
+
+// Never is a no-op condition which is never satisfied. This is primarily
+// useful for tests.
+type Never struct {
+	Immediate
+}
+
+// Satisfied always returns false for Never conditions.
+func (n Never) Satisfied() (bool, error) {
+	return false, nil
+}
+
+// NewNever creates a new Never condition.
+func NewNever(obj *unstructured.Unstructured) *Never {
+	return &Never{Immediate: NewImmediate(nil, obj)}
+}
+
+// On is satisfied when it observes a specific event.
+type On struct {
+	observer  *ObjectObserver
+	want      watch.Event
+	satisfied atomic.Bool
+}
+
+// NewOn creates a new On condition.
+func NewOn(
+	ctx context.Context,
+	source Source,
+	obj *unstructured.Unstructured,
+	want watch.Event,
+) *On {
+	oo := NewObjectObserver(ctx, source, obj)
+	return &On{want: want, observer: oo}
+}
+
+// Observe records the event and marks the condition satisfied if it equals want.
+func (o *On) Observe(e watch.Event) error {
+	err := o.observer.Observe(e)
+	if e == o.want {
+		o.satisfied.Store(true)
+	}
+	return err
+}
+
+// Object returns the observer's underlying object.
+func (o *On) Object() *unstructured.Unstructured {
+	return o.observer.Object()
+}
+
+// Range iterates over the underlying observer.
+func (o *On) Range(yield func(watch.Event) bool) {
+	o.observer.Range(yield)
+}
+
+// Satisfied returns true if the expected event has been Observed.
+func (o *On) Satisfied() (bool, error) {
+	return o.satisfied.Load(), nil
+}
+
+// Stopped is satisfied after its underlying Observer has been exhausted. This
+// is primarily useful for testing behavior which occurs on shutdown.
+type Stopped struct {
+	observer Immediate
+	stopped  atomic.Bool
+}
+
+// NewStopped creates a new Stopped condition.
+func NewStopped(logger logger, obj *unstructured.Unstructured) *Stopped {
+	return &Stopped{observer: NewImmediate(logger, obj)}
+}
+
+// Observe invokes the underlying Observer.
+func (s *Stopped) Observe(e watch.Event) error {
+	return s.observer.Observe(e)
+}
+
+// Object returns the underlying Immediate condition's object.
+func (s *Stopped) Object() *unstructured.Unstructured {
+	return s.observer.Object()
+}
+
+// Range iterates over the underlying Observer and satisfies the condition.
+func (s *Stopped) Range(yield func(watch.Event) bool) {
+	s.observer.Range(yield)
+	s.stopped.Store(true)
+}
+
+// Satisfied returns true once Range has run to completion.
+func (s *Stopped) Satisfied() (bool, error) {
+	return s.stopped.Load(), nil
+}
+
+// Failure is a no-op condition which raises an error when it is checked. This
+// is primarily useful for testing.
+type Failure struct {
+	Immediate
+	err error
+}
+
+// NewFailure creates a new Failure condition.
+func NewFailure(err error) Satisfier {
+	return &Failure{err: err}
+}
+
+// Satisfied always reports unsatisfied and returns the configured error.
+func (f *Failure) Satisfied() (bool, error) {
+	return false, f.err
+}
diff --git a/provider/pkg/await/condition/observer.go b/provider/pkg/await/condition/observer.go
new file mode 100644
index 0000000000..1090ba3a9c
--- /dev/null
+++ b/provider/pkg/await/condition/observer.go
@@ -0,0 +1,181 @@
+// Copyright 2024, Pulumi Corporation.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package condition + +import ( + "context" + "sync" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" +) + +var ( + _ Observer = (*ObjectObserver)(nil) + _ observer = observer{} +) + +// Observer acts on a watch.Event Source. Range is responsible for filtering +// events to only those relevant to the Observer, and Observe optionally +// updates the Observer's state. +type Observer interface { + // Range iterates over all events visible to the Observer. The caller is + // responsible for invoking Observe as part of the provided callback. Range + // can be used to customize setup and teardown behavior if the Observer + // wraps another Observer. + Range(func(watch.Event) bool) + + // Observe handles events and can optionally update the Observer's state. + // This should be invoked by the caller and not during Range. + Observe(watch.Event) error +} + +// ObjectObserver observes the given resource and keeps track of its last-known +// state. +type ObjectObserver struct { + mu sync.Mutex + ctx context.Context + obj *unstructured.Unstructured + observer Observer +} + +// NewObjectObserver creates a new ObjectObserver that tracks changes to the +// provided object +func NewObjectObserver( + ctx context.Context, + source Source, + obj *unstructured.Unstructured, +) *ObjectObserver { + return &ObjectObserver{ + ctx: ctx, + obj: obj, + observer: NewObserver(ctx, + source, + obj.GroupVersionKind(), + func(u *unstructured.Unstructured) bool { + return obj.GetName() == u.GetName() + }, + ), + } +} + +// Object returns the last-known state of the observed object. +func (oo *ObjectObserver) Object() *unstructured.Unstructured { + oo.mu.Lock() + defer oo.mu.Unlock() + return oo.obj +} + +// Observe updates the Observer's state with the observed object. 
+func (oo *ObjectObserver) Observe(e watch.Event) error { + oo.mu.Lock() + defer oo.mu.Unlock() + obj, _ := e.Object.(*unstructured.Unstructured) + oo.obj = obj + return nil +} + +// Range is an iterator over events visible to the Observer. +func (oo *ObjectObserver) Range(yield func(watch.Event) bool) { + oo.observer.Range(yield) +} + +// NewChildObserver creates a new ChildObserver subscribed to children of the +// owner with the given GVK. +func NewChildObserver( + ctx context.Context, + source Source, + owner *unstructured.Unstructured, + gvk schema.GroupVersionKind, +) Observer { + return NewObserver(ctx, + source, + gvk, + func(obj *unstructured.Unstructured) bool { + return isOwnedBy(obj, owner) + }, + ) +} + +// observer provides base functionality for filtering an event stream based on +// a criteria. +type observer struct { + ctx context.Context + source Source + gvk schema.GroupVersionKind + keep func(*unstructured.Unstructured) bool +} + +// NewObserver returns a new Observer with a watch.Event channel configured for +// the given GVK and filtered according to the given "keep" function. +func NewObserver( + ctx context.Context, + source Source, + gvk schema.GroupVersionKind, + keep func(*unstructured.Unstructured) bool, +) Observer { + return &observer{ + ctx: ctx, + source: source, + gvk: gvk, + keep: keep, + } +} + +// Range is an iterator over events visible to the Observer. Yielded events are +// guaranteed to have the type *unstructured.Unstructured. +func (o *observer) Range(yield func(watch.Event) bool) { + events, err := o.source.Start(o.ctx, o.gvk) + if err != nil { + return + } + + for { + select { + case <-o.ctx.Done(): + return + case e, ok := <-events: + if !ok { + return // Closed channel. + } + // Ignore events not matching our "keep" filter. + obj, ok := e.Object.(*unstructured.Unstructured) + if !ok || !o.keep(obj) { + continue + } + if !yield(e) { + return // Done iterating. 
+ } + } + } +} + +// Observe is a no-op because the base Observer is stateless. +func (*observer) Observe(watch.Event) error { return nil } + +// TODO: Move this to metadata so we can share it. +func isOwnedBy(obj, possibleOwner *unstructured.Unstructured) bool { + if possibleOwner == nil { + return false + } + owners := obj.GetOwnerReferences() + for _, owner := range owners { + if owner.UID == possibleOwner.GetUID() { + return true + } + } + return false +} diff --git a/provider/pkg/await/condition/observer_test.go b/provider/pkg/await/condition/observer_test.go new file mode 100644 index 0000000000..d5c0c82a4c --- /dev/null +++ b/provider/pkg/await/condition/observer_test.go @@ -0,0 +1,146 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package condition + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" +) + +func TestObserver(t *testing.T) { + ctx := context.Background() + + t.Run("filter", func(t *testing.T) { + source := Static(make(chan watch.Event)) + gvk := schema.GroupVersionKind{} + o := NewObserver(ctx, source, gvk, func(obj *unstructured.Unstructured) bool { + i, _, _ := unstructured.NestedInt64(obj.Object, "n") + // Filter to even events so we should only see 2. 
+ return i%2 == 0 + }) + + go func() { + source <- watch.Event{Object: &unstructured.Unstructured{Object: map[string]interface{}{"n": int64(1)}}} + source <- watch.Event{Object: &unstructured.Unstructured{Object: map[string]interface{}{"n": int64(2)}}} + source <- watch.Event{Object: &unstructured.Unstructured{Object: map[string]interface{}{"n": int64(3)}}} + source <- watch.Event{Object: &unstructured.Unstructured{Object: map[string]interface{}{"n": int64(5)}}} + close(source) + }() + + seen := int64(0) + o.Range(func(e watch.Event) bool { + i, _, _ := unstructured.NestedInt64(e.Object.(*unstructured.Unstructured).Object, "n") + seen += i + return true + }) + + assert.Equal(t, int64(2), seen) + }) + + t.Run("terminated", func(t *testing.T) { + source := Static(make(chan watch.Event)) + gvk := schema.GroupVersionKind{} + o := NewObserver(ctx, source, gvk, func(obj *unstructured.Unstructured) bool { + return true + }) + + go func() { + source <- watch.Event{Object: &unstructured.Unstructured{Object: map[string]interface{}{"n": int64(1)}}} + source <- watch.Event{Object: &unstructured.Unstructured{Object: map[string]interface{}{"n": int64(2)}}} + }() + + // We should only see the first object and then terminate early. 
+ seen := int64(0) + o.Range(func(e watch.Event) bool { + i, _, _ := unstructured.NestedInt64(e.Object.(*unstructured.Unstructured).Object, "n") + seen += i + return false + }) + + assert.Equal(t, int64(1), seen) + }) + + t.Run("canceled", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + + source := Static(make(chan watch.Event)) + gvk := schema.GroupVersionKind{} + o := NewObserver(ctx, source, gvk, func(obj *unstructured.Unstructured) bool { + return true + }) + + go func() { + cancel() + source <- watch.Event{Object: &unstructured.Unstructured{}} + }() + + seen := 0 + o.Range(func(e watch.Event) bool { + seen++ + return true + }) + + assert.Equal(t, 0, seen) + }) + + t.Run("children", func(t *testing.T) { + source := Static(make(chan watch.Event)) + gvk := schema.GroupVersionKind{} + + owner := &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "uid": "owner-uid", + }, + }, + } + + ownedBy := func(myUID, ownerUID string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "ownerReferences": []any{ + map[string]any{ + "uid": ownerUID, + }, + }, + "uid": myUID, + }, + }, + } + } + + o := NewChildObserver(ctx, source, owner, gvk) + + go func() { + source <- watch.Event{Object: ownedBy("other-uid", "other-owner-uid")} + source <- watch.Event{Object: ownedBy("expected-uid", "owner-uid")} + source <- watch.Event{Object: ownedBy("other-uid", "other-owner-uid")} + close(source) + }() + + o.Range(func(e watch.Event) bool { + obj, _ := e.Object.(*unstructured.Unstructured) + uid, _, _ := unstructured.NestedString(obj.Object, "metadata", "uid") + assert.Equal(t, "expected-uid", uid) + return true + }) + }) +} diff --git a/provider/pkg/await/condition/source.go b/provider/pkg/await/condition/source.go new file mode 100644 index 0000000000..ac9a262349 --- /dev/null +++ b/provider/pkg/await/condition/source.go @@ -0,0 +1,120 @@ +// Copyright 2024, Pulumi 
Corporation.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package condition
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/informers"
+	"github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/dynamic/dynamicinformer"
+)
+
+var (
+	_ Source = (Static)(nil)
+	_ Source = (*DynamicSource)(nil)
+)
+
+// Source encapsulates logic responsible for establishing
+// watch.Event channels.
+type Source interface {
+	Start(context.Context, schema.GroupVersionKind) (<-chan watch.Event, error)
+}
+
+// NewDynamicSource creates a new DynamicSource which will lazily
+// establish a single dynamicinformer.DynamicSharedInformerFactory. Subsequent
+// calls to Start will spawn informers.GenericInformer from that factory.
+func NewDynamicSource(
+	ctx context.Context,
+	clientset *clients.DynamicClientSet,
+	namespace string,
+) *DynamicSource {
+	stopper := make(chan struct{})
+	factoryF := sync.OnceValue(func() dynamicinformer.DynamicSharedInformerFactory {
+		factory := informers.NewInformerFactory(
+			clientset,
+			informers.WithNamespace(namespace),
+		)
+		// Close the stop channel and shut the factory down once ctx is done.
+		go func() {
+			<-ctx.Done()
+			close(stopper)
+			factory.Shutdown()
+		}()
+		factory.Start(stopper)
+		return factory
+	})
+
+	return &DynamicSource{
+		factory:   factoryF,
+		stopper:   stopper,
+		clientset: clientset,
+	}
+}
+
+// DynamicSource establishes Informers against the cluster.
+type DynamicSource struct {
+	factory   func() dynamicinformer.DynamicSharedInformerFactory
+	stopper   chan struct{}
+	clientset *clients.DynamicClientSet
+}
+
+// Start establishes an Informer against the cluster for the given GVK.
+func (des *DynamicSource) Start(_ context.Context, gvk schema.GroupVersionKind) (<-chan watch.Event, error) {
+	factory := des.factory()
+	events := make(chan watch.Event, 1)
+
+	gvr, err := clients.GVRForGVK(des.clientset.RESTMapper, gvk)
+	if err != nil {
+		return nil, fmt.Errorf("getting GVK: %w", err)
+	}
+
+	informer, err := informers.New(
+		factory,
+		informers.WithEventChannel(events),
+		informers.ForGVR(gvr),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("creating informer: %w", err)
+	}
+	i := informer.Informer()
+
+	// client-go logs a warning if we've already started the informer for this
+	// GVK, but the method to check whether that's the case isn't part of the
+	// public interface.
+	if check, ok := i.(interface{ HasStarted() bool }); ok {
+		if check.HasStarted() {
+			return events, nil
+		}
+	}
+	go informer.Informer().Run(des.stopper)
+	factory.WaitForCacheSync(des.stopper)
+
+	return events, nil
+}
+
+// Static implements Source and allows a fixed event channel to be used as an
+// Observer's Source. Static should not be shared across multiple Observers;
+// instead, give each Observer its own channel.
+type Static chan watch.Event
+
+// Start returns a fixed event channel.
+func (s Static) Start(context.Context, schema.GroupVersionKind) (<-chan watch.Event, error) { + return s, nil +} diff --git a/provider/pkg/await/daemonset.go b/provider/pkg/await/daemonset.go index d99171738d..dc17bbbbde 100644 --- a/provider/pkg/await/daemonset.go +++ b/provider/pkg/await/daemonset.go @@ -24,7 +24,6 @@ import ( "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/informers" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/kinds" - "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/openapi" "github.com/pulumi/pulumi/sdk/v3/go/common/diag" "github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil" logger "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging" @@ -84,44 +83,6 @@ func (dsa *dsAwaiter) Await() error { return dsa.await(dsa.rolloutComplete) } -// Delete blocks until a DaemonSet has been deleted. Returns nil if the -// DaemonSet does not exist. -func (dsa *dsAwaiter) Delete() error { - // Perform a lookup in case the object has already been deleted - client, _, err := dsa.clients() - if err != nil { - return err - } - ds := dsa.config.currentOutputs - _, err = client.Get(dsa.config.ctx, ds.GetName(), metav1.GetOptions{}) - if is404(err) { - return nil - } - if err != nil { - logger.V(3).Infof("Received error deleting DaemonSet %q: %#v", ds.GetName(), err) - return err - } - - // Otherwise wait for a deletion event. - deleted := func() bool { - if dsa.deleted { - return true - } - misscheduled, _ := openapi.Pluck(dsa.ds.Object, "status", "numberMisscheduled") - dsa.config.logger.LogStatus( - diag.Info, - fmt.Sprintf( - "DaemonSet %q still exists (%v pods misscheduled)", - ds.GetName(), - misscheduled, - ), - ) - return false - } - - return dsa.await(deleted) -} - // Read returns the current state of the DaemonSet and returns an error if it // is not in a ready state. 
func (dsa *dsAwaiter) Read() error { diff --git a/provider/pkg/await/daemonset_test.go b/provider/pkg/await/daemonset_test.go index ecb239d6c5..969c67b24d 100644 --- a/provider/pkg/await/daemonset_test.go +++ b/provider/pkg/await/daemonset_test.go @@ -270,10 +270,11 @@ func TestAwaitDaemonSetDelete(t *testing.T) { } tests := []struct { - name string - given *unstructured.Unstructured - setup []func(*fake.SimpleDynamicClient, *unstructured.Unstructured) - events func(clockwork.FakeClock, *unstructured.Unstructured) <-chan watch.Event + name string + given *unstructured.Unstructured + setup []func(*fake.SimpleDynamicClient, *unstructured.Unstructured) + events func(*unstructured.Unstructured) <-chan watch.Event + timeout time.Duration wantErr string }{ @@ -290,8 +291,7 @@ func TestAwaitDaemonSetDelete(t *testing.T) { ensureExists, dontDeleteImmediately, }, - events: func(clock clockwork.FakeClock, ds *unstructured.Unstructured) <-chan watch.Event { - clock.Advance(1 * time.Minute) + events: func(ds *unstructured.Unstructured) <-chan watch.Event { c := make(chan watch.Event, 1) c <- watchDeletedEvent(ds) return c @@ -304,11 +304,11 @@ func TestAwaitDaemonSetDelete(t *testing.T) { ensureExists, dontDeleteImmediately, }, - events: func(clock clockwork.FakeClock, ds *unstructured.Unstructured) <-chan watch.Event { - clock.Advance(_defaultDaemonSetTimeout) + events: func(ds *unstructured.Unstructured) <-chan watch.Event { c := make(chan watch.Event, 1) return c }, + timeout: 1 * time.Second, wantErr: "timed out waiting for the condition", }, { @@ -320,20 +320,26 @@ func TestAwaitDaemonSetDelete(t *testing.T) { } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { - pconfig, clientset, clock := fakeProviderConfig(context.Background(), t) + t.Parallel() + pconfig, clientset, _ := fakeProviderConfig(context.Background(), t) config := DeleteConfig{ ProviderConfig: pconfig, Inputs: tt.given, Outputs: tt.given, Name: tt.given.GetName(), + Timeout: 
tt.timeout.Seconds(), } w := watch.NewRaceFreeFake() clientset.PrependWatchReactor("daemonsets", testcore.DefaultWatchReactor(w, nil)) + go func() { - clock.BlockUntil(1) // Timeout sleeper - for e := range tt.events(clock, tt.given) { + if tt.events == nil { + return + } + for e := range tt.events(tt.given) { w.Action(e.Type, e.Object) } }() diff --git a/provider/pkg/await/informers/informer.go b/provider/pkg/await/informers/informer.go index 6e8142dd63..0ed1c1f5b9 100644 --- a/provider/pkg/await/informers/informer.go +++ b/provider/pkg/await/informers/informer.go @@ -86,6 +86,10 @@ func ForGVR(gvr schema.GroupVersionResource) InformerOption { // the provided informerFactory for a particular GVR. // A GVR must be specified through either ForGVR option or one of the convenience // wrappers around it in this package. +// +// The primary difference between an Informer vs. a Watcher is that Informers +// handle re-connections automatically, so consumers don't need to handle watch +// errors. func New( informerFactory dynamicinformer.DynamicSharedInformerFactory, opts ...InformerOption, diff --git a/provider/pkg/await/internal/awaiter.go b/provider/pkg/await/internal/awaiter.go new file mode 100644 index 0000000000..d6dab2da56 --- /dev/null +++ b/provider/pkg/await/internal/awaiter.go @@ -0,0 +1,179 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package internal + +import ( + "context" + "errors" + "fmt" + "os" + + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/pulumi/pulumi/sdk/v3/go/common/diag" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" +) + +// Awaiter orchestrates a condition Satisfier and optional Observers. +type Awaiter struct { + logger logger + namespace string + condition condition.Satisfier + observers []condition.Observer +} + +// NewAwaiter creates a new Awaiter with the given options. +func NewAwaiter(options ...awaiterOption) (*Awaiter, error) { + ea := &Awaiter{logger: stdout{}} + for _, opt := range options { + opt.apply(ea) + } + return ea, nil +} + +// Await blocks until the Condition is met or until the context is canceled. +// The operation's timeout should be applied to the provided Context. +func (aw *Awaiter) Await(ctx context.Context) error { + if aw.condition == nil { + return fmt.Errorf("missing condition") + } + + // Start all of our observers. They'll continue until they're canceled. + for _, o := range aw.observers { + go func(o condition.Observer) { + o.Range(func(e watch.Event) bool { + if err := o.Observe(e); err != nil { + aw.logger.LogStatus(diag.Warning, "observe error: "+err.Error()) + } + return true + }) + }(o) + } + + // Block until our condition is satisfied, or until our Context is canceled. + aw.condition.Range(func(e watch.Event) bool { + err := aw.condition.Observe(e) + if err != nil { + return false + } + if done, _ := aw.condition.Satisfied(); done { + return false + } + return true + }) + + // Re-evaluate our condition since its state might have changed during the + // iterator's teardown. + done, err := aw.condition.Satisfied() + if done { + return nil + } + + // Make sure the error we return includes the object's partial state. 
+ obj := aw.condition.Object() + + if err != nil { + return errObject{error: err, object: obj} + } + + err = ctx.Err() + if errors.Is(err, context.DeadlineExceeded) { + // Preserve the default k8s "timed out waiting for the condition" error. + err = nil + } + return errObject{error: wait.ErrorInterrupted(err), object: obj} +} + +type awaiterOption interface { + apply(*Awaiter) +} + +type withConditionOption struct{ condition condition.Satisfier } + +func (o withConditionOption) apply(aw *Awaiter) { + aw.condition = o.condition +} + +// WithCondition sets the condition.Satisfier used by the Awaiter. This is +// required and determines when Await can conclude. +func WithCondition(c condition.Satisfier) awaiterOption { + return withConditionOption{c} +} + +type withObserversOption struct{ observers []condition.Observer } + +func (o withObserversOption) apply(aw *Awaiter) { + aw.observers = o.observers +} + +// WithObservers attaches optional condition.Observers to the Awaiter. This +// enables reporting informational messages while waiting for the condition to +// be met. +func WithObservers(obs ...condition.Observer) awaiterOption { + return withObserversOption{obs} +} + +type withNamespaceOption struct{ ns string } + +func (o withNamespaceOption) apply(aw *Awaiter) { + aw.namespace = o.ns +} + +// WithLogger uses the provided logger. If not provided stdout is used. +func WithLogger(l logger) awaiterOption { + return withLoggerOption{l} +} + +type withLoggerOption struct{ l logger } + +func (o withLoggerOption) apply(aw *Awaiter) { + aw.logger = o.l +} + +// WithNamespace configures the namespace used by Informers spawned by the +// Awaiter. +func WithNamespace(ns string) awaiterOption { + return withNamespaceOption{ns} +} + +// errObject wraps an error with object state. 
+type errObject struct { + error + object *unstructured.Unstructured +} + +func (e errObject) Object() *unstructured.Unstructured { + return e.object +} + +func (e errObject) Unwrap() error { + return e.error +} + +type logger interface { + Log(diag.Severity, string) + LogStatus(diag.Severity, string) +} + +// stdout logs messages to stdout. +type stdout struct{} + +func (stdout) Log(sev diag.Severity, msg string) { + _, _ = os.Stdout.WriteString(fmt.Sprintf("%s: %s\n", sev, msg)) +} +func (s stdout) LogStatus(sev diag.Severity, msg string) { + s.Log(sev, msg) +} diff --git a/provider/pkg/await/internal/awaiter_test.go b/provider/pkg/await/internal/awaiter_test.go new file mode 100644 index 0000000000..231e654f99 --- /dev/null +++ b/provider/pkg/await/internal/awaiter_test.go @@ -0,0 +1,204 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package internal + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" +) + +func TestCancel(t *testing.T) { + t.Parallel() + + obj := &unstructured.Unstructured{Object: map[string]any{"foo": "bar"}} + + awaiter, err := NewAwaiter( + WithCondition(condition.NewNever(obj)), + WithObservers(condition.NewNever(obj)), + ) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = awaiter.Await(ctx) + + assert.Error(t, err) + assert.True(t, wait.Interrupted(err)) + + // The error should include the object's partial state. + partial, ok := err.(interface { + Object() *unstructured.Unstructured + }) + assert.True(t, ok) + assert.Equal(t, obj, partial.Object()) +} + +func TestCancelWithRecovery(t *testing.T) { + t.Parallel() + + awaiter, err := NewAwaiter( + WithCondition(condition.NewStopped(nil, nil)), + WithObservers(condition.NewNever(nil)), + ) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = awaiter.Await(ctx) + + assert.NoError(t, err) +} + +func TestTimeout(t *testing.T) { + t.Parallel() + + obj := &unstructured.Unstructured{Object: map[string]any{"foo": "bar"}} + + awaiter, err := NewAwaiter( + WithCondition(condition.NewNever(obj)), + WithObservers(condition.NewNever(obj)), + ) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + err = awaiter.Await(ctx) + + assert.Error(t, err) + assert.True(t, wait.Interrupted(err)) + + // The error should include the object's partial state. 
+ partial, ok := err.(interface { + Object() *unstructured.Unstructured + }) + assert.True(t, ok) + assert.Equal(t, obj, partial.Object()) +} + +func TestImmediateSuccess(t *testing.T) { + t.Parallel() + + awaiter, err := NewAwaiter( + WithCondition(condition.NewImmediate(nil, nil)), + WithObservers(condition.NewNever(nil)), + ) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + err = awaiter.Await(ctx) + + assert.NoError(t, err) +} + +func TestObserverFailure(t *testing.T) { + t.Parallel() + + awaiter, err := NewAwaiter( + WithCondition(condition.NewImmediate(nil, nil)), + WithObservers(condition.NewFailure(fmt.Errorf("condition should still succeed"))), + ) + require.NoError(t, err) + + err = awaiter.Await(context.Background()) + + assert.NoError(t, err) +} + +func TestConditionFailure(t *testing.T) { + t.Parallel() + + awaiter, err := NewAwaiter( + WithCondition(condition.NewFailure(fmt.Errorf("expected"))), + WithObservers(condition.NewNever(nil)), + ) + require.NoError(t, err) + + err = awaiter.Await(context.Background()) + + assert.ErrorContains(t, err, "expected") + assert.ErrorAs(t, err, &errObject{}) +} + +func TestEventualSuccess(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + obj := &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]any{"name": "foo", "namespace": "default"}, + }, + } + want := watch.Event{Type: watch.Modified, Object: obj} + + events := make(chan watch.Event) + source := &staticEventSource{ + events: events, + } + + awaiter, err := NewAwaiter( + WithCondition( + condition.NewOn(ctx, source, obj, want), + ), + WithObservers(condition.NewNever(nil)), + ) + require.NoError(t, err) + + done := make(chan error) + go func() { + done <- awaiter.Await(context.Background()) + }() + + events <- watch.Event{Type: watch.Added, Object: obj} + + select { + case <-done: + t.Fatal("await should not 
have finished") + case <-time.Tick(time.Second): + } + + events <- want + + select { + case err := <-done: + assert.NoError(t, err) + case <-time.Tick(time.Second): + t.Fatal("await should have finished") + } +} + +type staticEventSource struct { + events chan watch.Event +} + +func (ses *staticEventSource) Start(context.Context, schema.GroupVersionKind) (<-chan watch.Event, error) { + return ses.events, nil +} diff --git a/provider/pkg/await/util.go b/provider/pkg/await/util.go index 8ed84277ff..3f9a8e735f 100644 --- a/provider/pkg/await/util.go +++ b/provider/pkg/await/util.go @@ -15,10 +15,6 @@ package await import ( - "errors" - "net/http" - - k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -78,22 +74,6 @@ func watchDeletedEvent(obj runtime.Object) watch.Event { // -------------------------------------------------------------------------- -// Response helpers. - -// -------------------------------------------------------------------------- - -// is404 returns true if any error in err's tree is a Kubernetes StatusError -// with code 404. -func is404(err error) bool { - var statusErr *k8serrors.StatusError - if errors.As(err, &statusErr) { - return statusErr.ErrStatus.Code == http.StatusNotFound - } - return false -} - -// -------------------------------------------------------------------------- - // Ownership helpers. 
// -------------------------------------------------------------------------- diff --git a/provider/pkg/metadata/annotations.go b/provider/pkg/metadata/annotations.go index 95cec155f7..3131588306 100644 --- a/provider/pkg/metadata/annotations.go +++ b/provider/pkg/metadata/annotations.go @@ -19,6 +19,7 @@ import ( "github.com/pulumi/pulumi/sdk/v3/go/common/resource" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" ) const ( @@ -107,3 +108,7 @@ func isComputedValue(v any) bool { _, isComputed := v.(resource.Computed) return isComputed } + +type clientGetter interface { + ResourceClientForObject(*unstructured.Unstructured) (dynamic.ResourceInterface, error) +} diff --git a/provider/pkg/metadata/annotations_test.go b/provider/pkg/metadata/annotations_test.go index 6003bfa739..f16be08f30 100644 --- a/provider/pkg/metadata/annotations_test.go +++ b/provider/pkg/metadata/annotations_test.go @@ -55,15 +55,19 @@ func TestSetAnnotation(t *testing.T) { args args }{ {"set-with-no-annotation", args{ - obj: noAnnotationObj, key: "foo", value: "bar", expectSet: true, expectKey: "foo", expectValue: "bar"}}, + obj: noAnnotationObj, key: "foo", value: "bar", expectSet: true, expectKey: "foo", expectValue: "bar", + }}, {"set-with-existing-annotations", args{ - obj: existingAnnotationObj, key: "foo", value: "bar", expectSet: true, expectKey: "foo", expectValue: "bar"}}, + obj: existingAnnotationObj, key: "foo", value: "bar", expectSet: true, expectKey: "foo", expectValue: "bar", + }}, // Computed fields cannot be set, so SetAnnotation is a no-op. 
{"set-with-computed-metadata", args{ - obj: computedMetadataObj, key: "foo", value: "bar", expectSet: false}}, + obj: computedMetadataObj, key: "foo", value: "bar", expectSet: false, + }}, {"set-with-computed-annotation", args{ - obj: computedAnnotationObj, key: "foo", value: "bar", expectSet: false}}, + obj: computedAnnotationObj, key: "foo", value: "bar", expectSet: false, + }}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/provider/pkg/metadata/overrides.go b/provider/pkg/metadata/overrides.go index 9cac93d3c1..a00ce4a39b 100644 --- a/provider/pkg/metadata/overrides.go +++ b/provider/pkg/metadata/overrides.go @@ -15,10 +15,18 @@ package metadata import ( + "context" "strconv" "strings" "time" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/logging" + appsv1 "k8s.io/api/apps/v1" + appsv1beta1 "k8s.io/api/apps/v1beta1" + appsv1beta2 "k8s.io/api/apps/v1beta2" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) @@ -64,3 +72,52 @@ func DeletionPropagation(obj *unstructured.Unstructured) metav1.DeletionPropagat return metav1.DeletePropagationForeground } } + +// DeletedCondition inspects the object's annotations and returns a +// condition.Satisfier appropriate for using when awaiting deletion. +// +// The "inputs" parameter is the source of truth for user-provided annotations, +// but it is not guaranteed to be named. The "obj" parameter should be used for +// conditions. 
+func DeletedCondition( + ctx context.Context, + source condition.Source, + clientset clientGetter, + logger *logging.DedupLogger, + inputs *unstructured.Unstructured, + obj *unstructured.Unstructured, +) (condition.Satisfier, error) { + if IsAnnotationTrue(inputs, AnnotationSkipAwait) && allowsSkipAwaitWithDelete(inputs) { + return condition.NewImmediate(logger, obj), nil + } + getter, err := clientset.ResourceClientForObject(obj) + if err != nil { + return nil, err + } + return condition.NewDeleted(ctx, source, getter, logger, obj) +} + +// allowsSkipAwaitWithDelete returns true for legacy types which support +// buggy skipAwait behavior during delete. +// See also: +// https://github.com/pulumi/pulumi-kubernetes/issues/1232 +// https://github.com/pulumi/pulumi-kubernetes/issues/3154 +func allowsSkipAwaitWithDelete(inputs *unstructured.Unstructured) bool { + switch inputs.GroupVersionKind() { + case corev1.SchemeGroupVersion.WithKind("Namespace"), + corev1.SchemeGroupVersion.WithKind("Pod"), + corev1.SchemeGroupVersion.WithKind("ReplicationController"), + appsv1.SchemeGroupVersion.WithKind("DaemonSet"), + appsv1beta1.SchemeGroupVersion.WithKind("DaemonSet"), + appsv1beta2.SchemeGroupVersion.WithKind("DaemonSet"), + appsv1.SchemeGroupVersion.WithKind("Deployment"), + appsv1beta1.SchemeGroupVersion.WithKind("Deployment"), + appsv1beta2.SchemeGroupVersion.WithKind("Deployment"), + appsv1.SchemeGroupVersion.WithKind("StatefulSet"), + appsv1beta1.SchemeGroupVersion.WithKind("StatefulSet"), + appsv1beta2.SchemeGroupVersion.WithKind("StatefulSet"), + batchv1.SchemeGroupVersion.WithKind("Job"): + return true + } + return false +} diff --git a/provider/pkg/metadata/overrides_test.go b/provider/pkg/metadata/overrides_test.go index 09b35b9a79..0e336388c5 100644 --- a/provider/pkg/metadata/overrides_test.go +++ b/provider/pkg/metadata/overrides_test.go @@ -16,12 +16,17 @@ package metadata import ( + "context" "reflect" "testing" "time" +
"github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" ) func TestSkipAwaitLogic(t *testing.T) { @@ -127,3 +132,82 @@ func TestDeletionPropagation(t *testing.T) { }) } } + +func TestDeletedCondition(t *testing.T) { + tests := []struct { + name string + inputs *unstructured.Unstructured + obj *unstructured.Unstructured + want condition.Satisfier + }{ + { + name: "skipAwait=true doesn't affect generic resources", + inputs: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + AnnotationSkipAwait: "true", + }, + }, + }, + }, + want: &condition.Deleted{}, + }, + { + name: "skipAwait=true does affect legacy resources", + inputs: &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "v1", + "kind": "Namespace", + "metadata": map[string]any{ + "annotations": map[string]any{ + AnnotationSkipAwait: "true", + }, + }, + }, + }, + want: condition.Immediate{}, + }, + { + name: "skipAwait=false", + inputs: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + AnnotationSkipAwait: "false", + }, + }, + }, + }, + want: &condition.Deleted{}, + }, + { + name: "skipAwait unset", + inputs: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{}, + }, + }, + want: &condition.Deleted{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + obj := tt.obj + if obj == nil { + obj = tt.inputs + } + condition, err := DeletedCondition(context.Background(), nil, noopClientGetter{}, nil, tt.inputs, obj) + require.NoError(t, err) + + assert.IsType(t, tt.want, condition) + }) + } +} + +type noopClientGetter struct{} + +func (noopClientGetter) 
ResourceClientForObject(*unstructured.Unstructured) (dynamic.ResourceInterface, error) { + return nil, nil +} diff --git a/tests/sdk/java/await_test.go b/tests/sdk/java/await_test.go index 580d6e8a2f..f26ba18c99 100644 --- a/tests/sdk/java/await_test.go +++ b/tests/sdk/java/await_test.go @@ -17,6 +17,7 @@ package test import ( "context" "testing" + "time" "github.com/pulumi/providertest/pulumitest" "github.com/pulumi/providertest/pulumitest/opttest" @@ -132,3 +133,36 @@ func TestAwaitServiceAccount(t *testing.T) { test.Up() test.Refresh() } + +func TestAwaitSkip(t *testing.T) { + t.Parallel() + + test := pulumitest.NewPulumiTest(t, + "testdata/await/skipawait", + opttest.SkipInstall(), + ) + t.Cleanup(func() { + test.Destroy() + }) + + start := time.Now() + _ = test.Up() + took := time.Since(start) + assert.Less(t, took, 2*time.Minute, "didn't skip pod's slow startup") + + start = time.Now() + _ = test.Refresh() + took = time.Since(start) + assert.Less(t, took, 2*time.Minute, "didn't skip pod's slow read") + + test.UpdateSource("testdata/await/skipawait/step2") + start = time.Now() + _ = test.Refresh() + took = time.Since(start) + assert.Less(t, took, 2*time.Minute, "didn't skip pod's slow update") + + start = time.Now() + _ = test.Destroy() + took = time.Since(start) + assert.Less(t, took, 2*time.Minute, "didn't skip config map's stuck delete") +} diff --git a/tests/sdk/java/testdata/await/skipawait/Pulumi.yaml b/tests/sdk/java/testdata/await/skipawait/Pulumi.yaml new file mode 100644 index 0000000000..e37031bf0a --- /dev/null +++ b/tests/sdk/java/testdata/await/skipawait/Pulumi.yaml @@ -0,0 +1,33 @@ +name: skipawait +runtime: yaml +description: | + Tests the skipAwait annotation: + - A slow-to-start pod tests create/update/read. + - Delete is tested by a namespace with a stuck finalizer.
+resources: + stuck-namespace: + type: kubernetes:core/v1:Namespace + properties: + metadata: + finalizers: + - pulumi.com/stuck + annotations: + pulumi.com/skipAwait: "true" + + slow-pod: + type: kubernetes:core/v1:Pod + properties: + metadata: + annotations: + pulumi.com/skipAwait: "true" + spec: + containers: + - image: busybox + name: busybox + command: ["sleep", "infinity"] + readinessProbe: + exec: + command: + - ls + initialDelaySeconds: 600 # 10 minutes! + periodSeconds: 10 diff --git a/tests/sdk/java/testdata/await/skipawait/step2/Pulumi.yaml b/tests/sdk/java/testdata/await/skipawait/step2/Pulumi.yaml new file mode 100644 index 0000000000..3c17e3d817 --- /dev/null +++ b/tests/sdk/java/testdata/await/skipawait/step2/Pulumi.yaml @@ -0,0 +1,34 @@ +name: skipawait +runtime: yaml +description: | + Tests the skipAwait annotation: + - A slow-to-start pod tests create/update/read. + - Delete is tested by a namespace with a stuck finalizer. +resources: + stuck-namespace: + type: kubernetes:core/v1:Namespace + properties: + metadata: + finalizers: + - pulumi.com/stuck + annotations: + pulumi.com/skipAwait: "true" + + slow-pod: + type: kubernetes:core/v1:Pod + properties: + metadata: + annotations: + foo: bar # Add an annotation to trigger an update. + pulumi.com/skipAwait: "true" + spec: + containers: + - image: busybox + name: busybox + command: ["sleep", "infinity"] + readinessProbe: + exec: + command: + - ls + initialDelaySeconds: 600 # 10 minutes! + periodSeconds: 10