From e0c1c870e0a9122159ce1f89cbc109aba87b5ec7 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Tue, 23 Jul 2024 14:55:53 -0700 Subject: [PATCH 01/15] Unify await logic for deletes --- CHANGELOG.md | 5 + provider/pkg/await/await.go | 142 ++++---------- provider/pkg/await/await_test.go | 138 +++---------- provider/pkg/await/condition/condition.go | 69 +++++++ provider/pkg/await/condition/deleted.go | 140 ++++++++++++++ provider/pkg/await/condition/deleted_test.go | 169 ++++++++++++++++ provider/pkg/await/condition/doc.go | 17 ++ provider/pkg/await/condition/immediate.go | 171 +++++++++++++++++ provider/pkg/await/condition/observer.go | 176 +++++++++++++++++ provider/pkg/await/condition/source.go | 120 ++++++++++++ provider/pkg/await/informers/informer.go | 4 + provider/pkg/await/internal/awaiter.go | 166 ++++++++++++++++ provider/pkg/await/internal/awaiter_test.go | 181 ++++++++++++++++++ provider/pkg/metadata/annotations.go | 27 +++ provider/pkg/metadata/annotations_test.go | 66 ++++++- tests/sdk/java/await_test.go | 31 +++ .../java/testdata/await/skipawait/Pulumi.yaml | 35 ++++ 17 files changed, 1438 insertions(+), 219 deletions(-) create mode 100644 provider/pkg/await/condition/condition.go create mode 100644 provider/pkg/await/condition/deleted.go create mode 100644 provider/pkg/await/condition/deleted_test.go create mode 100644 provider/pkg/await/condition/doc.go create mode 100644 provider/pkg/await/condition/immediate.go create mode 100644 provider/pkg/await/condition/observer.go create mode 100644 provider/pkg/await/condition/source.go create mode 100644 provider/pkg/await/internal/awaiter.go create mode 100644 provider/pkg/await/internal/awaiter_test.go create mode 100644 tests/sdk/java/testdata/await/skipawait/Pulumi.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index d65ce3ea82..1ef4c67f5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,11 +16,16 @@ to trigger updates for downstream dependencies like `Deployments`) are recommended to explicitly specify `immutable: true`. +- A warning is now emitted if an object has finalizers which might be blocking + deletion. (https://github.com/pulumi/pulumi-kubernetes/issues/1418) + ### Fixed - The `immutable` field is now respected for `ConfigMaps` when the provider is configured with `enableConfigMapMutable`. (https://github.com/pulumi/pulumi-kubernetes/issues/3181) +- Fixed a panic that could occur during deletion. (https://github.com/pulumi/pulumi-kubernetes/issues/3157) + ## 4.17.1 (August 16, 2024) ### Fixed diff --git a/provider/pkg/await/await.go b/provider/pkg/await/await.go index e0f9e97bdc..ee70a07d16 100644 --- a/provider/pkg/await/await.go +++ b/provider/pkg/await/await.go @@ -21,9 +21,12 @@ import ( "fmt" "os" "strings" + "time" fluxssa "github.com/fluxcd/pkg/ssa" "github.com/jonboulle/clockwork" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/internal" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/cluster" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/host" @@ -42,10 +45,8 @@ import ( apivalidation "k8s.io/apimachinery/pkg/api/validation" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage/names" "k8s.io/client-go/dynamic" k8sopenapi "k8s.io/kubectl/pkg/util/openapi" @@ -81,6 +82,9 @@ type ProviderConfig struct { // explicit awaiters (for testing purposes) awaiters map[string]awaitSpec + // explicit condition (for testing) + condition condition.Satisfier + clock clockwork.Clock } @@ -791,129 +795,53 @@ func Deletion(c DeleteConfig) error { return err } - timeout := metadata.TimeoutDuration(c.Timeout, c.Inputs) - var timeoutSeconds int64 = 300 - if timeout != nil { - timeoutSeconds = int64(timeout.Seconds()) - } - listOpts := metav1.ListOptions{ - FieldSelector: fields.OneTermEqualSelector("metadata.name", c.Name).String(), - TimeoutSeconds: &timeoutSeconds, - } - - // Set up a watcher for the selected resource. - watcher, err := client.Watch(c.Context, listOpts) - if err != nil { - return nilIfGVKDeleted(err) - } - // delete the specified resource (using foreground cascading delete by default). deletePolicy := metadata.DeletionPropagation(c.Inputs) deleteOpts := metav1.DeleteOptions{ PropagationPolicy: &deletePolicy, } + err = client.Delete(c.Context, c.Name, deleteOpts) if err != nil { return nilIfGVKDeleted(err) } - // Wait until delete resolves as success or error. Note that the conditional is set up to log only - // if we don't have an entry for the resource type; in the event that we do, but the await logic - // is blank, simply do nothing instead of logging. - var waitErr error - id := fmt.Sprintf("%s/%s", c.Outputs.GetAPIVersion(), c.Outputs.GetKind()) - a := awaiters - if c.awaiters != nil { - a = c.awaiters + // Apply a timeout to the operation. + timeout := 10 * time.Minute + if t := metadata.TimeoutDuration(c.Timeout, c.Inputs); t != nil { + timeout = *t } - if awaiter, exists := a[id]; exists && awaiter.awaitDeletion != nil { - if metadata.SkipAwaitLogic(c.Inputs) { - logger.V(1).Infof("Skipping await logic for %v", c.Name) - } else { - timeout := metadata.TimeoutDuration(c.Timeout, c.Inputs) - waitErr = awaiter.awaitDeletion(deleteAwaitConfig{ - awaitConfig: awaitConfig{ - ctx: c.Context, - urn: c.URN, - initialAPIVersion: c.InitialAPIVersion, - clientSet: c.ClientSet, - currentOutputs: c.Outputs, - logger: c.DedupLogger, - timeout: timeout, - clusterVersion: c.ClusterVersion, - clock: c.clock, - }, - clientForResource: client, - }) - if waitErr != nil { - return waitErr - } - _ = clearStatus(c.Context, c.Host, c.URN) - } - } else { - for { - select { - case event, ok := <-watcher.ResultChan(): - if !ok { - deleted, obj := checkIfResourceDeleted(c.Context, c.Name, client) - if deleted { - _ = clearStatus(c.Context, c.Host, c.URN) - return nil - } + ctx, cancel := context.WithTimeout(c.Context, timeout) + defer cancel() - return &timeoutError{ - object: obj, - subErrors: []string{ - fmt.Sprintf("Timed out waiting for deletion of %s %q", id, c.Name), - }, - } - } + // Setup our Informer factory. + source := condition.NewDynamicSource(ctx, c.ClientSet, c.Outputs.GetNamespace()) - switch event.Type { - case watch.Deleted: - _ = clearStatus(c.Context, c.Host, c.URN) - return nil - case watch.Error: - deleted, obj := checkIfResourceDeleted(c.Context, c.Name, client) - if deleted { - _ = clearStatus(c.Context, c.Host, c.URN) - return nil - } - return &initializationError{ - object: obj, - subErrors: []string{apierrors.FromObject(event.Object).Error()}, - } - } - case <-c.Context.Done(): // Handle user cancellation during watch for deletion. - watcher.Stop() - logger.V(3).Infof("Received error deleting object %q: %#v", id, err) - deleted, obj := checkIfResourceDeleted(c.Context, c.Name, client) - if deleted { - _ = clearStatus(c.Context, c.Host, c.URN) - return nil - } - - return &cancellationError{ - object: obj, - } - } - } + // Determine the condition to wait for. + deleted, err := metadata.GetDeletedCondition(ctx, source, c.ClientSet, c.DedupLogger, c.Outputs) + if err != nil { + return err + } + if c.condition != nil { + deleted = c.condition } - return nil -} + awaiter, err := internal.NewAwaiter( + internal.WithCondition(deleted), + internal.WithNamespace(c.Outputs.GetNamespace()), + ) + if err != nil { + return err + } -// checkIfResourceDeleted attempts to get a k8s resource, and returns true if the resource is not found (was deleted). -// Return the resource if it still exists. -func checkIfResourceDeleted( - ctx context.Context, name string, client dynamic.ResourceInterface, -) (bool, *unstructured.Unstructured) { - obj, err := client.Get(ctx, name, metav1.GetOptions{}) - if err != nil && is404(err) { // In case of 404, the resource no longer exists, so return success. - return true, nil + // Wait until the delete condition resolves. + err = awaiter.Await(ctx) + if err != nil { + return err } + _ = clearStatus(c.Context, c.Host, c.URN) - return false, obj + return nil } // clearStatus will clear the `Info` column of the CLI of all statuses and messages. diff --git a/provider/pkg/await/await_test.go b/provider/pkg/await/await_test.go index ef34e3549b..91943b7218 100644 --- a/provider/pkg/await/await_test.go +++ b/provider/pkg/await/await_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients/fake" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/cluster" @@ -663,23 +664,16 @@ func TestDeletion(t *testing.T) { // awaiters - awaitNoop := func(t *testing.T, ctx testCtx) deletionAwaiter { - return func(dac deleteAwaitConfig) error { - return nil - } + awaitNoop := func(t *testing.T, ctx testCtx) condition.Satisfier { + return condition.NewImmediate(ctx.config.DedupLogger, ctx.config.Inputs) } - awaitError := func(t *testing.T, ctx testCtx) deletionAwaiter { - return func(dac deleteAwaitConfig) error { - return serviceUnavailableErr - } + awaitError := func(t *testing.T, ctx testCtx) condition.Satisfier { + return condition.NewFailure(serviceUnavailableErr) } - awaitUnexpected := func(t *testing.T, ctx testCtx) deletionAwaiter { - return func(dac deleteAwaitConfig) error { - require.Fail(t, "Unexpected call to awaiter") - return nil - } + awaitUnexpected := func(t *testing.T, ctx testCtx) condition.Satisfier { + return condition.NewFailure(fmt.Errorf("unexpected call to await")) } // expectations @@ -705,13 +699,13 @@ func TestDeletion(t *testing.T) { } tests := []struct { - name string - client client - args args - expect []expectF - reaction []reactionF - watcher *watch.RaceFreeFakeWatcher - awaiter func(t *testing.T, ctx testCtx) deletionAwaiter + name string + client client + args args + expect []expectF + reaction []reactionF + watcher *watch.RaceFreeFakeWatcher + condition func(*testing.T, testCtx) condition.Satisfier }{ { name: "ServiceUnavailable", @@ -738,8 +732,8 @@ func TestDeletion(t *testing.T) { inputs: validPodUnstructured, outputs: validPodUnstructured, }, - awaiter: awaitNoop, - expect: []expectF{succeeded(), deleted("default", "foo")}, + condition: awaitNoop, + expect: []expectF{succeeded(), deleted("default", "foo")}, }, { name: "NonNamespaced", @@ -750,8 +744,8 @@ func TestDeletion(t *testing.T) { inputs: validClusterRoleUnstructured, outputs: validClusterRoleUnstructured, }, - awaiter: awaitNoop, - expect: []expectF{succeeded(), deleted("default", "foo")}, + condition: awaitNoop, + expect: []expectF{succeeded(), deleted("default", "foo")}, }, { name: "Gone", @@ -762,8 +756,8 @@ func TestDeletion(t *testing.T) { inputs: validPodUnstructured, outputs: validPodUnstructured, }, - awaiter: awaitUnexpected, - expect: []expectF{succeeded()}, + condition: awaitUnexpected, + expect: []expectF{succeeded()}, }, { name: "SkipAwait", @@ -772,10 +766,9 @@ func TestDeletion(t *testing.T) { name: "foo", objects: []runtime.Object{validPodUnstructured}, inputs: withSkipAwait(validPodUnstructured), - outputs: validPodUnstructured, + outputs: withSkipAwait(validPodUnstructured), }, reaction: []reactionF{suppressDeletion}, // suppress deletion to safeguard that the built-in watcher is not used. - awaiter: awaitUnexpected, expect: []expectF{succeeded()}, }, { @@ -787,8 +780,8 @@ func TestDeletion(t *testing.T) { inputs: validPodUnstructured, outputs: validPodUnstructured, }, - awaiter: awaitError, - expect: []expectF{failed(serviceUnavailableErr)}, + condition: awaitError, + expect: []expectF{failed(serviceUnavailableErr)}, }, { name: "Deleted", @@ -799,64 +792,8 @@ func TestDeletion(t *testing.T) { inputs: validPodUnstructured, outputs: validPodUnstructured, }, - awaiter: nil, - expect: []expectF{succeeded(), deleted("default", "foo")}, - }, - { - name: "WatchTimeout", - args: args{ - resType: tokens.Type("kubernetes:core/v1:Pod"), - name: "foo", - objects: []runtime.Object{validPodUnstructured}, - inputs: validPodUnstructured, - outputs: validPodUnstructured, - }, - reaction: []reactionF{suppressDeletion}, - awaiter: nil, - watcher: withWatchClosed(watch.NewRaceFreeFake()), - expect: []expectF{failed(&timeoutError{})}, - }, - { - name: "WatchTimeoutWithRecovery", - args: args{ - resType: tokens.Type("kubernetes:core/v1:Pod"), - name: "foo", - objects: []runtime.Object{validPodUnstructured}, - inputs: validPodUnstructured, - outputs: validPodUnstructured, - }, - reaction: nil, - awaiter: nil, - watcher: withWatchClosed(watch.NewRaceFreeFake()), - expect: []expectF{succeeded()}, - }, - { - name: "WatchError", - args: args{ - resType: tokens.Type("kubernetes:core/v1:Pod"), - name: "foo", - objects: []runtime.Object{validPodUnstructured}, - inputs: validPodUnstructured, - outputs: validPodUnstructured, - }, - reaction: []reactionF{suppressDeletion}, - awaiter: nil, - watcher: withWatchError(watch.NewRaceFreeFake(), apierrors.NewTimeoutError("test", 30)), - expect: []expectF{failed(&initializationError{})}, - }, - { - name: "WatchErrorWithRecovery", - args: args{ - resType: tokens.Type("kubernetes:core/v1:Pod"), - name: "foo", - objects: []runtime.Object{validPodUnstructured}, - inputs: validPodUnstructured, - outputs: validPodUnstructured, - }, - reaction: nil, - awaiter: nil, - watcher: withWatchError(watch.NewRaceFreeFake(), apierrors.NewTimeoutError("test", 30)), - expect: []expectF{succeeded()}, + condition: awaitNoop, + expect: []expectF{succeeded(), deleted("default", "foo")}, }, { name: "Cancel", @@ -868,7 +805,6 @@ func TestDeletion(t *testing.T) { outputs: validPodUnstructured, }, reaction: []reactionF{cancelAwait, suppressDeletion}, - awaiter: nil, expect: []expectF{failed(&cancellationError{})}, }, { @@ -880,9 +816,9 @@ func TestDeletion(t *testing.T) { inputs: validPodUnstructured, outputs: validPodUnstructured, }, - reaction: []reactionF{cancelAwait}, - awaiter: nil, - expect: []expectF{succeeded()}, + reaction: []reactionF{cancelAwait}, + condition: awaitNoop, + expect: []expectF{succeeded()}, }, } @@ -945,11 +881,8 @@ func TestDeletion(t *testing.T) { return true, tt.watcher, nil }) } - if tt.awaiter != nil { - id := fmt.Sprintf("%s/%s", tt.args.inputs.GetAPIVersion(), tt.args.inputs.GetKind()) - config.awaiters[id] = awaitSpec{ - awaitDeletion: tt.awaiter(t, testCtx), - } + if tt.condition != nil { + config.condition = tt.condition(t, testCtx) } err = Deletion(config) for _, e := range tt.expect { @@ -1047,17 +980,6 @@ func withGenerateName(obj *unstructured.Unstructured) *unstructured.Unstructured return copy } -func withWatchError(watcher *watch.RaceFreeFakeWatcher, err *apierrors.StatusError) *watch.RaceFreeFakeWatcher { - obj := err.Status() - watcher.Error(&obj) - return watcher -} - -func withWatchClosed(watcher *watch.RaceFreeFakeWatcher) *watch.RaceFreeFakeWatcher { - watcher.Stop() - return watcher -} - // -------------------------------------------------------------------------- // Mock implementations of Kubernetes client stuff. diff --git a/provider/pkg/await/condition/condition.go b/provider/pkg/await/condition/condition.go new file mode 100644 index 0000000000..67a22f0aa5 --- /dev/null +++ b/provider/pkg/await/condition/condition.go @@ -0,0 +1,69 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package condition + +import ( + "context" + "fmt" + "io" + "os" + + checkerlog "github.com/pulumi/cloud-ready-checks/pkg/checker/logging" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// Satisfier is an Observer which evaluates the observed object against some +// criteria. +type Satisfier interface { + Observer + + // Satisfied returns true when the criteria is met. + Satisfied() (bool, error) + + // Object returns the last-known state of the object being observed. + Object() *unstructured.Unstructured +} + +// logger allows injecting custom log behavior. +type logger interface { + LogMessage(checkerlog.Message) +} + +// logbuf logs messages to an io.Writter. +type logbuf struct{ w io.Writer } + +func (l logbuf) LogMessage(m checkerlog.Message) { + fmt.Fprint(l.w, m.String()+"\n") +} + +// stdout logs messages to stdout. +type stdout struct{} + +func (stdout) LogMessage(m checkerlog.Message) { + l := logbuf{os.Stdout} + l.LogMessage(m) +} + +// objectGetter allows injecting custom client behavior for fetching objects +// from the cluster. +type objectGetter interface { + Get( + ctx context.Context, + name string, + options metav1.GetOptions, + subresources ...string, + ) (*unstructured.Unstructured, error) +} diff --git a/provider/pkg/await/condition/deleted.go b/provider/pkg/await/condition/deleted.go new file mode 100644 index 0000000000..16c8201ad8 --- /dev/null +++ b/provider/pkg/await/condition/deleted.go @@ -0,0 +1,140 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package condition + +import ( + "context" + "errors" + "fmt" + "net/http" + "strings" + "sync/atomic" + + checkerlog "github.com/pulumi/cloud-ready-checks/pkg/checker/logging" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" + "sigs.k8s.io/cli-utils/pkg/kstatus/status" +) + +var _ Satisfier = (*Deleted)(nil) + +// Deleted condition succeeds when GET on a resource 404s or when a Deleted +// event is received for the resource. +type Deleted struct { + observer *ObjectObserver + logger logger + deleted atomic.Bool + getter objectGetter +} + +// NewDeleted constructs a new Deleted condition. +func NewDeleted( + ctx context.Context, + source Source, + getter objectGetter, + logger logger, + obj *unstructured.Unstructured, +) (*Deleted, error) { + dc := &Deleted{ + observer: NewObjectObserver(ctx, source, obj), + logger: logger, + getter: getter, + } + return dc, nil +} + +// Range confirms the object exists before establishing an Informer. +// If a Deleted event isn't Observed by the time the underlying Observer is +// exhausted, we attempt a final lookup on the cluster to be absolutely sure it +// still exists. +func (dc *Deleted) Range(yield func(watch.Event) bool) { + dc.getClusterState() + if dc.deleted.Load() { + // Already deleted, nothing more to do. + return + } + + // Iterate over the underlying Observer's events. + dc.observer.Range(yield) + + if dc.deleted.Load() { + // Nothing more to do. + return + } + + // Attempt one last lookup if the object still exists. (This is legacy behavior + // that might be unnecessary since we're using Informers instead of + // Watches now.) + dc.getClusterState() + if dc.deleted.Load() { + return + } + + // Let the user know we might be blocked if the object has finalizers. + // https://github.com/pulumi/pulumi-kubernetes/issues/1418 + finalizers := dc.Object().GetFinalizers() + if len(finalizers) == 0 { + return + } + dc.logger.LogMessage(checkerlog.WarningMessage( + fmt.Sprintf("finalizers might be preventing deletion (%s)", strings.Join(finalizers, ", ")), + )) +} + +// Observe watches for Deleted events. +func (dc *Deleted) Observe(e watch.Event) error { + if e.Type == watch.Deleted { + dc.deleted.Store(true) + } + return dc.observer.Observe(e) +} + +// Satisfied returns true if a Deleted event was Observed. Otherwise a status +// message will be logged, if available. +func (dc *Deleted) Satisfied() (bool, error) { + if dc.deleted.Load() { + return true, nil + } + + uns := dc.Object() + r, _ := status.Compute(uns) + if r.Message != "" { + dc.logger.LogMessage(checkerlog.StatusMessage(r.Message)) + } + + return false, nil +} + +// Object returns the last-known state of the object we're deleting. +func (dc *Deleted) Object() *unstructured.Unstructured { + return dc.observer.Object() +} + +// getClusterState performs a GET against the cluster and updates state to +// reflect whether the object still exists or not. +func (dc *Deleted) getClusterState() { + _, err := dc.getter.Get(context.Background(), dc.Object().GetName(), metav1.GetOptions{}) + if err == nil { + // Still exists. + dc.deleted.Store(false) + return + } + var statusErr *k8serrors.StatusError + if errors.As(err, &statusErr) { + dc.deleted.Store(statusErr.ErrStatus.Code == http.StatusNotFound) + } +} diff --git a/provider/pkg/await/condition/deleted_test.go b/provider/pkg/await/condition/deleted_test.go new file mode 100644 index 0000000000..ca7c09ac25 --- /dev/null +++ b/provider/pkg/await/condition/deleted_test.go @@ -0,0 +1,169 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package condition + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" +) + +var pod = &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]any{ + "name": "foo", + "namespace": "default", + }, + "spec": map[string]any{ + "containers": []any{ + map[string]any{ + "name": "foo", + "image": "nginx", + }, + }, + }, + }, +} + +type get404 struct{} + +func (get404) Get(ctx context.Context, name string, opts metav1.GetOptions, sub ...string) (*unstructured.Unstructured, error) { + return nil, k8serrors.NewNotFound(schema.GroupResource{}, name) +} + +type get200 struct{ obj *unstructured.Unstructured } + +func (g *get200) Get(context.Context, string, metav1.GetOptions, ...string) (*unstructured.Unstructured, error) { + return g.obj, nil +} + +type getsequence struct { + getters []objectGetter + idx int +} + +func (g *getsequence) Get(ctx context.Context, name string, opts metav1.GetOptions, sub ...string) (*unstructured.Unstructured, error) { + defer func() { g.idx++ }() + return g.getters[g.idx].Get(ctx, name, opts, sub...) +} + +func TestDeleted(t *testing.T) { + t.Run("already deleted", func(t *testing.T) { + ctx := context.Background() + getter := get404{} + + cond, err := NewDeleted(ctx, Static(nil), getter, stdout{}, pod) + assert.NoError(t, err) + + cond.Range(nil) + + done, err := cond.Satisfied() + assert.NoError(t, err) + assert.True(t, done) + }) + + t.Run("deleted during watch", func(t *testing.T) { + ctx := context.Background() + + getter := &get200{pod} + source := Static(make(chan watch.Event, 1)) + + cond, err := NewDeleted(ctx, source, getter, stdout{}, pod) + assert.NoError(t, err) + + seen := make(chan struct{}) + go cond.Range(func(e watch.Event) bool { + err := cond.Observe(e) + assert.NoError(t, err) + seen <- struct{}{} + return true + }) + + source <- watch.Event{Type: watch.Modified, Object: pod} + <-seen + done, err := cond.Satisfied() + assert.NoError(t, err) + assert.False(t, done) + + source <- watch.Event{Type: watch.Deleted, Object: pod} + <-seen + done, err = cond.Satisfied() + assert.NoError(t, err) + assert.True(t, done) + }) + + t.Run("times out", func(t *testing.T) { + getter := &get200{pod} + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + cond, err := NewDeleted(ctx, Static(nil), getter, stdout{}, pod) + assert.NoError(t, err) + + cond.Range(nil) + + done, err := cond.Satisfied() + assert.NoError(t, err) + assert.False(t, done) + }) + + t.Run("times out with finalizers", func(t *testing.T) { + getter := &get200{pod} + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + podWithFinalizers := pod.DeepCopy() + podWithFinalizers.SetFinalizers([]string{"stuck"}) + + buf := &strings.Builder{} + cond, err := NewDeleted(ctx, Static(nil), getter, logbuf{buf}, podWithFinalizers) + assert.NoError(t, err) + + cond.Range(nil) + + assert.Contains(t, buf.String(), "finalizers might be preventing deletion") + }) + + // TODO: It's questionable whether we still need to test this behavior. I + // suspect this stems from earlier error handling code around our watch + // logic, which is largely obviated by our use of informers now. In other + // words, we needed this when we weren't handling the sort of watch errors + // Informers handle automatically. + t.Run("times out with recovery", func(t *testing.T) { + getter := &getsequence{[]objectGetter{&get200{pod}, &get404{}}, 0} + + ctx, cancel := context.WithCancel(context.Background()) + cond, err := NewDeleted(ctx, Static(nil), getter, stdout{}, pod) + assert.NoError(t, err) + + cancel() + cond.Range(nil) + + done, err := cond.Satisfied() + assert.NoError(t, err) + assert.True(t, done) + }) +} diff --git a/provider/pkg/await/condition/doc.go b/provider/pkg/await/condition/doc.go new file mode 100644 index 0000000000..dd5e42b1dd --- /dev/null +++ b/provider/pkg/await/condition/doc.go @@ -0,0 +1,17 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package condition contains interfaces and primitives for use with our await +// logic. +package condition diff --git a/provider/pkg/await/condition/immediate.go b/provider/pkg/await/condition/immediate.go new file mode 100644 index 0000000000..0e2649df25 --- /dev/null +++ b/provider/pkg/await/condition/immediate.go @@ -0,0 +1,171 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package condition + +import ( + "context" + "sync/atomic" + + checkerlog "github.com/pulumi/cloud-ready-checks/pkg/checker/logging" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" +) + +var ( + _ Satisfier = Immediate{} + _ Satisfier = (*Never)(nil) + _ Satisfier = (*On)(nil) + _ Satisfier = (*Stopped)(nil) + _ Satisfier = (*Failure)(nil) +) + +// Immediate is a no-op condition which is always satisfied. This is primarily +// used for skip-await behavior and testing. +type Immediate struct { + logger logger + obj *unstructured.Unstructured +} + +// NewImmediate creates a new Immediate condition. +func NewImmediate(logger logger, obj *unstructured.Unstructured) Immediate { + return Immediate{logger: logger, obj: obj} +} + +// Range is a no-op for Immediately conditions. +func (Immediate) Range(func(watch.Event) bool) {} + +// Satisfied always returns true for Immediately conditions. +func (i Immediate) Satisfied() (bool, error) { + if i.logger != nil { + i.logger.LogMessage(checkerlog.StatusMessage("Skipping await logic")) + } + return true, nil +} + +// Object returns the observer's underlying object. +func (i Immediate) Object() *unstructured.Unstructured { + return i.obj +} + +// Observe is a no-op for Immediately conditions. +func (Immediate) Observe(watch.Event) error { return nil } + +// Never is a no-op condition which is never satisfied. This is primarily +// useful for tests. +type Never struct { + Immediate +} + +// Satisfied always returns false for Never conditions. +func (n Never) Satisfied() (bool, error) { + return false, nil +} + +// NewNever creates a new Never condition. +func NewNever(obj *unstructured.Unstructured) *Never { + return &Never{Immediate: NewImmediate(nil, obj)} +} + +// On is satisfied when it observes a specific event. +type On struct { + observer *ObjectObserver + want watch.Event + satisfied atomic.Bool +} + +// NewOn creates a new On condition. +func NewOn( + ctx context.Context, + source Source, + obj *unstructured.Unstructured, + want watch.Event, +) *On { + oo := NewObjectObserver(ctx, source, obj) + return &On{want: want, observer: oo} +} + +// Observe checks whether the observed event is the one we want. +func (o *On) Observe(e watch.Event) error { + err := o.observer.Observe(e) + if e == o.want { + o.satisfied.Store(true) + } + return err +} + +// Object returns the observer's underlying object. +func (o *On) Object() *unstructured.Unstructured { + return o.observer.Object() +} + +// Range iterates over the underlying observer. +func (o *On) Range(yield func(watch.Event) bool) { + o.observer.Range(yield) +} + +// Satisfied returns true if the expected event has been Observed. +func (o *On) Satisfied() (bool, error) { + return o.satisfied.Load(), nil +} + +// Stopped is satisfied after its underlying Observer has been exhausted. This +// is primarily useful for testing behavior which occurs on shutdown. +type Stopped struct { + observer Immediate + stopped atomic.Bool +} + +// NewStopped creates a new Stopped condition. +func NewStopped(logger logger, obj *unstructured.Unstructured) *Stopped { + return &Stopped{observer: NewImmediate(logger, obj)} +} + +// Observe invokes the underlying Observer. +func (s *Stopped) Observe(e watch.Event) error { + return s.observer.Observe(e) +} + +// Object returns the observer's underlying object. +func (s *Stopped) Object() *unstructured.Unstructured { + return s.observer.Object() +} + +// Range iterates over the underlying Observer and satisfies the condition. +func (s *Stopped) Range(yield func(watch.Event) bool) { + s.observer.Range(yield) + s.stopped.Store(true) +} + +// Satisfied returns true if the underlying Observer has been fully iterated over. +func (s *Stopped) Satisfied() (bool, error) { + return s.stopped.Load(), nil +} + +// Failure is a no-op condition which raises an error when it is checked. This +// is primarily useful for testing. +type Failure struct { + Immediate + err error +} + +// NewFailure creates a new Failure condition. +func NewFailure(err error) Satisfier { + return &Failure{err: err} +} + +// Satisfied raises the given error. +func (f *Failure) Satisfied() (bool, error) { + return false, f.err +} diff --git a/provider/pkg/await/condition/observer.go b/provider/pkg/await/condition/observer.go new file mode 100644 index 0000000000..befe197626 --- /dev/null +++ b/provider/pkg/await/condition/observer.go @@ -0,0 +1,176 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package condition + +import ( + "context" + "sync" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" +) + +var ( + _ Observer = (*ObjectObserver)(nil) + _ observer = observer{} +) + +// Observer acts on a watch.Event Source. Range is responsible for filtering +// events to only those relevant to the Observer, and Observe optionally +// updates the Observer's state. +type Observer interface { + // Range iterates over all events visible to the Observer. The caller is + // responsible for invoking Observe as part of the provided callback. Range + // can be used to customize setup and teardown behavior if the Observer + // wraps another Observer. + Range(func(watch.Event) bool) + + // Observe handles events and can optionally update the Observer's state. + // This should be invoked by the caller and not during Range. + Observe(watch.Event) error +} + +// ObjectObserver observes the given resource and keeps track of its last-known +// state. +type ObjectObserver struct { + mu sync.Mutex + ctx context.Context + obj *unstructured.Unstructured + observer Observer +} + +// NewObjectObserver creates a new ObjectObserver that tracks changes to the +// provided object +func NewObjectObserver( + ctx context.Context, + source Source, + obj *unstructured.Unstructured, +) *ObjectObserver { + return &ObjectObserver{ + ctx: ctx, + obj: obj, + observer: NewObserver(ctx, + source, + obj.GroupVersionKind(), + func(u *unstructured.Unstructured) bool { + return obj.GetName() == u.GetName() + }, + ), + } +} + +// Object returns the last-known state of the observed object. +func (oo *ObjectObserver) Object() *unstructured.Unstructured { + oo.mu.Lock() + defer oo.mu.Unlock() + return oo.obj +} + +// Observe updates the Observer's state with the observed object. +func (oo *ObjectObserver) Observe(e watch.Event) error { + oo.mu.Lock() + defer oo.mu.Unlock() + obj, _ := e.Object.(*unstructured.Unstructured) + oo.obj = obj + return nil +} + +// Range is an iterator over events visible to the Observer. +func (oo *ObjectObserver) Range(yield func(watch.Event) bool) { + oo.observer.Range(yield) +} + +// NewChildObserver creates a new ChildObserver subscribed to children of the +// owner with the given GVK. +func NewChildObserver( + ctx context.Context, + source Source, + owner *unstructured.Unstructured, + gvk schema.GroupVersionKind, +) Observer { + return NewObserver(ctx, + source, + gvk, + func(obj *unstructured.Unstructured) bool { + return isOwnedBy(obj, owner) + }, + ) +} + +// observer provides base functionality for filtering an event stream based on +// a criteria. +type observer struct { + ctx context.Context + source Source + gvk schema.GroupVersionKind + keep func(*unstructured.Unstructured) bool +} + +// NewObserver returns a new Observer with a watch.Event channel configured for +// the given GVK and filtered according to the given "keep" function. +func NewObserver( + ctx context.Context, + source Source, + gvk schema.GroupVersionKind, + keep func(*unstructured.Unstructured) bool, +) Observer { + return &observer{ + ctx: ctx, + source: source, + gvk: gvk, + keep: keep, + } +} + +// Range is an iterator over events visible to the Observer. Yielded events are +// guaranteed to have the type *unstructured.Unstructured. +func (o *observer) Range(yield func(watch.Event) bool) { + events, err := o.source.Start(o.ctx, o.gvk) + if err != nil { + return + } + + for { + select { + case <-o.ctx.Done(): + return + case e := <-events: + // Ignore events not matching our "keep" filter. + obj, ok := e.Object.(*unstructured.Unstructured) + if !ok || !o.keep(obj) { + continue + } + yield(e) + } + } +} + +// Observe is a no-op because the base Observer is stateless. +func (*observer) Observe(watch.Event) error { return nil } + +// TODO: Move this to metadata so we can share it. +func isOwnedBy(obj, possibleOwner *unstructured.Unstructured) bool { + if possibleOwner == nil { + return false + } + owners := obj.GetOwnerReferences() + for _, owner := range owners { + if owner.UID == possibleOwner.GetUID() { + return true + } + } + return false +} diff --git a/provider/pkg/await/condition/source.go b/provider/pkg/await/condition/source.go new file mode 100644 index 0000000000..ac9a262349 --- /dev/null +++ b/provider/pkg/await/condition/source.go @@ -0,0 +1,120 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package condition + +import ( + "context" + "fmt" + "sync" + + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/informers" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic/dynamicinformer" +) + +var ( + _ Source = (Static)(nil) + _ Source = (*DynamicSource)(nil) +) + +// Source encapsulates logic responsible for establishing +// watch.Event channels. +type Source interface { + Start(context.Context, schema.GroupVersionKind) (<-chan watch.Event, error) +} + +// NewDynamicSource creates a new DynamicEventSource which will lazily +// establish a single dynamicinformer.DynamicSharedInformerFactory. Subsequent +// calls to Start will spawn informers.GenericInformer from that factory. +func NewDynamicSource( + ctx context.Context, + clientset *clients.DynamicClientSet, + namespace string, +) *DynamicSource { + stopper := make(chan struct{}) + factoryF := sync.OnceValue(func() dynamicinformer.DynamicSharedInformerFactory { + factory := informers.NewInformerFactory( + clientset, + informers.WithNamespace(namespace), + ) + // Stop the factory when our context closes. + go func() { + <-ctx.Done() + close(stopper) + factory.Shutdown() + }() + factory.Start(stopper) + return factory + }) + + return &DynamicSource{ + factory: factoryF, + stopper: stopper, + clientset: clientset, + } +} + +// DynamicSource establishes Informers against the cluster. +type DynamicSource struct { + factory func() dynamicinformer.DynamicSharedInformerFactory + stopper chan struct{} + clientset *clients.DynamicClientSet +} + +// Start establishes an Informer against the cluster for the given GVK. +func (des *DynamicSource) Start(_ context.Context, gvk schema.GroupVersionKind) (<-chan watch.Event, error) { + factory := des.factory() + events := make(chan watch.Event, 1) + + gvr, err := clients.GVRForGVK(des.clientset.RESTMapper, gvk) + if err != nil { + return nil, fmt.Errorf("getting GVK: %w", err) + } + + informer, err := informers.New( + factory, + informers.WithEventChannel(events), + informers.ForGVR(gvr), + ) + if err != nil { + return nil, fmt.Errorf("creating informer: %w", err) + } + i := informer.Informer() + + // client-go logs a warning if we've already started the informer for this + // GVK, but the method to check whether that's the case isn't part of the + // public interface. + if check, ok := i.(interface{ HasStarted() bool }); ok { + if check.HasStarted() { + return events, nil + } + } + go informer.Informer().Run(des.stopper) + factory.WaitForCacheSync(des.stopper) + + return events, nil +} + +// Static implements Source and allows a fixed event channel to be used as an +// Observer's Source. Static should not be shared across multiple Observers, +// instead give each Observer their own channel. +type Static chan watch.Event + +// Start returns a fixed event channel. +func (s Static) Start(context.Context, schema.GroupVersionKind) (<-chan watch.Event, error) { + return s, nil +} diff --git a/provider/pkg/await/informers/informer.go b/provider/pkg/await/informers/informer.go index 6e8142dd63..0ed1c1f5b9 100644 --- a/provider/pkg/await/informers/informer.go +++ b/provider/pkg/await/informers/informer.go @@ -86,6 +86,10 @@ func ForGVR(gvr schema.GroupVersionResource) InformerOption { // the provided informerFactory for a particular GVR. // A GVR must be specified through either ForGVR option or one of the convenience // wrappers around it in this package. +// +// The primary difference between an Informer vs. a Watcher is that Informers +// handle re-connections automatically, so consumers don't need to handle watch +// errors. func New( informerFactory dynamicinformer.DynamicSharedInformerFactory, opts ...InformerOption, diff --git a/provider/pkg/await/internal/awaiter.go b/provider/pkg/await/internal/awaiter.go new file mode 100644 index 0000000000..c57d386f45 --- /dev/null +++ b/provider/pkg/await/internal/awaiter.go @@ -0,0 +1,166 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" +) + +// Awaiter orchestrates a condition Satisfier and optional Observers. +type Awaiter struct { + namespace string + condition condition.Satisfier + observers []condition.Observer +} + +// NewAwaiter creates a new Awaiter with the given options. +func NewAwaiter(options ...awaiterOption) (*Awaiter, error) { + ea := &Awaiter{} + for _, opt := range options { + opt.apply(ea) + } + return ea, nil +} + +// Await blocks until the Condition is met or until the context is closed. +// The operation's timeout should be applied to the provided Context. +func (ea *Awaiter) Await(ctx context.Context) (err error) { + if ea.condition == nil { + return fmt.Errorf("missing condition") + } + + // check channel is signaled when we should re-evaluate our condition. + check := make(chan struct{}) + + // We'll spawn a goroutine for our condition and each observer. We wait for + // them to tear down before returning because the condition might change + // during that time. + wg := sync.WaitGroup{} + wg.Add(len(ea.observers) + 1) + go func() { + wg.Wait() + close(check) + }() + + // Start all of our observers. + observers := append([]condition.Observer{ea.condition}, ea.observers...) + for _, o := range observers { + go func(o condition.Observer) { + defer wg.Done() + o.Range(func(e watch.Event) bool { + _ = o.Observe(e) + // Re-evaluate our condition if we see an event for it. + if _, ok := o.(condition.Satisfier); ok { + check <- struct{}{} + } + return true + }) + }(o) + } + + // Before returning we attempt to re-evaluate the condition a final time, + // and we wrap our error with our object's last known state so it can be + // checkpointed. + defer func() { + if err == nil { + // Nothing to do. + return + } + // Make sure Observers are all done. + wg.Wait() + if done, _ := ea.condition.Satisfied(); done { + err = nil + } + // Wrap our error with our object's state. + if err != nil { + err = errObject{error: err, object: ea.condition.Object()} + } + }() + + for { + select { + case <-check: + done, err := ea.condition.Satisfied() + if done || err != nil { + return err + } + case <-ctx.Done(): + err := ctx.Err() + if errors.Is(err, context.DeadlineExceeded) { + err = fmt.Errorf("timed out waiting for the condition") + } + return wait.ErrorInterrupted(err) + } + } +} + +type awaiterOption interface { + apply(*Awaiter) +} + +type withConditionOption struct{ condition condition.Satisfier } + +func (o withConditionOption) apply(ea *Awaiter) { + ea.condition = o.condition +} + +// WithCondition sets the condition.Satisfier used by the Awaiter. This is +// required and determines when Await can conclude. +func WithCondition(c condition.Satisfier) awaiterOption { + return withConditionOption{c} +} + +type withObserversOption struct{ observers []condition.Observer } + +func (o withObserversOption) apply(ea *Awaiter) { + ea.observers = o.observers +} + +// WithObservers attaches optional condition.Observers to the Awaiter. This +// enables reporting informational messages while waiting for the condition to +// be met. +func WithObservers(obs ...condition.Observer) awaiterOption { + return withObserversOption{obs} +} + +type withNamespaceOption struct{ ns string } + +func (o withNamespaceOption) apply(ea *Awaiter) { + ea.namespace = o.ns +} + +// WithNamespace configures the namespace used by Informers spawned by the +// Awaiter. +func WithNamespace(ns string) awaiterOption { + return withNamespaceOption{ns} +} + +// errObject wraps an error with object state. +type errObject struct { + error + object *unstructured.Unstructured +} + +func (e errObject) Object() (*unstructured.Unstructured, error) { + return e.object, nil +} diff --git a/provider/pkg/await/internal/awaiter_test.go b/provider/pkg/await/internal/awaiter_test.go new file mode 100644 index 0000000000..f9d5f63bb5 --- /dev/null +++ b/provider/pkg/await/internal/awaiter_test.go @@ -0,0 +1,181 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package internal + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" +) + +func TestCancel(t *testing.T) { + t.Parallel() + + awaiter, err := NewAwaiter( + WithCondition(condition.NewNever(nil)), + WithObservers(condition.NewNever(nil)), + ) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = awaiter.Await(ctx) + + assert.Error(t, err) +} + +func TestCancelWithRecovery(t *testing.T) { + t.Parallel() + + awaiter, err := NewAwaiter( + WithCondition(condition.NewStopped(nil, nil)), + WithObservers(condition.NewNever(nil)), + ) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err = awaiter.Await(ctx) + + assert.NoError(t, err) +} + +func TestTimeout(t *testing.T) { + t.Parallel() + + awaiter, err := NewAwaiter( + WithCondition(condition.NewNever(nil)), + WithObservers(condition.NewNever(nil)), + ) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + err = awaiter.Await(ctx) + + assert.Error(t, err) +} + +func TestImmediateSuccess(t *testing.T) { + t.Parallel() + + awaiter, err := NewAwaiter( + WithCondition(condition.NewImmediate(nil, nil)), + WithObservers(condition.NewNever(nil)), + ) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + err = awaiter.Await(ctx) + + assert.NoError(t, err) +} + +func TestObserverFailure(t *testing.T) { + t.Parallel() + + awaiter, err := NewAwaiter( + WithCondition(condition.NewImmediate(nil, nil)), + WithObservers(condition.NewFailure(fmt.Errorf("condition should still succeed"))), + ) + require.NoError(t, err) + + err = awaiter.Await(context.Background()) + + assert.NoError(t, err) +} + +func TestConditionFailure(t *testing.T) { + t.Parallel() + + awaiter, err := NewAwaiter( + WithCondition(condition.NewFailure(fmt.Errorf("expected"))), + WithObservers(condition.NewNever(nil)), + ) + require.NoError(t, err) + + err = awaiter.Await(context.Background()) + + assert.ErrorContains(t, err, "expected") + assert.ErrorAs(t, err, &errObject{}) +} + +func TestEventualSuccess(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + obj := &unstructured.Unstructured{Object: map[string]any{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]any{"name": "foo", "namespace": "default"}, + }} + want := watch.Event{Type: watch.Modified, Object: obj} + + events := make(chan watch.Event) + source := &staticEventSource{ + events: events, + } + + awaiter, err := NewAwaiter( + WithCondition( + condition.NewOn(ctx, source, obj, want), + ), + WithObservers(condition.NewNever(nil)), + ) + require.NoError(t, err) + + done := make(chan error) + go func() { + done <- awaiter.Await(context.Background()) + }() + + events <- watch.Event{Type: watch.Added, Object: obj} + + select { + case <-done: + t.Fatal("await should not have finished") + case <-time.Tick(time.Second): + } + + events <- want + + select { + case err := <-done: + assert.NoError(t, err) + case <-time.Tick(time.Second): + t.Fatal("await should have finished") + } +} + +type staticEventSource struct { + events chan watch.Event +} + +func (ses *staticEventSource) Start(context.Context, schema.GroupVersionKind) (<-chan watch.Event, error) { + return ses.events, nil +} diff --git a/provider/pkg/metadata/annotations.go b/provider/pkg/metadata/annotations.go index 95cec155f7..ceca87ef31 100644 --- a/provider/pkg/metadata/annotations.go +++ b/provider/pkg/metadata/annotations.go @@ -15,10 +15,14 @@ package metadata import ( + "context" "strings" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/logging" "github.com/pulumi/pulumi/sdk/v3/go/common/resource" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" ) const ( @@ -103,7 +107,30 @@ func GetAnnotationValue(obj *unstructured.Unstructured, key string) string { return annotations[key] } +// GetDeletedCondition inspects the object's annotations and returns a +// condition.Satisfier appropriate for using when awaiting deletion. +func GetDeletedCondition( + ctx context.Context, + source condition.Source, + clientset clientGetter, + logger *logging.DedupLogger, + obj *unstructured.Unstructured, +) (condition.Satisfier, error) { + if IsAnnotationTrue(obj, AnnotationSkipAwait) { + return condition.NewImmediate(logger, obj), nil + } + getter, err := clientset.ResourceClientForObject(obj) + if err != nil { + return nil, err + } + return condition.NewDeleted(ctx, source, getter, logger, obj) +} + func isComputedValue(v any) bool { _, isComputed := v.(resource.Computed) return isComputed } + +type clientGetter interface { + ResourceClientForObject(*unstructured.Unstructured) (dynamic.ResourceInterface, error) +} diff --git a/provider/pkg/metadata/annotations_test.go b/provider/pkg/metadata/annotations_test.go index 6003bfa739..a54eeebd01 100644 --- a/provider/pkg/metadata/annotations_test.go +++ b/provider/pkg/metadata/annotations_test.go @@ -15,11 +15,16 @@ package metadata import ( + "context" "testing" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/logging" "github.com/pulumi/pulumi/sdk/v3/go/common/resource" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" ) func TestSetAnnotation(t *testing.T) { @@ -55,15 +60,19 @@ func TestSetAnnotation(t *testing.T) { args args }{ {"set-with-no-annotation", args{ - obj: noAnnotationObj, key: "foo", value: "bar", expectSet: true, expectKey: "foo", expectValue: "bar"}}, + obj: noAnnotationObj, key: "foo", value: "bar", expectSet: true, expectKey: "foo", expectValue: "bar", + }}, {"set-with-existing-annotations", args{ - obj: existingAnnotationObj, key: "foo", value: "bar", expectSet: true, expectKey: "foo", expectValue: "bar"}}, + obj: existingAnnotationObj, key: "foo", value: "bar", expectSet: true, expectKey: "foo", expectValue: "bar", + }}, // Computed fields cannot be set, so SetAnnotation is a no-op. {"set-with-computed-metadata", args{ - obj: computedMetadataObj, key: "foo", value: "bar", expectSet: false}}, + obj: computedMetadataObj, key: "foo", value: "bar", expectSet: false, + }}, {"set-with-computed-annotation", args{ - obj: computedAnnotationObj, key: "foo", value: "bar", expectSet: false}}, + obj: computedAnnotationObj, key: "foo", value: "bar", expectSet: false, + }}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -77,3 +86,52 @@ func TestSetAnnotation(t *testing.T) { }) } } + +func TestSkipAwait(t *testing.T) { + tests := []struct { + name string + getter func(context.Context, condition.Source, clientGetter, *logging.DedupLogger, *unstructured.Unstructured) (condition.Satisfier, error) + obj *unstructured.Unstructured + want condition.Satisfier + }{ + { + name: "skipAwait=true takes priority over delete condition", + obj: &unstructured.Unstructured{Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + AnnotationSkipAwait: "true", + }, + }, + }}, + getter: GetDeletedCondition, + want: condition.Immediate{}, + }, + { + name: "skipAwait=false doesn't take priority over delete condition", + obj: &unstructured.Unstructured{Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + AnnotationSkipAwait: "false", + }, + }, + }}, + getter: GetDeletedCondition, + want: &condition.Deleted{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + condition, err := tt.getter(context.Background(), nil, noopClientGetter{}, nil, tt.obj) + require.NoError(t, err) + + assert.IsType(t, tt.want, condition) + }) + } +} + +type noopClientGetter struct{} + +func (noopClientGetter) ResourceClientForObject(*unstructured.Unstructured) (dynamic.ResourceInterface, error) { + return nil, nil +} diff --git a/tests/sdk/java/await_test.go b/tests/sdk/java/await_test.go index 580d6e8a2f..ee8e101102 100644 --- a/tests/sdk/java/await_test.go +++ b/tests/sdk/java/await_test.go @@ -17,6 +17,7 @@ package test import ( "context" "testing" + "time" "github.com/pulumi/providertest/pulumitest" "github.com/pulumi/providertest/pulumitest/opttest" @@ -132,3 +133,33 @@ func TestAwaitServiceAccount(t *testing.T) { test.Up() test.Refresh() } + +func TestSkipAwait(t *testing.T) { + t.Parallel() + + test := pulumitest.NewPulumiTest(t, + "testdata/await/skipawait", + opttest.SkipInstall(), + ) + + start := time.Now() + _ = test.Up() + took := time.Since(start) + assert.Less(t, took, 2*time.Minute, "didn't skip pod's slow startup") + + start = time.Now() + _ = test.Refresh() + took = time.Since(start) + assert.Less(t, took, 2*time.Minute, "didn't skip pod's slow read") + + test.UpdateSource("testdata/await/skipawait/step2") + start = time.Now() + _ = test.Refresh() + took = time.Since(start) + assert.Less(t, took, 2*time.Minute, "didn't skip pod's slow update") + + start = time.Now() + _ = test.Destroy() + took = time.Since(start) + assert.Less(t, took, 2*time.Minute, "didn't skip config map's stuck delete") +} diff --git a/tests/sdk/java/testdata/await/skipawait/Pulumi.yaml b/tests/sdk/java/testdata/await/skipawait/Pulumi.yaml new file mode 100644 index 0000000000..f00b8f39b9 --- /dev/null +++ b/tests/sdk/java/testdata/await/skipawait/Pulumi.yaml @@ -0,0 +1,35 @@ +name: skipawait +runtime: yaml +description: | + Tests the skipAwait annotation: + - A slow-to-start deployment tests create/update/read. + - Delete is tested with a config map with a stuck finalizer. +resources: + stuck-config: + type: kubernetes:core/v1:ConfigMap + properties: + metadata: + finalizers: + - pulumi.com/stuck + annotations: + pulumi.com/skipAwait: "true" + data: + foo: bar + + slow-pod: + type: kubernetes:core/v1:Pod + properties: + metadata: + annotations: + pulumi.com/skipAwait: "true" + spec: + containers: + - image: busybox + name: busybox + command: ["sleep", "infinity"] + readinessProbe: + exec: + command: + - ls + initialDelaySeconds: 600 # 10 minutes! + periodSeconds: 10 From 4d3223afc9917d9047bc273724b7ed2342ef72e4 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Tue, 23 Jul 2024 15:00:19 -0700 Subject: [PATCH 02/15] Remove custom delete-await logic --- provider/pkg/await/await_test.go | 2 +- provider/pkg/await/awaiters.go | 249 +-------------------------- provider/pkg/await/daemonset.go | 39 ----- provider/pkg/await/daemonset_test.go | 28 +-- provider/pkg/await/util.go | 20 --- 5 files changed, 25 insertions(+), 313 deletions(-) diff --git a/provider/pkg/await/await_test.go b/provider/pkg/await/await_test.go index 91943b7218..63d5bd7bff 100644 --- a/provider/pkg/await/await_test.go +++ b/provider/pkg/await/await_test.go @@ -654,7 +654,7 @@ func TestDeletion(t *testing.T) { // reactions suppressDeletion := func(t *testing.T, ctx testCtx, action kubetesting.Action) (bool, runtime.Object, error) { - return true, nil, nil + return true, ctx.config.Outputs, nil } cancelAwait := func(t *testing.T, ctx testCtx, action kubetesting.Action) (bool, runtime.Object, error) { diff --git a/provider/pkg/await/awaiters.go b/provider/pkg/await/awaiters.go index 8cb9629b86..7de2bfd4d6 100644 --- a/provider/pkg/await/awaiters.go +++ b/provider/pkg/await/awaiters.go @@ -34,7 +34,6 @@ import ( storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/client-go/dynamic" ) // awaitConfig specifies on which conditions we are to consider a resource created and fully @@ -62,15 +61,9 @@ func (cac *awaitConfig) Clock() clockwork.Clock { return clockwork.NewRealClock() } -type deleteAwaitConfig struct { - awaitConfig - clientForResource dynamic.ResourceInterface -} - type ( - awaiter func(awaitConfig) error - readAwaiter func(awaitConfig) error - deletionAwaiter func(deleteAwaitConfig) error + awaiter func(awaitConfig) error + readAwaiter func(awaitConfig) error ) func (cac *awaitConfig) getTimeout(defaultSeconds int) time.Duration { @@ -133,9 +126,8 @@ const ( ) type awaitSpec struct { - await awaiter - awaitRead readAwaiter - awaitDeletion deletionAwaiter + await awaiter + awaitRead readAwaiter } var deploymentAwaiter = awaitSpec{ @@ -145,7 +137,6 @@ var deploymentAwaiter = awaitSpec{ awaitRead: func(c awaitConfig) error { return makeDeploymentInitAwaiter(c).Read() }, - awaitDeletion: untilAppsDeploymentDeleted, } var ingressAwaiter = awaitSpec{ @@ -160,7 +151,6 @@ var jobAwaiter = awaitSpec{ awaitRead: func(c awaitConfig) error { return makeJobInitAwaiter(c).Read() }, - awaitDeletion: untilBatchV1JobDeleted, } var statefulsetAwaiter = awaitSpec{ @@ -170,7 +160,6 @@ var statefulsetAwaiter = awaitSpec{ awaitRead: func(c awaitConfig) error { return makeStatefulSetInitAwaiter(c).Read() }, - awaitDeletion: untilAppsStatefulSetDeleted, } var daemonsetAwaiter = awaitSpec{ @@ -180,9 +169,6 @@ var daemonsetAwaiter = awaitSpec{ awaitRead: func(c awaitConfig) error { return newDaemonSetAwaiter(c).Read() }, - awaitDeletion: func(c deleteAwaitConfig) error { - return newDaemonSetAwaiter(c.awaitConfig).Delete() - }, } // NOTE: Some GVKs below are blank so that we can distinguish between resource types that we know @@ -202,9 +188,6 @@ var awaiters = map[string]awaitSpec{ batchV1Job: jobAwaiter, coreV1ConfigMap: { /* NONE */ }, coreV1LimitRange: { /* NONE */ }, - coreV1Namespace: { - awaitDeletion: untilCoreV1NamespaceDeleted, - }, coreV1PersistentVolume: { await: untilCoreV1PersistentVolumeInitialized, }, @@ -212,13 +195,11 @@ var awaiters = map[string]awaitSpec{ await: untilCoreV1PersistentVolumeClaimReady, }, coreV1Pod: { - await: awaitPodInit, - awaitRead: awaitPodRead, - awaitDeletion: untilCoreV1PodDeleted, + await: awaitPodInit, + awaitRead: awaitPodRead, }, coreV1ReplicationController: { - await: untilCoreV1ReplicationControllerInitialized, - awaitDeletion: untilCoreV1ReplicationControllerDeleted, + await: untilCoreV1ReplicationControllerInitialized, }, coreV1ResourceQuota: { await: untilCoreV1ResourceQuotaInitialized, @@ -264,156 +245,6 @@ var awaiters = map[string]awaitSpec{ // -------------------------------------------------------------------------- -// -------------------------------------------------------------------------- - -// apps/v1/Deployment, apps/v1beta1/Deployment, apps/v1beta2/Deployment, -// extensions/v1beta1/Deployment - -// -------------------------------------------------------------------------- - -func deploymentSpecReplicas(deployment *unstructured.Unstructured) (any, bool) { - return openapi.Pluck(deployment.Object, "spec", "replicas") -} - -func untilAppsDeploymentDeleted(config deleteAwaitConfig) error { - // - // TODO(hausdorff): Should we scale pods to 0 and then delete instead? Kubernetes should allow us - // to check the status after deletion, but there is some possibility if there is a long-ish - // transient network partition (or something) that it could be successfully deleted and GC'd - // before we get to check it, which I think would require manual intervention. - // - statusReplicas := func(deployment *unstructured.Unstructured) (any, bool) { - return openapi.Pluck(deployment.Object, "status", "replicas") - } - - deploymentMissing := func(d *unstructured.Unstructured, err error) error { - if is404(err) { - return nil - } else if err != nil { - logger.V(3).Infof("Received error deleting deployment '%s': %#v", d.GetName(), err) - return err - } - - currReplicas, _ := statusReplicas(d) - specReplicas, _ := deploymentSpecReplicas(d) - - return watcher.RetryableError( - fmt.Errorf("deployment %q still exists (%d / %d replicas exist)", d.GetName(), - currReplicas, specReplicas)) - } - - // Wait until all replicas are gone. 10 minutes should be enough for ~10k replicas. - timeout := config.getTimeout(600) - err := watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). - RetryUntil(deploymentMissing, timeout) - if err != nil { - return err - } - - logger.V(3).Infof("Deployment '%s' deleted", config.currentOutputs.GetName()) - - return nil -} - -// -------------------------------------------------------------------------- - -// apps/v1/StatefulSet, apps/v1beta1/StatefulSet, apps/v1beta2/StatefulSet, - -// -------------------------------------------------------------------------- - -func untilAppsStatefulSetDeleted(config deleteAwaitConfig) error { - specReplicas := func(statefulset *unstructured.Unstructured) (any, bool) { - return openapi.Pluck(statefulset.Object, "spec", "replicas") - } - statusReplicas := func(statefulset *unstructured.Unstructured) (any, bool) { - return openapi.Pluck(statefulset.Object, "status", "replicas") - } - - statefulsetmissing := func(d *unstructured.Unstructured, err error) error { - if is404(err) { - return nil - } else if err != nil { - logger.V(3).Infof("Received error deleting StatefulSet %q: %#v", d.GetName(), err) - return err - } - - currReplicas, _ := statusReplicas(d) - specReplicas, _ := specReplicas(d) - - return watcher.RetryableError( - fmt.Errorf("StatefulSet %q still exists (%d / %d replicas exist)", d.GetName(), - currReplicas, specReplicas)) - } - - // Wait until all replicas are gone. 10 minutes should be enough for ~10k replicas. - timeout := config.getTimeout(600) - err := watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). - RetryUntil(statefulsetmissing, timeout) - if err != nil { - return err - } - - logger.V(3).Infof("StatefulSet %q deleted", config.currentOutputs.GetName()) - - return nil -} - -// -------------------------------------------------------------------------- - -// batch/v1/Job - -// -------------------------------------------------------------------------- - -func untilBatchV1JobDeleted(config deleteAwaitConfig) error { - jobMissingOrKilled := func(pod *unstructured.Unstructured, err error) error { - if is404(err) { - return nil - } else if err != nil { - return err - } - - e := fmt.Errorf("job %q still exists", pod.GetName()) - return watcher.RetryableError(e) - } - - timeout := config.getTimeout(300) - return watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). - RetryUntil(jobMissingOrKilled, timeout) -} - -// -------------------------------------------------------------------------- - -// core/v1/Namespace - -// -------------------------------------------------------------------------- - -func untilCoreV1NamespaceDeleted(config deleteAwaitConfig) error { - namespaceMissingOrKilled := func(ns *unstructured.Unstructured, err error) error { - if is404(err) { - return nil - } else if err != nil { - logger.V(3).Infof("Received error deleting namespace %q: %#v", - ns.GetName(), err) - return err - } - - statusPhase, _, _ := unstructured.NestedString(ns.Object, "status", "phase") - logger.V(3).Infof("Namespace %q status received: %#v", ns.GetName(), statusPhase) - if statusPhase == "" { - return nil - } - - return watcher.RetryableError(fmt.Errorf("namespace %q still exists (%v)", - ns.GetName(), statusPhase)) - } - - timeout := config.getTimeout(300) - return watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). - RetryUntil(namespaceMissingOrKilled, timeout) -} - -// -------------------------------------------------------------------------- - // core/v1/PersistentVolume // -------------------------------------------------------------------------- @@ -504,32 +335,6 @@ func pvcBindMode( // -------------------------------------------------------------------------- -// core/v1/Pod - -// -------------------------------------------------------------------------- - -// TODO(lblackstone): unify the function signatures across awaiters -func untilCoreV1PodDeleted(config deleteAwaitConfig) error { - podMissingOrKilled := func(pod *unstructured.Unstructured, err error) error { - if is404(err) { - return nil - } else if err != nil { - return err - } - - statusPhase, _ := openapi.Pluck(pod.Object, "status", "phase") - logger.V(3).Infof("Current state of pod %q: %#v", pod.GetName(), statusPhase) - e := fmt.Errorf("pod %q still exists (%v)", pod.GetName(), statusPhase) - return watcher.RetryableError(e) - } - - timeout := config.getTimeout(300) - return watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). - RetryUntil(podMissingOrKilled, timeout) -} - -// -------------------------------------------------------------------------- - // core/v1/ReplicationController // -------------------------------------------------------------------------- @@ -571,46 +376,6 @@ func untilCoreV1ReplicationControllerInitialized(c awaitConfig) error { return nil } -func untilCoreV1ReplicationControllerDeleted(config deleteAwaitConfig) error { - // - // TODO(hausdorff): Should we scale pods to 0 and then delete instead? Kubernetes should allow us - // to check the status after deletion, but there is some possibility if there is a long-ish - // transient network partition (or something) that it could be successfully deleted and GC'd - // before we get to check it, which I think would require manual intervention. - // - statusReplicas := func(rc *unstructured.Unstructured) (any, bool) { - return openapi.Pluck(rc.Object, "status", "replicas") - } - - rcMissing := func(rc *unstructured.Unstructured, err error) error { - if is404(err) { - return nil - } else if err != nil { - logger.V(3).Infof("Received error deleting ReplicationController %q: %#v", rc.GetName(), err) - return err - } - - currReplicas, _ := statusReplicas(rc) - specReplicas, _ := deploymentSpecReplicas(rc) - - return watcher.RetryableError( - fmt.Errorf("ReplicationController %q still exists (%d / %d replicas exist)", - rc.GetName(), currReplicas, specReplicas)) - } - - // Wait until all replicas are gone. 10 minutes should be enough for ~10k replicas. - timeout := config.getTimeout(600) - err := watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). - RetryUntil(rcMissing, timeout) - if err != nil { - return err - } - - logger.V(3).Infof("ReplicationController %q deleted", config.currentOutputs.GetName()) - - return nil -} - // -------------------------------------------------------------------------- // core/v1/ResourceQuota diff --git a/provider/pkg/await/daemonset.go b/provider/pkg/await/daemonset.go index d99171738d..dc17bbbbde 100644 --- a/provider/pkg/await/daemonset.go +++ b/provider/pkg/await/daemonset.go @@ -24,7 +24,6 @@ import ( "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/informers" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/kinds" - "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/openapi" "github.com/pulumi/pulumi/sdk/v3/go/common/diag" "github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil" logger "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging" @@ -84,44 +83,6 @@ func (dsa *dsAwaiter) Await() error { return dsa.await(dsa.rolloutComplete) } -// Delete blocks until a DaemonSet has been deleted. Returns nil if the -// DaemonSet does not exist. -func (dsa *dsAwaiter) Delete() error { - // Perform a lookup in case the object has already been deleted - client, _, err := dsa.clients() - if err != nil { - return err - } - ds := dsa.config.currentOutputs - _, err = client.Get(dsa.config.ctx, ds.GetName(), metav1.GetOptions{}) - if is404(err) { - return nil - } - if err != nil { - logger.V(3).Infof("Received error deleting DaemonSet %q: %#v", ds.GetName(), err) - return err - } - - // Otherwise wait for a deletion event. - deleted := func() bool { - if dsa.deleted { - return true - } - misscheduled, _ := openapi.Pluck(dsa.ds.Object, "status", "numberMisscheduled") - dsa.config.logger.LogStatus( - diag.Info, - fmt.Sprintf( - "DaemonSet %q still exists (%v pods misscheduled)", - ds.GetName(), - misscheduled, - ), - ) - return false - } - - return dsa.await(deleted) -} - // Read returns the current state of the DaemonSet and returns an error if it // is not in a ready state. func (dsa *dsAwaiter) Read() error { diff --git a/provider/pkg/await/daemonset_test.go b/provider/pkg/await/daemonset_test.go index ecb239d6c5..969c67b24d 100644 --- a/provider/pkg/await/daemonset_test.go +++ b/provider/pkg/await/daemonset_test.go @@ -270,10 +270,11 @@ func TestAwaitDaemonSetDelete(t *testing.T) { } tests := []struct { - name string - given *unstructured.Unstructured - setup []func(*fake.SimpleDynamicClient, *unstructured.Unstructured) - events func(clockwork.FakeClock, *unstructured.Unstructured) <-chan watch.Event + name string + given *unstructured.Unstructured + setup []func(*fake.SimpleDynamicClient, *unstructured.Unstructured) + events func(*unstructured.Unstructured) <-chan watch.Event + timeout time.Duration wantErr string }{ @@ -290,8 +291,7 @@ func TestAwaitDaemonSetDelete(t *testing.T) { ensureExists, dontDeleteImmediately, }, - events: func(clock clockwork.FakeClock, ds *unstructured.Unstructured) <-chan watch.Event { - clock.Advance(1 * time.Minute) + events: func(ds *unstructured.Unstructured) <-chan watch.Event { c := make(chan watch.Event, 1) c <- watchDeletedEvent(ds) return c @@ -304,11 +304,11 @@ func TestAwaitDaemonSetDelete(t *testing.T) { ensureExists, dontDeleteImmediately, }, - events: func(clock clockwork.FakeClock, ds *unstructured.Unstructured) <-chan watch.Event { - clock.Advance(_defaultDaemonSetTimeout) + events: func(ds *unstructured.Unstructured) <-chan watch.Event { c := make(chan watch.Event, 1) return c }, + timeout: 1 * time.Second, wantErr: "timed out waiting for the condition", }, { @@ -320,20 +320,26 @@ func TestAwaitDaemonSetDelete(t *testing.T) { } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { - pconfig, clientset, clock := fakeProviderConfig(context.Background(), t) + t.Parallel() + pconfig, clientset, _ := fakeProviderConfig(context.Background(), t) config := DeleteConfig{ ProviderConfig: pconfig, Inputs: tt.given, Outputs: tt.given, Name: tt.given.GetName(), + Timeout: tt.timeout.Seconds(), } w := watch.NewRaceFreeFake() clientset.PrependWatchReactor("daemonsets", testcore.DefaultWatchReactor(w, nil)) + go func() { - clock.BlockUntil(1) // Timeout sleeper - for e := range tt.events(clock, tt.given) { + if tt.events == nil { + return + } + for e := range tt.events(tt.given) { w.Action(e.Type, e.Object) } }() diff --git a/provider/pkg/await/util.go b/provider/pkg/await/util.go index 8ed84277ff..3f9a8e735f 100644 --- a/provider/pkg/await/util.go +++ b/provider/pkg/await/util.go @@ -15,10 +15,6 @@ package await import ( - "errors" - "net/http" - - k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" @@ -78,22 +74,6 @@ func watchDeletedEvent(obj runtime.Object) watch.Event { // -------------------------------------------------------------------------- -// Response helpers. - -// -------------------------------------------------------------------------- - -// is404 returns true if any error in err's tree is a Kubernetes StatusError -// with code 404. -func is404(err error) bool { - var statusErr *k8serrors.StatusError - if errors.As(err, &statusErr) { - return statusErr.ErrStatus.Code == http.StatusNotFound - } - return false -} - -// -------------------------------------------------------------------------- - // Ownership helpers. // -------------------------------------------------------------------------- From da7eff7494ceb7db83dabdbbebd1297f60c1431a Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Mon, 29 Jul 2024 12:55:23 -0700 Subject: [PATCH 03/15] Fix partial error --- provider/pkg/await/await_test.go | 11 +++++++++++ provider/pkg/await/internal/awaiter.go | 4 ++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/provider/pkg/await/await_test.go b/provider/pkg/await/await_test.go index 63d5bd7bff..aade739587 100644 --- a/provider/pkg/await/await_test.go +++ b/provider/pkg/await/await_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/internal" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/clients/fake" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/cluster" @@ -959,6 +960,16 @@ func Test_Watcher_Interface_Timeout(t *testing.T) { assert.Equal(t, "Timeout occurred polling for ''", err.Error()) } +func TestAwaiterInterfaceTimeout(t *testing.T) { + awaiter, err := internal.NewAwaiter(internal.WithCondition(condition.NewNever(nil))) + require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + err = awaiter.Await(ctx) + _, isPartialErr := err.(PartialError) + assert.True(t, isPartialErr, "Timed out watcher should emit `await.PartialError`") +} + // -------------------------------------------------------------------------- // Helpers diff --git a/provider/pkg/await/internal/awaiter.go b/provider/pkg/await/internal/awaiter.go index c57d386f45..2b9ac12580 100644 --- a/provider/pkg/await/internal/awaiter.go +++ b/provider/pkg/await/internal/awaiter.go @@ -161,6 +161,6 @@ type errObject struct { object *unstructured.Unstructured } -func (e errObject) Object() (*unstructured.Unstructured, error) { - return e.object, nil +func (e errObject) Object() *unstructured.Unstructured { + return e.object } From 6cb97fc2ff966bdaf43c4cac9e951bba77dc97c0 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Thu, 1 Aug 2024 14:45:43 -0700 Subject: [PATCH 04/15] recover missing step2 --- tests/sdk/java/await_test.go | 5 ++- .../await/skipawait/step2/Pulumi.yaml | 36 +++++++++++++++++++ 2 files changed, 40 insertions(+), 1 deletion(-) create mode 100644 tests/sdk/java/testdata/await/skipawait/step2/Pulumi.yaml diff --git a/tests/sdk/java/await_test.go b/tests/sdk/java/await_test.go index ee8e101102..f26ba18c99 100644 --- a/tests/sdk/java/await_test.go +++ b/tests/sdk/java/await_test.go @@ -134,13 +134,16 @@ func TestAwaitServiceAccount(t *testing.T) { test.Refresh() } -func TestSkipAwait(t *testing.T) { +func TestAwaitSkip(t *testing.T) { t.Parallel() test := pulumitest.NewPulumiTest(t, "testdata/await/skipawait", opttest.SkipInstall(), ) + t.Cleanup(func() { + test.Destroy() + }) start := time.Now() _ = test.Up() diff --git a/tests/sdk/java/testdata/await/skipawait/step2/Pulumi.yaml b/tests/sdk/java/testdata/await/skipawait/step2/Pulumi.yaml new file mode 100644 index 0000000000..a7de6d62f2 --- /dev/null +++ b/tests/sdk/java/testdata/await/skipawait/step2/Pulumi.yaml @@ -0,0 +1,36 @@ +name: skipawait +runtime: yaml +description: | + Tests the skipAwait annotation: + - A slow-to-start deployment tests create/update/read. + - Delete is tested with a config map with a stuck finalizer. +resources: + stuck-config: + type: kubernetes:core/v1:ConfigMap + properties: + metadata: + finalizers: + - pulumi.com/stuck + annotations: + pulumi.com/skipAwait: "true" + data: + foo: bar + + slow-pod: + type: kubernetes:core/v1:Pod + properties: + metadata: + annotations: + foo: bar # Add an annotation to trigger an update. + pulumi.com/skipAwait: "true" + spec: + containers: + - image: busybox + name: busybox + command: ["sleep", "infinity"] + readinessProbe: + exec: + command: + - ls + initialDelaySeconds: 600 # 10 minutes! + periodSeconds: 10 From c17b59e9ace23bc25062810e22f3e25363dea1b9 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Thu, 1 Aug 2024 16:23:05 -0700 Subject: [PATCH 05/15] read annotation from inputs --- provider/pkg/await/await.go | 2 +- provider/pkg/metadata/annotations.go | 7 ++- provider/pkg/metadata/annotations_test.go | 54 ++++++++++++++--------- 3 files changed, 41 insertions(+), 22 deletions(-) diff --git a/provider/pkg/await/await.go b/provider/pkg/await/await.go index ee70a07d16..a048b1d019 100644 --- a/provider/pkg/await/await.go +++ b/provider/pkg/await/await.go @@ -818,7 +818,7 @@ func Deletion(c DeleteConfig) error { source := condition.NewDynamicSource(ctx, c.ClientSet, c.Outputs.GetNamespace()) // Determine the condition to wait for. - deleted, err := metadata.GetDeletedCondition(ctx, source, c.ClientSet, c.DedupLogger, c.Outputs) + deleted, err := metadata.GetDeletedCondition(ctx, source, c.ClientSet, c.DedupLogger, c.Inputs, c.Outputs) if err != nil { return err } diff --git a/provider/pkg/metadata/annotations.go b/provider/pkg/metadata/annotations.go index ceca87ef31..6d821b7e9a 100644 --- a/provider/pkg/metadata/annotations.go +++ b/provider/pkg/metadata/annotations.go @@ -109,14 +109,19 @@ func GetAnnotationValue(obj *unstructured.Unstructured, key string) string { // GetDeletedCondition inspects the object's annotations and returns a // condition.Satisfier appropriate for using when awaiting deletion. +// +// The "inputs" parameter is the source of truth for user-provided annotations, +// but it is not guaranteed to be named. The "obj" parameter should be used for +// conditions. func GetDeletedCondition( ctx context.Context, source condition.Source, clientset clientGetter, logger *logging.DedupLogger, + inputs *unstructured.Unstructured, obj *unstructured.Unstructured, ) (condition.Satisfier, error) { - if IsAnnotationTrue(obj, AnnotationSkipAwait) { + if IsAnnotationTrue(inputs, AnnotationSkipAwait) { return condition.NewImmediate(logger, obj), nil } getter, err := clientset.ResourceClientForObject(obj) diff --git a/provider/pkg/metadata/annotations_test.go b/provider/pkg/metadata/annotations_test.go index a54eeebd01..3d03b9ee18 100644 --- a/provider/pkg/metadata/annotations_test.go +++ b/provider/pkg/metadata/annotations_test.go @@ -19,7 +19,6 @@ import ( "testing" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" - "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/logging" "github.com/pulumi/pulumi/sdk/v3/go/common/resource" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -87,42 +86,57 @@ func TestSetAnnotation(t *testing.T) { } } -func TestSkipAwait(t *testing.T) { +func TestGetDeletedCondition(t *testing.T) { tests := []struct { name string - getter func(context.Context, condition.Source, clientGetter, *logging.DedupLogger, *unstructured.Unstructured) (condition.Satisfier, error) + inputs *unstructured.Unstructured obj *unstructured.Unstructured want condition.Satisfier }{ { - name: "skipAwait=true takes priority over delete condition", - obj: &unstructured.Unstructured{Object: map[string]any{ - "metadata": map[string]any{ - "annotations": map[string]any{ - AnnotationSkipAwait: "true", + name: "skipAwait=true", + inputs: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + AnnotationSkipAwait: "true", + }, }, }, - }}, - getter: GetDeletedCondition, - want: condition.Immediate{}, + }, + want: condition.Immediate{}, }, { - name: "skipAwait=false doesn't take priority over delete condition", - obj: &unstructured.Unstructured{Object: map[string]any{ - "metadata": map[string]any{ - "annotations": map[string]any{ - AnnotationSkipAwait: "false", + name: "skipAwait=false", + inputs: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + AnnotationSkipAwait: "false", + }, }, }, - }}, - getter: GetDeletedCondition, - want: &condition.Deleted{}, + }, + want: &condition.Deleted{}, + }, + { + name: "skipAwait unset", + inputs: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{}, + }, + }, + want: &condition.Deleted{}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - condition, err := tt.getter(context.Background(), nil, noopClientGetter{}, nil, tt.obj) + obj := tt.obj + if obj == nil { + obj = tt.inputs + } + condition, err := GetDeletedCondition(context.Background(), nil, noopClientGetter{}, nil, tt.inputs, obj) require.NoError(t, err) assert.IsType(t, tt.want, condition) From 4e67b65e90fb6c764379353194408bcc93b76f51 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Mon, 5 Aug 2024 09:14:10 -0700 Subject: [PATCH 06/15] simplify awaiter --- provider/pkg/await/condition/observer.go | 4 +- provider/pkg/await/internal/awaiter.go | 90 +++++++++--------------- 2 files changed, 38 insertions(+), 56 deletions(-) diff --git a/provider/pkg/await/condition/observer.go b/provider/pkg/await/condition/observer.go index befe197626..2b8cef071f 100644 --- a/provider/pkg/await/condition/observer.go +++ b/provider/pkg/await/condition/observer.go @@ -153,7 +153,9 @@ func (o *observer) Range(yield func(watch.Event) bool) { if !ok || !o.keep(obj) { continue } - yield(e) + if !yield(e) { + return // Done iterating. + } } } } diff --git a/provider/pkg/await/internal/awaiter.go b/provider/pkg/await/internal/awaiter.go index 2b9ac12580..80ac6183f5 100644 --- a/provider/pkg/await/internal/awaiter.go +++ b/provider/pkg/await/internal/awaiter.go @@ -18,8 +18,6 @@ import ( "context" "errors" "fmt" - "sync" - "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/wait" @@ -42,76 +40,58 @@ func NewAwaiter(options ...awaiterOption) (*Awaiter, error) { return ea, nil } -// Await blocks until the Condition is met or until the context is closed. +// Await blocks until the Condition is met or until the context is canceled. // The operation's timeout should be applied to the provided Context. -func (ea *Awaiter) Await(ctx context.Context) (err error) { - if ea.condition == nil { +func (aw *Awaiter) Await(ctx context.Context) (err error) { + if aw.condition == nil { return fmt.Errorf("missing condition") } - // check channel is signaled when we should re-evaluate our condition. - check := make(chan struct{}) - - // We'll spawn a goroutine for our condition and each observer. We wait for - // them to tear down before returning because the condition might change - // during that time. - wg := sync.WaitGroup{} - wg.Add(len(ea.observers) + 1) - go func() { - wg.Wait() - close(check) - }() - - // Start all of our observers. - observers := append([]condition.Observer{ea.condition}, ea.observers...) - for _, o := range observers { + // Start all of our observers. They'll continue until they're canceled. + for _, o := range aw.observers { go func(o condition.Observer) { - defer wg.Done() o.Range(func(e watch.Event) bool { _ = o.Observe(e) - // Re-evaluate our condition if we see an event for it. - if _, ok := o.(condition.Satisfier); ok { - check <- struct{}{} - } return true }) }(o) } - // Before returning we attempt to re-evaluate the condition a final time, - // and we wrap our error with our object's last known state so it can be - // checkpointed. - defer func() { - if err == nil { - // Nothing to do. - return - } - // Make sure Observers are all done. - wg.Wait() - if done, _ := ea.condition.Satisfied(); done { - err = nil - } - // Wrap our error with our object's state. + // Block until our condition is satisfied, or until our Context is canceled. + aw.condition.Range(func(e watch.Event) bool { + err = aw.condition.Observe(e) if err != nil { - err = errObject{error: err, object: ea.condition.Object()} + return false + } + if done, _ := aw.condition.Satisfied(); done { + return false } + return true + }) + + // Re-evaluate our condition since its state might have changed during the + // iterator's teardown. + done, err := aw.condition.Satisfied() + if done { + return nil + } + + // Make sure the error we return includes the object's partial state. + defer func() { + err = errObject{error: err, object: aw.condition.Object()} }() - for { - select { - case <-check: - done, err := ea.condition.Satisfied() - if done || err != nil { - return err - } - case <-ctx.Done(): - err := ctx.Err() - if errors.Is(err, context.DeadlineExceeded) { - err = fmt.Errorf("timed out waiting for the condition") - } - return wait.ErrorInterrupted(err) - } + if err != nil { + return err + + } + + err = ctx.Err() + if errors.Is(err, context.DeadlineExceeded) { + // Preserve the default k8s "timed out waiting for the condition" error. + err = nil } + return wait.ErrorInterrupted(err) } type awaiterOption interface { From 86a0d39a096a12c522cf8c10dadff8d83ffa6090 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Mon, 5 Aug 2024 09:48:29 -0700 Subject: [PATCH 07/15] fix race condition between 404 check and informers --- provider/pkg/await/condition/deleted.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/provider/pkg/await/condition/deleted.go b/provider/pkg/await/condition/deleted.go index 16c8201ad8..bb1a8fd5b3 100644 --- a/provider/pkg/await/condition/deleted.go +++ b/provider/pkg/await/condition/deleted.go @@ -20,6 +20,7 @@ import ( "fmt" "net/http" "strings" + "sync" "sync/atomic" checkerlog "github.com/pulumi/cloud-ready-checks/pkg/checker/logging" @@ -62,22 +63,32 @@ func NewDeleted( // exhausted, we attempt a final lookup on the cluster to be absolutely sure it // still exists. func (dc *Deleted) Range(yield func(watch.Event) bool) { + // Start listening to events before we check if the resource has already + // been deleted. This avoids races where the object is deleted in-between + // the 404 check and this watch. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + dc.observer.Range(yield) + }() + dc.getClusterState() if dc.deleted.Load() { - // Already deleted, nothing more to do. + // Already deleted, nothing more to do. Our informer will get cleaned up + // when its context is canceled. return } - // Iterate over the underlying Observer's events. - dc.observer.Range(yield) + wg.Wait() if dc.deleted.Load() { // Nothing more to do. return } - // Attempt one last lookup if the object still exists. (This is legacy behavior - // that might be unnecessary since we're using Informers instead of + // Attempt one last lookup if the object still exists. (This is legacy + // behavior that might be unnecessary since we're using Informers instead of // Watches now.) dc.getClusterState() if dc.deleted.Load() { From cf5bd3733bbbf504c49a8a62162cff463f4f7893 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Mon, 5 Aug 2024 16:49:05 -0700 Subject: [PATCH 08/15] add observer tests --- provider/pkg/await/condition/observer.go | 5 +- provider/pkg/await/condition/observer_test.go | 146 ++++++++++++++++++ provider/pkg/await/internal/awaiter.go | 6 +- provider/pkg/await/internal/awaiter_test.go | 41 +++-- 4 files changed, 187 insertions(+), 11 deletions(-) create mode 100644 provider/pkg/await/condition/observer_test.go diff --git a/provider/pkg/await/condition/observer.go b/provider/pkg/await/condition/observer.go index 2b8cef071f..1090ba3a9c 100644 --- a/provider/pkg/await/condition/observer.go +++ b/provider/pkg/await/condition/observer.go @@ -147,7 +147,10 @@ func (o *observer) Range(yield func(watch.Event) bool) { select { case <-o.ctx.Done(): return - case e := <-events: + case e, ok := <-events: + if !ok { + return // Closed channel. + } // Ignore events not matching our "keep" filter. obj, ok := e.Object.(*unstructured.Unstructured) if !ok || !o.keep(obj) { diff --git a/provider/pkg/await/condition/observer_test.go b/provider/pkg/await/condition/observer_test.go new file mode 100644 index 0000000000..d5c0c82a4c --- /dev/null +++ b/provider/pkg/await/condition/observer_test.go @@ -0,0 +1,146 @@ +// Copyright 2024, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package condition + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" +) + +func TestObserver(t *testing.T) { + ctx := context.Background() + + t.Run("filter", func(t *testing.T) { + source := Static(make(chan watch.Event)) + gvk := schema.GroupVersionKind{} + o := NewObserver(ctx, source, gvk, func(obj *unstructured.Unstructured) bool { + i, _, _ := unstructured.NestedInt64(obj.Object, "n") + // Filter to even events so we should only see 2. + return i%2 == 0 + }) + + go func() { + source <- watch.Event{Object: &unstructured.Unstructured{Object: map[string]interface{}{"n": int64(1)}}} + source <- watch.Event{Object: &unstructured.Unstructured{Object: map[string]interface{}{"n": int64(2)}}} + source <- watch.Event{Object: &unstructured.Unstructured{Object: map[string]interface{}{"n": int64(3)}}} + source <- watch.Event{Object: &unstructured.Unstructured{Object: map[string]interface{}{"n": int64(5)}}} + close(source) + }() + + seen := int64(0) + o.Range(func(e watch.Event) bool { + i, _, _ := unstructured.NestedInt64(e.Object.(*unstructured.Unstructured).Object, "n") + seen += i + return true + }) + + assert.Equal(t, int64(2), seen) + }) + + t.Run("terminated", func(t *testing.T) { + source := Static(make(chan watch.Event)) + gvk := schema.GroupVersionKind{} + o := NewObserver(ctx, source, gvk, func(obj *unstructured.Unstructured) bool { + return true + }) + + go func() { + source <- watch.Event{Object: &unstructured.Unstructured{Object: map[string]interface{}{"n": int64(1)}}} + source <- watch.Event{Object: &unstructured.Unstructured{Object: map[string]interface{}{"n": int64(2)}}} + }() + + // We should only see the first object and then terminate early. + seen := int64(0) + o.Range(func(e watch.Event) bool { + i, _, _ := unstructured.NestedInt64(e.Object.(*unstructured.Unstructured).Object, "n") + seen += i + return false + }) + + assert.Equal(t, int64(1), seen) + }) + + t.Run("canceled", func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + + source := Static(make(chan watch.Event)) + gvk := schema.GroupVersionKind{} + o := NewObserver(ctx, source, gvk, func(obj *unstructured.Unstructured) bool { + return true + }) + + go func() { + cancel() + source <- watch.Event{Object: &unstructured.Unstructured{}} + }() + + seen := 0 + o.Range(func(e watch.Event) bool { + seen++ + return true + }) + + assert.Equal(t, 0, seen) + }) + + t.Run("children", func(t *testing.T) { + source := Static(make(chan watch.Event)) + gvk := schema.GroupVersionKind{} + + owner := &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "uid": "owner-uid", + }, + }, + } + + ownedBy := func(myUID, ownerUID string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "ownerReferences": []any{ + map[string]any{ + "uid": ownerUID, + }, + }, + "uid": myUID, + }, + }, + } + } + + o := NewChildObserver(ctx, source, owner, gvk) + + go func() { + source <- watch.Event{Object: ownedBy("other-uid", "other-owner-uid")} + source <- watch.Event{Object: ownedBy("expected-uid", "owner-uid")} + source <- watch.Event{Object: ownedBy("other-uid", "other-owner-uid")} + close(source) + }() + + o.Range(func(e watch.Event) bool { + obj, _ := e.Object.(*unstructured.Unstructured) + uid, _, _ := unstructured.NestedString(obj.Object, "metadata", "uid") + assert.Equal(t, "expected-uid", uid) + return true + }) + }) +} diff --git a/provider/pkg/await/internal/awaiter.go b/provider/pkg/await/internal/awaiter.go index 80ac6183f5..fbecd2ede8 100644 --- a/provider/pkg/await/internal/awaiter.go +++ b/provider/pkg/await/internal/awaiter.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/wait" @@ -83,7 +84,6 @@ func (aw *Awaiter) Await(ctx context.Context) (err error) { if err != nil { return err - } err = ctx.Err() @@ -144,3 +144,7 @@ type errObject struct { func (e errObject) Object() *unstructured.Unstructured { return e.object } + +func (e errObject) Unwrap() error { + return e.error +} diff --git a/provider/pkg/await/internal/awaiter_test.go b/provider/pkg/await/internal/awaiter_test.go index f9d5f63bb5..231e654f99 100644 --- a/provider/pkg/await/internal/awaiter_test.go +++ b/provider/pkg/await/internal/awaiter_test.go @@ -24,15 +24,18 @@ import ( "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" ) func TestCancel(t *testing.T) { t.Parallel() + obj := &unstructured.Unstructured{Object: map[string]any{"foo": "bar"}} + awaiter, err := NewAwaiter( - WithCondition(condition.NewNever(nil)), - WithObservers(condition.NewNever(nil)), + WithCondition(condition.NewNever(obj)), + WithObservers(condition.NewNever(obj)), ) require.NoError(t, err) @@ -42,6 +45,14 @@ func TestCancel(t *testing.T) { err = awaiter.Await(ctx) assert.Error(t, err) + assert.True(t, wait.Interrupted(err)) + + // The error should include the object's partial state. + partial, ok := err.(interface { + Object() *unstructured.Unstructured + }) + assert.True(t, ok) + assert.Equal(t, obj, partial.Object()) } func TestCancelWithRecovery(t *testing.T) { @@ -64,9 +75,11 @@ func TestCancelWithRecovery(t *testing.T) { func TestTimeout(t *testing.T) { t.Parallel() + obj := &unstructured.Unstructured{Object: map[string]any{"foo": "bar"}} + awaiter, err := NewAwaiter( - WithCondition(condition.NewNever(nil)), - WithObservers(condition.NewNever(nil)), + WithCondition(condition.NewNever(obj)), + WithObservers(condition.NewNever(obj)), ) require.NoError(t, err) @@ -76,6 +89,14 @@ func TestTimeout(t *testing.T) { err = awaiter.Await(ctx) assert.Error(t, err) + assert.True(t, wait.Interrupted(err)) + + // The error should include the object's partial state. + partial, ok := err.(interface { + Object() *unstructured.Unstructured + }) + assert.True(t, ok) + assert.Equal(t, obj, partial.Object()) } func TestImmediateSuccess(t *testing.T) { @@ -129,11 +150,13 @@ func TestEventualSuccess(t *testing.T) { ctx := context.Background() - obj := &unstructured.Unstructured{Object: map[string]any{ - "apiVersion": "v1", - "kind": "Pod", - "metadata": map[string]any{"name": "foo", "namespace": "default"}, - }} + obj := &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]any{"name": "foo", "namespace": "default"}, + }, + } want := watch.Event{Type: watch.Modified, Object: obj} events := make(chan watch.Event) From 0ca7b8c1cdf45717c4b15ea871da4626d4e94fd4 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Tue, 6 Aug 2024 15:44:53 -0700 Subject: [PATCH 09/15] feedback --- provider/pkg/await/await_test.go | 2 +- provider/pkg/await/condition/deleted.go | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/provider/pkg/await/await_test.go b/provider/pkg/await/await_test.go index aade739587..40f4d61c55 100644 --- a/provider/pkg/await/await_test.go +++ b/provider/pkg/await/await_test.go @@ -767,7 +767,7 @@ func TestDeletion(t *testing.T) { name: "foo", objects: []runtime.Object{validPodUnstructured}, inputs: withSkipAwait(validPodUnstructured), - outputs: withSkipAwait(validPodUnstructured), + outputs: validPodUnstructured, }, reaction: []reactionF{suppressDeletion}, // suppress deletion to safeguard that the built-in watcher is not used. expect: []expectF{succeeded()}, diff --git a/provider/pkg/await/condition/deleted.go b/provider/pkg/await/condition/deleted.go index bb1a8fd5b3..4af697e472 100644 --- a/provider/pkg/await/condition/deleted.go +++ b/provider/pkg/await/condition/deleted.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "net/http" "strings" "sync" "sync/atomic" @@ -37,6 +36,7 @@ var _ Satisfier = (*Deleted)(nil) // event is received for the resource. type Deleted struct { observer *ObjectObserver + ctx context.Context logger logger deleted atomic.Bool getter objectGetter @@ -51,6 +51,7 @@ func NewDeleted( obj *unstructured.Unstructured, ) (*Deleted, error) { dc := &Deleted{ + ctx: ctx, observer: NewObjectObserver(ctx, source, obj), logger: logger, getter: getter, @@ -58,7 +59,7 @@ func NewDeleted( return dc, nil } -// Range confirms the object exists before establishing an Informer. +// Range establishes an Informer and confirms the object still existsr. // If a Deleted event isn't Observed by the time the underlying Observer is // exhausted, we attempt a final lookup on the cluster to be absolutely sure it // still exists. @@ -98,9 +99,6 @@ func (dc *Deleted) Range(yield func(watch.Event) bool) { // Let the user know we might be blocked if the object has finalizers. // https://github.com/pulumi/pulumi-kubernetes/issues/1418 finalizers := dc.Object().GetFinalizers() - if len(finalizers) == 0 { - return - } dc.logger.LogMessage(checkerlog.WarningMessage( fmt.Sprintf("finalizers might be preventing deletion (%s)", strings.Join(finalizers, ", ")), )) @@ -138,7 +136,10 @@ func (dc *Deleted) Object() *unstructured.Unstructured { // getClusterState performs a GET against the cluster and updates state to // reflect whether the object still exists or not. func (dc *Deleted) getClusterState() { - _, err := dc.getter.Get(context.Background(), dc.Object().GetName(), metav1.GetOptions{}) + // Our context might be closed, but we still want to issue this request + // even if we're shutting down. + ctx := context.WithoutCancel(dc.ctx) + _, err := dc.getter.Get(ctx, dc.Object().GetName(), metav1.GetOptions{}) if err == nil { // Still exists. dc.deleted.Store(false) @@ -146,6 +147,6 @@ func (dc *Deleted) getClusterState() { } var statusErr *k8serrors.StatusError if errors.As(err, &statusErr) { - dc.deleted.Store(statusErr.ErrStatus.Code == http.StatusNotFound) + dc.deleted.Store(k8serrors.IsNotFound(err)) } } From 524be9be9cdf1e726892708607ba75690cecbd22 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Wed, 7 Aug 2024 09:52:23 -0700 Subject: [PATCH 10/15] drop a defer --- provider/pkg/await/internal/awaiter.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/provider/pkg/await/internal/awaiter.go b/provider/pkg/await/internal/awaiter.go index fbecd2ede8..272f66027a 100644 --- a/provider/pkg/await/internal/awaiter.go +++ b/provider/pkg/await/internal/awaiter.go @@ -43,7 +43,7 @@ func NewAwaiter(options ...awaiterOption) (*Awaiter, error) { // Await blocks until the Condition is met or until the context is canceled. // The operation's timeout should be applied to the provided Context. -func (aw *Awaiter) Await(ctx context.Context) (err error) { +func (aw *Awaiter) Await(ctx context.Context) error { if aw.condition == nil { return fmt.Errorf("missing condition") } @@ -60,7 +60,7 @@ func (aw *Awaiter) Await(ctx context.Context) (err error) { // Block until our condition is satisfied, or until our Context is canceled. aw.condition.Range(func(e watch.Event) bool { - err = aw.condition.Observe(e) + err := aw.condition.Observe(e) if err != nil { return false } @@ -78,12 +78,10 @@ func (aw *Awaiter) Await(ctx context.Context) (err error) { } // Make sure the error we return includes the object's partial state. - defer func() { - err = errObject{error: err, object: aw.condition.Object()} - }() + obj := aw.condition.Object() if err != nil { - return err + return errObject{error: err, object: obj} } err = ctx.Err() @@ -91,7 +89,7 @@ func (aw *Awaiter) Await(ctx context.Context) (err error) { // Preserve the default k8s "timed out waiting for the condition" error. err = nil } - return wait.ErrorInterrupted(err) + return errObject{error: wait.ErrorInterrupted(err), object: obj} } type awaiterOption interface { From 8d04a8efb65f223681f3b067d8143744aa74945e Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Wed, 7 Aug 2024 10:09:08 -0700 Subject: [PATCH 11/15] additional logging --- provider/pkg/await/await.go | 1 + provider/pkg/await/condition/deleted.go | 10 +++-- provider/pkg/await/condition/deleted_test.go | 24 ++++++++++- provider/pkg/await/internal/awaiter.go | 43 ++++++++++++++++---- 4 files changed, 65 insertions(+), 13 deletions(-) diff --git a/provider/pkg/await/await.go b/provider/pkg/await/await.go index a048b1d019..7e2cf40f02 100644 --- a/provider/pkg/await/await.go +++ b/provider/pkg/await/await.go @@ -829,6 +829,7 @@ func Deletion(c DeleteConfig) error { awaiter, err := internal.NewAwaiter( internal.WithCondition(deleted), internal.WithNamespace(c.Outputs.GetNamespace()), + internal.WithLogger(c.DedupLogger), ) if err != nil { return err diff --git a/provider/pkg/await/condition/deleted.go b/provider/pkg/await/condition/deleted.go index 4af697e472..4530a8d749 100644 --- a/provider/pkg/await/condition/deleted.go +++ b/provider/pkg/await/condition/deleted.go @@ -16,7 +16,6 @@ package condition import ( "context" - "errors" "fmt" "strings" "sync" @@ -145,8 +144,11 @@ func (dc *Deleted) getClusterState() { dc.deleted.Store(false) return } - var statusErr *k8serrors.StatusError - if errors.As(err, &statusErr) { - dc.deleted.Store(k8serrors.IsNotFound(err)) + if k8serrors.IsNotFound(err) { + dc.deleted.Store(true) + } else { + dc.logger.LogMessage(checkerlog.WarningMessage( + "unexpected error while checking cluster state: " + err.Error(), + )) } } diff --git a/provider/pkg/await/condition/deleted_test.go b/provider/pkg/await/condition/deleted_test.go index ca7c09ac25..aad7f24f2d 100644 --- a/provider/pkg/await/condition/deleted_test.go +++ b/provider/pkg/await/condition/deleted_test.go @@ -52,6 +52,12 @@ func (get404) Get(ctx context.Context, name string, opts metav1.GetOptions, sub return nil, k8serrors.NewNotFound(schema.GroupResource{}, name) } +type get503 struct{} + +func (get503) Get(ctx context.Context, name string, opts metav1.GetOptions, sub ...string) (*unstructured.Unstructured, error) { + return nil, k8serrors.NewServiceUnavailable("boom") +} + type get200 struct{ obj *unstructured.Unstructured } func (g *get200) Get(context.Context, string, metav1.GetOptions, ...string) (*unstructured.Unstructured, error) { @@ -153,7 +159,7 @@ func TestDeleted(t *testing.T) { // words, we needed this when we weren't handling the sort of watch errors // Informers handle automatically. t.Run("times out with recovery", func(t *testing.T) { - getter := &getsequence{[]objectGetter{&get200{pod}, &get404{}}, 0} + getter := &getsequence{[]objectGetter{&get200{pod}, get404{}}, 0} ctx, cancel := context.WithCancel(context.Background()) cond, err := NewDeleted(ctx, Static(nil), getter, stdout{}, pod) @@ -166,4 +172,20 @@ func TestDeleted(t *testing.T) { assert.NoError(t, err) assert.True(t, done) }) + + t.Run("unexpected error", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + buf := &strings.Builder{} + cond, err := NewDeleted(ctx, Static(nil), get503{}, logbuf{buf}, pod) + assert.NoError(t, err) + + cancel() + cond.Range(nil) + + done, err := cond.Satisfied() + assert.NoError(t, err) + assert.False(t, done) + assert.Contains(t, buf.String(), "boom") + }) } diff --git a/provider/pkg/await/internal/awaiter.go b/provider/pkg/await/internal/awaiter.go index 272f66027a..97131b2001 100644 --- a/provider/pkg/await/internal/awaiter.go +++ b/provider/pkg/await/internal/awaiter.go @@ -18,7 +18,9 @@ import ( "context" "errors" "fmt" + "os" + checkerlog "github.com/pulumi/cloud-ready-checks/pkg/checker/logging" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/wait" @@ -27,6 +29,7 @@ import ( // Awaiter orchestrates a condition Satisfier and optional Observers. type Awaiter struct { + logger logger namespace string condition condition.Satisfier observers []condition.Observer @@ -34,7 +37,7 @@ type Awaiter struct { // NewAwaiter creates a new Awaiter with the given options. func NewAwaiter(options ...awaiterOption) (*Awaiter, error) { - ea := &Awaiter{} + ea := &Awaiter{logger: stdout{}} for _, opt := range options { opt.apply(ea) } @@ -52,7 +55,9 @@ func (aw *Awaiter) Await(ctx context.Context) error { for _, o := range aw.observers { go func(o condition.Observer) { o.Range(func(e watch.Event) bool { - _ = o.Observe(e) + if err := o.Observe(e); err != nil { + aw.logger.LogMessage(checkerlog.WarningMessage("observe error: " + err.Error())) + } return true }) }(o) @@ -98,8 +103,8 @@ type awaiterOption interface { type withConditionOption struct{ condition condition.Satisfier } -func (o withConditionOption) apply(ea *Awaiter) { - ea.condition = o.condition +func (o withConditionOption) apply(aw *Awaiter) { + aw.condition = o.condition } // WithCondition sets the condition.Satisfier used by the Awaiter. This is @@ -110,8 +115,8 @@ func WithCondition(c condition.Satisfier) awaiterOption { type withObserversOption struct{ observers []condition.Observer } -func (o withObserversOption) apply(ea *Awaiter) { - ea.observers = o.observers +func (o withObserversOption) apply(aw *Awaiter) { + aw.observers = o.observers } // WithObservers attaches optional condition.Observers to the Awaiter. This @@ -123,8 +128,19 @@ func WithObservers(obs ...condition.Observer) awaiterOption { type withNamespaceOption struct{ ns string } -func (o withNamespaceOption) apply(ea *Awaiter) { - ea.namespace = o.ns +func (o withNamespaceOption) apply(aw *Awaiter) { + aw.namespace = o.ns +} + +// WithLogger uses the provided logger. If not provided stdout is used. +func WithLogger(l logger) awaiterOption { + return withLoggerOption{l} +} + +type withLoggerOption struct{ l logger } + +func (o withLoggerOption) apply(aw *Awaiter) { + aw.logger = o.l } // WithNamespace configures the namespace used by Informers spawned by the @@ -146,3 +162,14 @@ func (e errObject) Object() *unstructured.Unstructured { func (e errObject) Unwrap() error { return e.error } + +type logger interface { + LogMessage(checkerlog.Message) +} + +// stdout logs messages to stdout. +type stdout struct{} + +func (stdout) LogMessage(m checkerlog.Message) { + _, _ = os.Stdout.WriteString(m.S) +} From d190a0ec870f1ca3fc5f92f0de7683ad31265314 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Fri, 9 Aug 2024 11:48:20 -0700 Subject: [PATCH 12/15] update log types --- provider/pkg/await/condition/condition.go | 18 +++++++----------- provider/pkg/await/condition/deleted.go | 14 +++++++------- provider/pkg/await/condition/deleted_test.go | 11 +++++++---- provider/pkg/await/condition/immediate.go | 4 ++-- provider/pkg/await/internal/awaiter.go | 14 +++++++++----- 5 files changed, 32 insertions(+), 29 deletions(-) diff --git a/provider/pkg/await/condition/condition.go b/provider/pkg/await/condition/condition.go index 67a22f0aa5..0c57e009b2 100644 --- a/provider/pkg/await/condition/condition.go +++ b/provider/pkg/await/condition/condition.go @@ -18,9 +18,8 @@ import ( "context" "fmt" "io" - "os" - checkerlog "github.com/pulumi/cloud-ready-checks/pkg/checker/logging" + "github.com/pulumi/pulumi/sdk/v3/go/common/diag" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) @@ -39,22 +38,19 @@ type Satisfier interface { // logger allows injecting custom log behavior. type logger interface { - LogMessage(checkerlog.Message) + Log(diag.Severity, string) + LogStatus(diag.Severity, string) } // logbuf logs messages to an io.Writter. type logbuf struct{ w io.Writer } -func (l logbuf) LogMessage(m checkerlog.Message) { - fmt.Fprint(l.w, m.String()+"\n") +func (l logbuf) Log(sev diag.Severity, msg string) { + fmt.Fprintln(l.w, sev, msg) } -// stdout logs messages to stdout. -type stdout struct{} - -func (stdout) LogMessage(m checkerlog.Message) { - l := logbuf{os.Stdout} - l.LogMessage(m) +func (l logbuf) LogStatus(sev diag.Severity, msg string) { + l.Log(sev, msg) } // objectGetter allows injecting custom client behavior for fetching objects diff --git a/provider/pkg/await/condition/deleted.go b/provider/pkg/await/condition/deleted.go index 4530a8d749..2d4bcf69cc 100644 --- a/provider/pkg/await/condition/deleted.go +++ b/provider/pkg/await/condition/deleted.go @@ -21,7 +21,7 @@ import ( "sync" "sync/atomic" - checkerlog "github.com/pulumi/cloud-ready-checks/pkg/checker/logging" + "github.com/pulumi/pulumi/sdk/v3/go/common/diag" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -98,9 +98,9 @@ func (dc *Deleted) Range(yield func(watch.Event) bool) { // Let the user know we might be blocked if the object has finalizers. // https://github.com/pulumi/pulumi-kubernetes/issues/1418 finalizers := dc.Object().GetFinalizers() - dc.logger.LogMessage(checkerlog.WarningMessage( + dc.logger.Log(diag.Warning, fmt.Sprintf("finalizers might be preventing deletion (%s)", strings.Join(finalizers, ", ")), - )) + ) } // Observe watches for Deleted events. @@ -121,7 +121,7 @@ func (dc *Deleted) Satisfied() (bool, error) { uns := dc.Object() r, _ := status.Compute(uns) if r.Message != "" { - dc.logger.LogMessage(checkerlog.StatusMessage(r.Message)) + dc.logger.LogStatus(diag.Info, r.Message) } return false, nil @@ -147,8 +147,8 @@ func (dc *Deleted) getClusterState() { if k8serrors.IsNotFound(err) { dc.deleted.Store(true) } else { - dc.logger.LogMessage(checkerlog.WarningMessage( - "unexpected error while checking cluster state: " + err.Error(), - )) + dc.logger.LogStatus(diag.Warning, + "unexpected error while checking cluster state: "+err.Error(), + ) } } diff --git a/provider/pkg/await/condition/deleted_test.go b/provider/pkg/await/condition/deleted_test.go index aad7f24f2d..ddc3fa4583 100644 --- a/provider/pkg/await/condition/deleted_test.go +++ b/provider/pkg/await/condition/deleted_test.go @@ -16,6 +16,7 @@ package condition import ( "context" + "os" "strings" "testing" @@ -75,11 +76,13 @@ func (g *getsequence) Get(ctx context.Context, name string, opts metav1.GetOptio } func TestDeleted(t *testing.T) { + stdout := logbuf{os.Stdout} + t.Run("already deleted", func(t *testing.T) { ctx := context.Background() getter := get404{} - cond, err := NewDeleted(ctx, Static(nil), getter, stdout{}, pod) + cond, err := NewDeleted(ctx, Static(nil), getter, stdout, pod) assert.NoError(t, err) cond.Range(nil) @@ -95,7 +98,7 @@ func TestDeleted(t *testing.T) { getter := &get200{pod} source := Static(make(chan watch.Event, 1)) - cond, err := NewDeleted(ctx, source, getter, stdout{}, pod) + cond, err := NewDeleted(ctx, source, getter, stdout, pod) assert.NoError(t, err) seen := make(chan struct{}) @@ -125,7 +128,7 @@ func TestDeleted(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - cond, err := NewDeleted(ctx, Static(nil), getter, stdout{}, pod) + cond, err := NewDeleted(ctx, Static(nil), getter, stdout, pod) assert.NoError(t, err) cond.Range(nil) @@ -162,7 +165,7 @@ func TestDeleted(t *testing.T) { getter := &getsequence{[]objectGetter{&get200{pod}, get404{}}, 0} ctx, cancel := context.WithCancel(context.Background()) - cond, err := NewDeleted(ctx, Static(nil), getter, stdout{}, pod) + cond, err := NewDeleted(ctx, Static(nil), getter, stdout, pod) assert.NoError(t, err) cancel() diff --git a/provider/pkg/await/condition/immediate.go b/provider/pkg/await/condition/immediate.go index 0e2649df25..a1bf35ae29 100644 --- a/provider/pkg/await/condition/immediate.go +++ b/provider/pkg/await/condition/immediate.go @@ -18,7 +18,7 @@ import ( "context" "sync/atomic" - checkerlog "github.com/pulumi/cloud-ready-checks/pkg/checker/logging" + "github.com/pulumi/pulumi/sdk/v3/go/common/diag" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/watch" ) @@ -49,7 +49,7 @@ func (Immediate) Range(func(watch.Event) bool) {} // Satisfied always returns true for Immediately conditions. func (i Immediate) Satisfied() (bool, error) { if i.logger != nil { - i.logger.LogMessage(checkerlog.StatusMessage("Skipping await logic")) + i.logger.LogStatus(diag.Info, "Skipping await logic") } return true, nil } diff --git a/provider/pkg/await/internal/awaiter.go b/provider/pkg/await/internal/awaiter.go index 97131b2001..d6dab2da56 100644 --- a/provider/pkg/await/internal/awaiter.go +++ b/provider/pkg/await/internal/awaiter.go @@ -20,8 +20,8 @@ import ( "fmt" "os" - checkerlog "github.com/pulumi/cloud-ready-checks/pkg/checker/logging" "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/pulumi/pulumi/sdk/v3/go/common/diag" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -56,7 +56,7 @@ func (aw *Awaiter) Await(ctx context.Context) error { go func(o condition.Observer) { o.Range(func(e watch.Event) bool { if err := o.Observe(e); err != nil { - aw.logger.LogMessage(checkerlog.WarningMessage("observe error: " + err.Error())) + aw.logger.LogStatus(diag.Warning, "observe error: "+err.Error()) } return true }) @@ -164,12 +164,16 @@ func (e errObject) Unwrap() error { } type logger interface { - LogMessage(checkerlog.Message) + Log(diag.Severity, string) + LogStatus(diag.Severity, string) } // stdout logs messages to stdout. type stdout struct{} -func (stdout) LogMessage(m checkerlog.Message) { - _, _ = os.Stdout.WriteString(m.S) +func (stdout) Log(sev diag.Severity, msg string) { + _, _ = os.Stdout.WriteString(fmt.Sprintf("%s: %s\n", sev, msg)) +} +func (s stdout) LogStatus(sev diag.Severity, msg string) { + s.Log(sev, msg) } From 85b8f50ac7b038f578ce9aa0c98cf71c313ebe12 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Fri, 9 Aug 2024 12:45:37 -0700 Subject: [PATCH 13/15] preserve existing skipawait behavior --- provider/pkg/metadata/annotations.go | 41 ++++++++++++++++++- provider/pkg/metadata/annotations_test.go | 17 +++++++- .../java/testdata/await/skipawait/Pulumi.yaml | 8 ++-- .../await/skipawait/step2/Pulumi.yaml | 8 ++-- 4 files changed, 62 insertions(+), 12 deletions(-) diff --git a/provider/pkg/metadata/annotations.go b/provider/pkg/metadata/annotations.go index 6d821b7e9a..c545fb5474 100644 --- a/provider/pkg/metadata/annotations.go +++ b/provider/pkg/metadata/annotations.go @@ -23,6 +23,12 @@ import ( "github.com/pulumi/pulumi/sdk/v3/go/common/resource" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/dynamic" + + appsv1 "k8s.io/api/apps/v1" + appsv1beta1 "k8s.io/api/apps/v1beta1" + appsv1beta2 "k8s.io/api/apps/v1beta2" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" ) const ( @@ -121,7 +127,7 @@ func GetDeletedCondition( inputs *unstructured.Unstructured, obj *unstructured.Unstructured, ) (condition.Satisfier, error) { - if IsAnnotationTrue(inputs, AnnotationSkipAwait) { + if IsAnnotationTrue(inputs, AnnotationSkipAwait) && allowsSkipAwaitWithDelete(inputs) { return condition.NewImmediate(logger, obj), nil } getter, err := clientset.ResourceClientForObject(obj) @@ -131,6 +137,39 @@ func GetDeletedCondition( return condition.NewDeleted(ctx, source, getter, logger, obj) } +// allowsSkipDelete returns true for legacy types which support buggy skipAwait behavior during delete. +// See also: +// https://github.com/pulumi/pulumi-kubernetes/issues/1232 +// https://github.com/pulumi/pulumi-kubernetes/issues/3154 +func allowsSkipAwaitWithDelete(inputs *unstructured.Unstructured) bool { + +} + +// allowsSkipDelete returns true for legacy types which support buggy skipAwait +// behavior during delete. +// See also: +// https://github.com/pulumi/pulumi-kubernetes/issues/1232 +// https://github.com/pulumi/pulumi-kubernetes/issues/3154 +func allowsSkipAwaitWithDelete(inputs *unstructured.Unstructured) bool { + switch inputs.GroupVersionKind() { + case corev1.SchemeGroupVersion.WithKind("Namespace"), + corev1.SchemeGroupVersion.WithKind("Pod"), + corev1.SchemeGroupVersion.WithKind("ReplicationController"), + appsv1.SchemeGroupVersion.WithKind("DaemonSet"), + appsv1beta1.SchemeGroupVersion.WithKind("DaemonSet"), + appsv1beta2.SchemeGroupVersion.WithKind("DaemonSet"), + appsv1.SchemeGroupVersion.WithKind("Deployment"), + appsv1beta1.SchemeGroupVersion.WithKind("Deployment"), + appsv1beta2.SchemeGroupVersion.WithKind("Deployment"), + appsv1.SchemeGroupVersion.WithKind("StatefulSet"), + appsv1beta1.SchemeGroupVersion.WithKind("StatefulSet"), + appsv1beta2.SchemeGroupVersion.WithKind("StatefulSet"), + batchv1.SchemeGroupVersion.WithKind("Job"): + return true + } + return false +} + func isComputedValue(v any) bool { _, isComputed := v.(resource.Computed) return isComputed diff --git a/provider/pkg/metadata/annotations_test.go b/provider/pkg/metadata/annotations_test.go index 3d03b9ee18..e4a8e21827 100644 --- a/provider/pkg/metadata/annotations_test.go +++ b/provider/pkg/metadata/annotations_test.go @@ -94,7 +94,7 @@ func TestGetDeletedCondition(t *testing.T) { want condition.Satisfier }{ { - name: "skipAwait=true", + name: "skipAwait=true doesn't affect generic resources", inputs: &unstructured.Unstructured{ Object: map[string]any{ "metadata": map[string]any{ @@ -104,6 +104,21 @@ func TestGetDeletedCondition(t *testing.T) { }, }, }, + want: &condition.Deleted{}, + }, + { + name: "skipAwait=true does affect legacy resources", + inputs: &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "v1", + "kind": "Namespace", + "metadata": map[string]any{ + "annotations": map[string]any{ + AnnotationSkipAwait: "true", + }, + }, + }, + }, want: condition.Immediate{}, }, { diff --git a/tests/sdk/java/testdata/await/skipawait/Pulumi.yaml b/tests/sdk/java/testdata/await/skipawait/Pulumi.yaml index f00b8f39b9..e37031bf0a 100644 --- a/tests/sdk/java/testdata/await/skipawait/Pulumi.yaml +++ b/tests/sdk/java/testdata/await/skipawait/Pulumi.yaml @@ -3,18 +3,16 @@ runtime: yaml description: | Tests the skipAwait annotation: - A slow-to-start deployment tests create/update/read. - - Delete is tested with a config map with a stuck finalizer. + - Delete is tested by a namespace with a stuck finalizer. resources: - stuck-config: - type: kubernetes:core/v1:ConfigMap + stuck-namespace: + type: kubernetes:core/v1:Namespace properties: metadata: finalizers: - pulumi.com/stuck annotations: pulumi.com/skipAwait: "true" - data: - foo: bar slow-pod: type: kubernetes:core/v1:Pod diff --git a/tests/sdk/java/testdata/await/skipawait/step2/Pulumi.yaml b/tests/sdk/java/testdata/await/skipawait/step2/Pulumi.yaml index a7de6d62f2..3c17e3d817 100644 --- a/tests/sdk/java/testdata/await/skipawait/step2/Pulumi.yaml +++ b/tests/sdk/java/testdata/await/skipawait/step2/Pulumi.yaml @@ -3,18 +3,16 @@ runtime: yaml description: | Tests the skipAwait annotation: - A slow-to-start deployment tests create/update/read. - - Delete is tested with a config map with a stuck finalizer. + - Delete is tested by a namespace with a stuck finalizer. resources: - stuck-config: - type: kubernetes:core/v1:ConfigMap + stuck-namespace: + type: kubernetes:core/v1:Namespace properties: metadata: finalizers: - pulumi.com/stuck annotations: pulumi.com/skipAwait: "true" - data: - foo: bar slow-pod: type: kubernetes:core/v1:Pod From 9f3fd61b6a307277e9a79368c113c3aeb156fd13 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Fri, 9 Aug 2024 13:26:12 -0700 Subject: [PATCH 14/15] move metadata helper --- provider/pkg/await/await.go | 2 +- provider/pkg/metadata/annotations.go | 66 ------------------ provider/pkg/metadata/annotations_test.go | 83 ---------------------- provider/pkg/metadata/overrides.go | 57 +++++++++++++++ provider/pkg/metadata/overrides_test.go | 84 +++++++++++++++++++++++ 5 files changed, 142 insertions(+), 150 deletions(-) diff --git a/provider/pkg/await/await.go b/provider/pkg/await/await.go index 7e2cf40f02..38ed2165f2 100644 --- a/provider/pkg/await/await.go +++ b/provider/pkg/await/await.go @@ -818,7 +818,7 @@ func Deletion(c DeleteConfig) error { source := condition.NewDynamicSource(ctx, c.ClientSet, c.Outputs.GetNamespace()) // Determine the condition to wait for. - deleted, err := metadata.GetDeletedCondition(ctx, source, c.ClientSet, c.DedupLogger, c.Inputs, c.Outputs) + deleted, err := metadata.DeletedCondition(ctx, source, c.ClientSet, c.DedupLogger, c.Inputs, c.Outputs) if err != nil { return err } diff --git a/provider/pkg/metadata/annotations.go b/provider/pkg/metadata/annotations.go index c545fb5474..3131588306 100644 --- a/provider/pkg/metadata/annotations.go +++ b/provider/pkg/metadata/annotations.go @@ -15,20 +15,11 @@ package metadata import ( - "context" "strings" - "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" - "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/logging" "github.com/pulumi/pulumi/sdk/v3/go/common/resource" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/dynamic" - - appsv1 "k8s.io/api/apps/v1" - appsv1beta1 "k8s.io/api/apps/v1beta1" - appsv1beta2 "k8s.io/api/apps/v1beta2" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" ) const ( @@ -113,63 +104,6 @@ func GetAnnotationValue(obj *unstructured.Unstructured, key string) string { return annotations[key] } -// GetDeletedCondition inspects the object's annotations and returns a -// condition.Satisfier appropriate for using when awaiting deletion. -// -// The "inputs" parameter is the source of truth for user-provided annotations, -// but it is not guaranteed to be named. The "obj" parameter should be used for -// conditions. -func GetDeletedCondition( - ctx context.Context, - source condition.Source, - clientset clientGetter, - logger *logging.DedupLogger, - inputs *unstructured.Unstructured, - obj *unstructured.Unstructured, -) (condition.Satisfier, error) { - if IsAnnotationTrue(inputs, AnnotationSkipAwait) && allowsSkipAwaitWithDelete(inputs) { - return condition.NewImmediate(logger, obj), nil - } - getter, err := clientset.ResourceClientForObject(obj) - if err != nil { - return nil, err - } - return condition.NewDeleted(ctx, source, getter, logger, obj) -} - -// allowsSkipDelete returns true for legacy types which support buggy skipAwait behavior during delete. -// See also: -// https://github.com/pulumi/pulumi-kubernetes/issues/1232 -// https://github.com/pulumi/pulumi-kubernetes/issues/3154 -func allowsSkipAwaitWithDelete(inputs *unstructured.Unstructured) bool { - -} - -// allowsSkipDelete returns true for legacy types which support buggy skipAwait -// behavior during delete. -// See also: -// https://github.com/pulumi/pulumi-kubernetes/issues/1232 -// https://github.com/pulumi/pulumi-kubernetes/issues/3154 -func allowsSkipAwaitWithDelete(inputs *unstructured.Unstructured) bool { - switch inputs.GroupVersionKind() { - case corev1.SchemeGroupVersion.WithKind("Namespace"), - corev1.SchemeGroupVersion.WithKind("Pod"), - corev1.SchemeGroupVersion.WithKind("ReplicationController"), - appsv1.SchemeGroupVersion.WithKind("DaemonSet"), - appsv1beta1.SchemeGroupVersion.WithKind("DaemonSet"), - appsv1beta2.SchemeGroupVersion.WithKind("DaemonSet"), - appsv1.SchemeGroupVersion.WithKind("Deployment"), - appsv1beta1.SchemeGroupVersion.WithKind("Deployment"), - appsv1beta2.SchemeGroupVersion.WithKind("Deployment"), - appsv1.SchemeGroupVersion.WithKind("StatefulSet"), - appsv1beta1.SchemeGroupVersion.WithKind("StatefulSet"), - appsv1beta2.SchemeGroupVersion.WithKind("StatefulSet"), - batchv1.SchemeGroupVersion.WithKind("Job"): - return true - } - return false -} - func isComputedValue(v any) bool { _, isComputed := v.(resource.Computed) return isComputed diff --git a/provider/pkg/metadata/annotations_test.go b/provider/pkg/metadata/annotations_test.go index e4a8e21827..f16be08f30 100644 --- a/provider/pkg/metadata/annotations_test.go +++ b/provider/pkg/metadata/annotations_test.go @@ -15,15 +15,11 @@ package metadata import ( - "context" "testing" - "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" "github.com/pulumi/pulumi/sdk/v3/go/common/resource" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/client-go/dynamic" ) func TestSetAnnotation(t *testing.T) { @@ -85,82 +81,3 @@ func TestSetAnnotation(t *testing.T) { }) } } - -func TestGetDeletedCondition(t *testing.T) { - tests := []struct { - name string - inputs *unstructured.Unstructured - obj *unstructured.Unstructured - want condition.Satisfier - }{ - { - name: "skipAwait=true doesn't affect generic resources", - inputs: &unstructured.Unstructured{ - Object: map[string]any{ - "metadata": map[string]any{ - "annotations": map[string]any{ - AnnotationSkipAwait: "true", - }, - }, - }, - }, - want: &condition.Deleted{}, - }, - { - name: "skipAwait=true does affect legacy resources", - inputs: &unstructured.Unstructured{ - Object: map[string]any{ - "apiVersion": "v1", - "kind": "Namespace", - "metadata": map[string]any{ - "annotations": map[string]any{ - AnnotationSkipAwait: "true", - }, - }, - }, - }, - want: condition.Immediate{}, - }, - { - name: "skipAwait=false", - inputs: &unstructured.Unstructured{ - Object: map[string]any{ - "metadata": map[string]any{ - "annotations": map[string]any{ - AnnotationSkipAwait: "false", - }, - }, - }, - }, - want: &condition.Deleted{}, - }, - { - name: "skipAwait unset", - inputs: &unstructured.Unstructured{ - Object: map[string]any{ - "metadata": map[string]any{}, - }, - }, - want: &condition.Deleted{}, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - obj := tt.obj - if obj == nil { - obj = tt.inputs - } - condition, err := GetDeletedCondition(context.Background(), nil, noopClientGetter{}, nil, tt.inputs, obj) - require.NoError(t, err) - - assert.IsType(t, tt.want, condition) - }) - } -} - -type noopClientGetter struct{} - -func (noopClientGetter) ResourceClientForObject(*unstructured.Unstructured) (dynamic.ResourceInterface, error) { - return nil, nil -} diff --git a/provider/pkg/metadata/overrides.go b/provider/pkg/metadata/overrides.go index 9cac93d3c1..a00ce4a39b 100644 --- a/provider/pkg/metadata/overrides.go +++ b/provider/pkg/metadata/overrides.go @@ -15,10 +15,18 @@ package metadata import ( + "context" "strconv" "strings" "time" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/logging" + appsv1 "k8s.io/api/apps/v1" + appsv1beta1 "k8s.io/api/apps/v1beta1" + appsv1beta2 "k8s.io/api/apps/v1beta2" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) @@ -64,3 +72,52 @@ func DeletionPropagation(obj *unstructured.Unstructured) metav1.DeletionPropagat return metav1.DeletePropagationForeground } } + +// DeletedCondition inspects the object's annotations and returns a +// condition.Satisfier appropriate for using when awaiting deletion. +// +// The "inputs" parameter is the source of truth for user-provided annotations, +// but it is not guaranteed to be named. The "obj" parameter should be used for +// conditions. +func DeletedCondition( + ctx context.Context, + source condition.Source, + clientset clientGetter, + logger *logging.DedupLogger, + inputs *unstructured.Unstructured, + obj *unstructured.Unstructured, +) (condition.Satisfier, error) { + if IsAnnotationTrue(inputs, AnnotationSkipAwait) && allowsSkipAwaitWithDelete(inputs) { + return condition.NewImmediate(logger, obj), nil + } + getter, err := clientset.ResourceClientForObject(obj) + if err != nil { + return nil, err + } + return condition.NewDeleted(ctx, source, getter, logger, obj) +} + +// allowsSkipDelete returns true for legacy types which support buggy skipAwait +// behavior during delete. +// See also: +// https://github.com/pulumi/pulumi-kubernetes/issues/1232 +// https://github.com/pulumi/pulumi-kubernetes/issues/3154 +func allowsSkipAwaitWithDelete(inputs *unstructured.Unstructured) bool { + switch inputs.GroupVersionKind() { + case corev1.SchemeGroupVersion.WithKind("Namespace"), + corev1.SchemeGroupVersion.WithKind("Pod"), + corev1.SchemeGroupVersion.WithKind("ReplicationController"), + appsv1.SchemeGroupVersion.WithKind("DaemonSet"), + appsv1beta1.SchemeGroupVersion.WithKind("DaemonSet"), + appsv1beta2.SchemeGroupVersion.WithKind("DaemonSet"), + appsv1.SchemeGroupVersion.WithKind("Deployment"), + appsv1beta1.SchemeGroupVersion.WithKind("Deployment"), + appsv1beta2.SchemeGroupVersion.WithKind("Deployment"), + appsv1.SchemeGroupVersion.WithKind("StatefulSet"), + appsv1beta1.SchemeGroupVersion.WithKind("StatefulSet"), + appsv1beta2.SchemeGroupVersion.WithKind("StatefulSet"), + batchv1.SchemeGroupVersion.WithKind("Job"): + return true + } + return false +} diff --git a/provider/pkg/metadata/overrides_test.go b/provider/pkg/metadata/overrides_test.go index 09b35b9a79..0e336388c5 100644 --- a/provider/pkg/metadata/overrides_test.go +++ b/provider/pkg/metadata/overrides_test.go @@ -16,12 +16,17 @@ package metadata import ( + "context" "reflect" "testing" "time" + "github.com/pulumi/pulumi-kubernetes/provider/v4/pkg/await/condition" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" ) func TestSkipAwaitLogic(t *testing.T) { @@ -127,3 +132,82 @@ func TestDeletionPropagation(t *testing.T) { }) } } + +func TestDeletedCondition(t *testing.T) { + tests := []struct { + name string + inputs *unstructured.Unstructured + obj *unstructured.Unstructured + want condition.Satisfier + }{ + { + name: "skipAwait=true doesn't affect generic resources", + inputs: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + AnnotationSkipAwait: "true", + }, + }, + }, + }, + want: &condition.Deleted{}, + }, + { + name: "skipAwait=true does affect legacy resources", + inputs: &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "v1", + "kind": "Namespace", + "metadata": map[string]any{ + "annotations": map[string]any{ + AnnotationSkipAwait: "true", + }, + }, + }, + }, + want: condition.Immediate{}, + }, + { + name: "skipAwait=false", + inputs: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{ + "annotations": map[string]any{ + AnnotationSkipAwait: "false", + }, + }, + }, + }, + want: &condition.Deleted{}, + }, + { + name: "skipAwait unset", + inputs: &unstructured.Unstructured{ + Object: map[string]any{ + "metadata": map[string]any{}, + }, + }, + want: &condition.Deleted{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + obj := tt.obj + if obj == nil { + obj = tt.inputs + } + condition, err := DeletedCondition(context.Background(), nil, noopClientGetter{}, nil, tt.inputs, obj) + require.NoError(t, err) + + assert.IsType(t, tt.want, condition) + }) + } +} + +type noopClientGetter struct{} + +func (noopClientGetter) ResourceClientForObject(*unstructured.Unstructured) (dynamic.ResourceInterface, error) { + return nil, nil +} From 229a3851a411ae815c02e15c95abe023cbfc9e24 Mon Sep 17 00:00:00 2001 From: Bryce Lampe Date: Wed, 21 Aug 2024 13:48:32 -0700 Subject: [PATCH 15/15] fixup --- provider/pkg/await/condition/deleted.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/provider/pkg/await/condition/deleted.go b/provider/pkg/await/condition/deleted.go index 2d4bcf69cc..0af8d6c9fa 100644 --- a/provider/pkg/await/condition/deleted.go +++ b/provider/pkg/await/condition/deleted.go @@ -73,7 +73,7 @@ func (dc *Deleted) Range(yield func(watch.Event) bool) { dc.observer.Range(yield) }() - dc.getClusterState() + dc.refreshClusterState() if dc.deleted.Load() { // Already deleted, nothing more to do. Our informer will get cleaned up // when its context is canceled. @@ -90,7 +90,7 @@ func (dc *Deleted) Range(yield func(watch.Event) bool) { // Attempt one last lookup if the object still exists. (This is legacy // behavior that might be unnecessary since we're using Informers instead of // Watches now.) - dc.getClusterState() + dc.refreshClusterState() if dc.deleted.Load() { return } @@ -132,9 +132,9 @@ func (dc *Deleted) Object() *unstructured.Unstructured { return dc.observer.Object() } -// getClusterState performs a GET against the cluster and updates state to +// refreshClusterState performs a GET against the cluster and updates state to // reflect whether the object still exists or not. -func (dc *Deleted) getClusterState() { +func (dc *Deleted) refreshClusterState() { // Our context might be closed, but we still want to issue this request // even if we're shutting down. ctx := context.WithoutCancel(dc.ctx)