From cc3b9f7319c79885ced93b09cfdf1f72cd775f57 Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 29 May 2024 08:47:07 +0200 Subject: [PATCH 1/8] Add Local() to the Applier's resource.Builder This will prevent the Builder from trying to get a REST mapping for all the resources from the API server, which will fail in case of custom resources whose CRDs weren't applied yet. The error returned from the builder looked like this: > unable to build resources: resource mapping not found for name: "fooResource" namespace: "default" from "/var/lib/k0s/manifests/foo/resources.yaml": no matches for kind "Foo" in version "example.com/v1" > ensure CRDs are installed first Signed-off-by: Tom Wieczorek --- pkg/applier/applier.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/applier/applier.go b/pkg/applier/applier.go index a0239070917a..f4070befa870 100644 --- a/pkg/applier/applier.go +++ b/pkg/applier/applier.go @@ -149,6 +149,7 @@ func (a *Applier) parseFiles(files []string) ([]*unstructured.Unstructured, erro } objects, err := resource.NewBuilder(a.restClientGetter). + Local(). // don't fail on unknown CRDs Unstructured(). Path(false, files...). Flatten(). From 8b5563639ad42292fcad223a572b31fda2bce03f Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 29 May 2024 11:23:24 +0200 Subject: [PATCH 2/8] Only use public APIs in Applier unit test This makes it easier to refactor the Applier. Signed-off-by: Tom Wieczorek --- pkg/applier/applier_test.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/applier/applier_test.go b/pkg/applier/applier_test.go index 3dbd675130c2..f581acf3c021 100644 --- a/pkg/applier/applier_test.go +++ b/pkg/applier/applier_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package applier +package applier_test import ( "context" @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" kubeutil "github.com/k0sproject/k0s/internal/testutil" + "github.com/k0sproject/k0s/pkg/applier" ) func TestApplierAppliesAllManifestsInADirectory(t *testing.T) { @@ -116,40 +117,40 @@ spec: }, } - a := NewApplier(dir, fakes) + a := applier.NewApplier(dir, fakes) ctx := context.Background() err := a.Apply(ctx) assert.NoError(t, err) gv, _ := schema.ParseResourceArg("configmaps.v1.") - r, err := a.client.Resource(*gv).Namespace("kube-system").Get(ctx, "applier-test", metav1.GetOptions{}) + r, err := fakes.DynamicClient.Resource(*gv).Namespace("kube-system").Get(ctx, "applier-test", metav1.GetOptions{}) if assert.NoError(t, err) { assert.Equal(t, "applier", r.GetLabels()["component"]) } podgv, _ := schema.ParseResourceArg("pods.v1.") - r, err = a.client.Resource(*podgv).Namespace("kube-system").Get(ctx, "applier-test", metav1.GetOptions{}) + r, err = fakes.DynamicClient.Resource(*podgv).Namespace("kube-system").Get(ctx, "applier-test", metav1.GetOptions{}) if assert.NoError(t, err) { assert.Equal(t, "Pod", r.GetKind()) assert.Equal(t, "applier", r.GetLabels()["component"]) } deployGV, _ := schema.ParseResourceArg("deployments.v1.apps") - _, err = a.client.Resource(*deployGV).Namespace("kube-system").Get(ctx, "nginx", metav1.GetOptions{}) + _, err = fakes.DynamicClient.Resource(*deployGV).Namespace("kube-system").Get(ctx, "nginx", metav1.GetOptions{}) assert.NoError(t, err) // Attempt to delete the stack with a different applier - a2 := NewApplier(dir, fakes) + a2 := applier.NewApplier(dir, fakes) assert.NoError(t, a2.Delete(ctx)) // Check that the resources are deleted - _, err = a.client.Resource(*gv).Namespace("kube-system").Get(ctx, "applier-test", metav1.GetOptions{}) + _, err = fakes.DynamicClient.Resource(*gv).Namespace("kube-system").Get(ctx, "applier-test", metav1.GetOptions{}) assert.True(t, errors.IsNotFound(err)) - _, err = 
a.client.Resource(*podgv).Namespace("kube-system").Get(ctx, "applier-test", metav1.GetOptions{}) + _, err = fakes.DynamicClient.Resource(*podgv).Namespace("kube-system").Get(ctx, "applier-test", metav1.GetOptions{}) assert.True(t, errors.IsNotFound(err)) - _, err = a.client.Resource(*deployGV).Namespace("kube-system").Get(ctx, "nginx", metav1.GetOptions{}) + _, err = fakes.DynamicClient.Resource(*deployGV).Namespace("kube-system").Get(ctx, "nginx", metav1.GetOptions{}) assert.True(t, errors.IsNotFound(err)) gvNS, _ := schema.ParseResourceArg("namespaces.v1.") - _, err = a.client.Resource(*gvNS).Get(ctx, "kube-system", metav1.GetOptions{}) + _, err = fakes.DynamicClient.Resource(*gvNS).Get(ctx, "kube-system", metav1.GetOptions{}) assert.True(t, errors.IsNotFound(err)) } From 653ce8b7078574019a86215ea4e3eebb000abdd3 Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 29 May 2024 11:46:17 +0200 Subject: [PATCH 3/8] Reset the RESTMapper inside the Stack When applying a stack, the Stack knows best when it makes sense to reset the RESTMapper. Move the cache invalidation out of the Applier into the Stack, and trigger it whenever a REST mapping for a given kind was not found. 
Signed-off-by: Tom Wieczorek --- pkg/applier/applier.go | 1 - pkg/applier/stack.go | 44 +++++++++++++++++++++++++++++------------- 2 files changed, 31 insertions(+), 14 deletions(-) diff --git a/pkg/applier/applier.go b/pkg/applier/applier.go index f4070befa870..19700b35effe 100644 --- a/pkg/applier/applier.go +++ b/pkg/applier/applier.go @@ -118,7 +118,6 @@ func (a *Applier) Apply(ctx context.Context) error { err = stack.Apply(ctx, true) if err != nil { a.log.WithError(err).Warn("stack apply failed") - a.discoveryClient.Invalidate() } else { a.log.Debug("successfully applied stack") } diff --git a/pkg/applier/stack.go b/pkg/applier/stack.go index f557d5b3f5ce..f32142714ec7 100644 --- a/pkg/applier/stack.go +++ b/pkg/applier/stack.go @@ -20,6 +20,7 @@ import ( "context" "crypto/md5" "encoding/hex" + "errors" "fmt" "slices" "sync" @@ -36,6 +37,7 @@ import ( "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/restmapper" + "k8s.io/utils/ptr" ) // Stack is a k8s resource bundle @@ -70,9 +72,9 @@ func (s *Stack) Apply(ctx context.Context, prune bool) error { for _, resource := range sortedResources { s.prepareResource(resource) - mapping, err := mapper.RESTMapping(resource.GroupVersionKind().GroupKind(), resource.GroupVersionKind().Version) + mapping, err := getRESTMapping(mapper, ptr.To(resource.GroupVersionKind())) if err != nil { - return fmt.Errorf("mapping error: %w", err) + return err } var drClient dynamic.ResourceInterface if mapping.Scope.Name() == meta.RESTScopeNameNamespace { @@ -124,7 +126,7 @@ func (s *Stack) keepResource(resource *unstructured.Unstructured) { s.keepResources = append(s.keepResources, resourceID) } -func (s *Stack) prune(ctx context.Context, mapper *restmapper.DeferredDiscoveryRESTMapper) error { +func (s *Stack) prune(ctx context.Context, mapper meta.ResettableRESTMapper) error { pruneableResources, err := s.findPruneableResources(ctx, mapper) if err != nil { return err @@ -170,7 +172,7 @@ var ignoredResources = 
[]string{ "discovery.k8s.io/v1:EndpointSlice", } -func (s *Stack) findPruneableResources(ctx context.Context, mapper *restmapper.DeferredDiscoveryRESTMapper) ([]unstructured.Unstructured, error) { +func (s *Stack) findPruneableResources(ctx context.Context, mapper meta.ResettableRESTMapper) ([]unstructured.Unstructured, error) { var pruneableResources []unstructured.Unstructured apiResourceLists, err := s.Discovery.ServerPreferredResources() if err != nil { @@ -237,7 +239,7 @@ func (s *Stack) findPruneableResources(ctx context.Context, mapper *restmapper.D return pruneableResources, nil } -func (s *Stack) deleteResource(ctx context.Context, mapper *restmapper.DeferredDiscoveryRESTMapper, resource unstructured.Unstructured) error { +func (s *Stack) deleteResource(ctx context.Context, mapper meta.ResettableRESTMapper, resource unstructured.Unstructured) error { propagationPolicy := metav1.DeletePropagationForeground drClient, err := s.clientForResource(mapper, resource) if err != nil { @@ -252,8 +254,28 @@ func (s *Stack) deleteResource(ctx context.Context, mapper *restmapper.DeferredD return nil } -func (s *Stack) clientForResource(mapper *restmapper.DeferredDiscoveryRESTMapper, resource unstructured.Unstructured) (dynamic.ResourceInterface, error) { - mapping, err := mapper.RESTMapping(resource.GroupVersionKind().GroupKind(), resource.GroupVersionKind().Version) +func getRESTMapping(mapper meta.ResettableRESTMapper, gvk *schema.GroupVersionKind) (*meta.RESTMapping, error) { + mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + + // If the error indicates that the resource's kind is unknown, it may be + // that the corresponding CRD has already been applied, but the RESTMapper + // is still operating on stale cached data. Force a reset of the mapper and + // retry the call once. 
+ var noMatchErr *meta.NoKindMatchError + if errors.As(err, &noMatchErr) { + mapper.Reset() + mapping, err = mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + } + + if err != nil { + return nil, fmt.Errorf("mapping error: %w", err) + } + + return mapping, nil +} + +func (s *Stack) clientForResource(mapper meta.ResettableRESTMapper, resource unstructured.Unstructured) (dynamic.ResourceInterface, error) { + mapping, err := getRESTMapping(mapper, ptr.To(resource.GroupVersionKind())) if err != nil { return nil, fmt.Errorf("mapping error: %w", err) } @@ -268,12 +290,8 @@ func (s *Stack) clientForResource(mapper *restmapper.DeferredDiscoveryRESTMapper return drClient, nil } -func (s *Stack) findPruneableResourceForGroupVersionKind(ctx context.Context, mapper *restmapper.DeferredDiscoveryRESTMapper, groupVersionKind *schema.GroupVersionKind) []unstructured.Unstructured { - groupKind := schema.GroupKind{ - Group: groupVersionKind.Group, - Kind: groupVersionKind.Kind, - } - mapping, _ := mapper.RESTMapping(groupKind, groupVersionKind.Version) +func (s *Stack) findPruneableResourceForGroupVersionKind(ctx context.Context, mapper meta.ResettableRESTMapper, groupVersionKind *schema.GroupVersionKind) []unstructured.Unstructured { + mapping, _ := getRESTMapping(mapper, groupVersionKind) // FIXME error handling... if mapping != nil { // We're running this with full admin rights, we should have capability to get stuff with single call From 0577e5c5b952ea32312b16d71d0a65d435bdfb4a Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 29 May 2024 13:09:30 +0200 Subject: [PATCH 4/8] Use the client factory directly in the Stack This reduces the boilerplate in the callers and allows for the Stack to use typed clients, if required. 
Signed-off-by: Tom Wieczorek --- pkg/applier/applier.go | 51 ++----------------- pkg/applier/stack.go | 49 +++++++++++++----- .../controller/workerconfig/reconciler.go | 11 +--- 3 files changed, 43 insertions(+), 68 deletions(-) diff --git a/pkg/applier/applier.go b/pkg/applier/applier.go index 19700b35effe..681f6e815c46 100644 --- a/pkg/applier/applier.go +++ b/pkg/applier/applier.go @@ -25,8 +25,6 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/cli-runtime/pkg/resource" - "k8s.io/client-go/discovery" - "k8s.io/client-go/dynamic" "github.com/sirupsen/logrus" ) @@ -43,10 +41,8 @@ type Applier struct { Name string Dir string - log *logrus.Entry - clientFactory kubernetes.ClientFactoryInterface - client dynamic.Interface - discoveryClient discovery.CachedDiscoveryInterface + log *logrus.Entry + clientFactory kubernetes.ClientFactoryInterface restClientGetter resource.RESTClientGetter } @@ -70,35 +66,8 @@ func NewApplier(dir string, kubeClientFactory kubernetes.ClientFactoryInterface) } } -func (a *Applier) lazyInit() error { - if a.client == nil { - c, err := a.clientFactory.GetDynamicClient() - if err != nil { - return err - } - - a.client = c - } - - if a.discoveryClient == nil { - c, err := a.clientFactory.GetDiscoveryClient() - if err != nil { - return err - } - - a.discoveryClient = c - } - - return nil -} - // Apply resources func (a *Applier) Apply(ctx context.Context) error { - err := a.lazyInit() - if err != nil { - return err - } - files, err := FindManifestFilesInDir(a.Dir) if err != nil { return err @@ -111,8 +80,7 @@ func (a *Applier) Apply(ctx context.Context) error { stack := Stack{ Name: a.Name, Resources: resources, - Client: a.client, - Discovery: a.discoveryClient, + Clients: a.clientFactory, } a.log.Debug("applying stack") err = stack.Apply(ctx, true) @@ -127,18 +95,9 @@ func (a *Applier) Apply(ctx context.Context) error { // Delete deletes the entire stack by applying it with empty set of resources func (a *Applier) 
Delete(ctx context.Context) error { - err := a.lazyInit() - if err != nil { - return err - } - stack := Stack{ - Name: a.Name, - Client: a.client, - Discovery: a.discoveryClient, - } + stack := Stack{Name: a.Name, Clients: a.clientFactory} logrus.Debugf("about to delete a stack %s with empty apply", a.Name) - err = stack.Apply(ctx, true) - return err + return stack.Apply(ctx, true) } func (a *Applier) parseFiles(files []string) ([]*unstructured.Unstructured, error) { diff --git a/pkg/applier/stack.go b/pkg/applier/stack.go index f32142714ec7..75750796a4e5 100644 --- a/pkg/applier/stack.go +++ b/pkg/applier/stack.go @@ -26,8 +26,8 @@ import ( "sync" "time" - jsonpatch "github.com/evanphx/json-patch" - "github.com/sirupsen/logrus" + "github.com/k0sproject/k0s/pkg/kubernetes" + apiErrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,6 +38,9 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/restmapper" "k8s.io/utils/ptr" + + jsonpatch "github.com/evanphx/json-patch" + "github.com/sirupsen/logrus" ) // Stack is a k8s resource bundle @@ -45,8 +48,7 @@ type Stack struct { Name string Resources []*unstructured.Unstructured keepResources []string - Client dynamic.Interface - Discovery discovery.CachedDiscoveryInterface + Clients kubernetes.ClientFactoryInterface log *logrus.Entry } @@ -56,8 +58,17 @@ type Stack struct { func (s *Stack) Apply(ctx context.Context, prune bool) error { s.log = logrus.WithField("stack", s.Name) + discoveryClient, err := s.Clients.GetDiscoveryClient() + if err != nil { + return err + } + dynamicClient, err := s.Clients.GetDynamicClient() + if err != nil { + return err + } + s.log.Debugf("applying with %d resources", len(s.Resources)) - mapper := restmapper.NewDeferredDiscoveryRESTMapper(s.Discovery) + mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoveryClient) var sortedResources []*unstructured.Unstructured for _, resource := range s.Resources { if 
resource.GetNamespace() == "" { @@ -78,9 +89,9 @@ func (s *Stack) Apply(ctx context.Context, prune bool) error { } var drClient dynamic.ResourceInterface if mapping.Scope.Name() == meta.RESTScopeNameNamespace { - drClient = s.Client.Resource(mapping.Resource).Namespace(resource.GetNamespace()) + drClient = dynamicClient.Resource(mapping.Resource).Namespace(resource.GetNamespace()) } else { - drClient = s.Client.Resource(mapping.Resource) + drClient = dynamicClient.Resource(mapping.Resource) } serverResource, err := drClient.Get(ctx, resource.GetName(), metav1.GetOptions{}) if apiErrors.IsNotFound(err) { @@ -112,7 +123,6 @@ func (s *Stack) Apply(ctx context.Context, prune bool) error { s.keepResource(resource) } - var err error if prune { err = s.prune(ctx, mapper) } @@ -174,7 +184,13 @@ var ignoredResources = []string{ func (s *Stack) findPruneableResources(ctx context.Context, mapper meta.ResettableRESTMapper) ([]unstructured.Unstructured, error) { var pruneableResources []unstructured.Unstructured - apiResourceLists, err := s.Discovery.ServerPreferredResources() + + client, err := s.Clients.GetDiscoveryClient() + if err != nil { + return nil, err + } + + apiResourceLists, err := client.ServerPreferredResources() if err != nil { // Client-Go emits an error when an API service is registered but unimplemented. 
// We trap that error here but since the discovery client continues @@ -280,11 +296,16 @@ func (s *Stack) clientForResource(mapper meta.ResettableRESTMapper, resource uns return nil, fmt.Errorf("mapping error: %w", err) } + client, err := s.Clients.GetDynamicClient() + if err != nil { + return nil, err + } + var drClient dynamic.ResourceInterface if mapping.Scope.Name() == meta.RESTScopeNameNamespace { - drClient = s.Client.Resource(mapping.Resource).Namespace(resource.GetNamespace()) + drClient = client.Resource(mapping.Resource).Namespace(resource.GetNamespace()) } else { - drClient = s.Client.Resource(mapping.Resource) + drClient = client.Resource(mapping.Resource) } return drClient, nil @@ -294,8 +315,12 @@ func (s *Stack) findPruneableResourceForGroupVersionKind(ctx context.Context, ma mapping, _ := getRESTMapping(mapper, groupVersionKind) // FIXME error handling... if mapping != nil { + client, err := s.Clients.GetDynamicClient() + if err != nil { + return nil + } // We're running this with full admin rights, we should have capability to get stuff with single call - drClient := s.Client.Resource(mapping.Resource) + drClient := client.Resource(mapping.Resource) return s.getPruneableResources(ctx, drClient) } diff --git a/pkg/component/controller/workerconfig/reconciler.go b/pkg/component/controller/workerconfig/reconciler.go index 9122e8f4c281..61436767fd5c 100644 --- a/pkg/component/controller/workerconfig/reconciler.go +++ b/pkg/component/controller/workerconfig/reconciler.go @@ -133,18 +133,9 @@ func (r *Reconciler) Init(context.Context) error { clientFactory := r.clientFactory apply := func(ctx context.Context, resources resources) error { - dynamicClient, err := clientFactory.GetDynamicClient() - if err != nil { - return err - } - discoveryClient, err := clientFactory.GetDiscoveryClient() - if err != nil { - return err - } return (&applier.Stack{ Name: fmt.Sprintf("k0s-%s-%s", constant.WorkerConfigComponentName, constant.KubernetesMajorMinorVersion), - 
Client: dynamicClient, - Discovery: discoveryClient, + Clients: clientFactory, Resources: resources, }).Apply(ctx, true) } From 7e142e4dcf9a4aedaf39975dbd4bac008016e7b3 Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 29 May 2024 09:09:19 +0200 Subject: [PATCH 5/8] Introduce watch.CRDs() Watching CRDs has more and more usages in k0s. Introducing a common helper makes sense and reduces the amount of copy/pasted type aliases. Signed-off-by: Tom Wieczorek --- inttest/common/autopilot/waitfor.go | 10 ++------ pkg/autopilot/controller/setup.go | 10 ++------ .../controller/etcd_member_reconciler.go | 9 ++----- pkg/kubernetes/watch/apiextensions.go | 25 +++++++++++++++++++ 4 files changed, 31 insertions(+), 23 deletions(-) create mode 100644 pkg/kubernetes/watch/apiextensions.go diff --git a/inttest/common/autopilot/waitfor.go b/inttest/common/autopilot/waitfor.go index 8b76b7ed93d5..51afb0b8ff52 100644 --- a/inttest/common/autopilot/waitfor.go +++ b/inttest/common/autopilot/waitfor.go @@ -56,16 +56,10 @@ func WaitForPlanState(ctx context.Context, client apclient.Interface, name strin // WaitForCRDByName waits until the CRD with the given name is established. func WaitForCRDByName(ctx context.Context, client extensionsclient.ApiextensionsV1Interface, name string) error { - // Some shortcuts for very long type names. - type ( - crd = extensionsv1.CustomResourceDefinition - crdList = extensionsv1.CustomResourceDefinitionList - ) - - return watch.FromClient[*crdList, crd](client.CustomResourceDefinitions()). + return watch.CRDs(client.CustomResourceDefinitions()). WithObjectName(fmt.Sprintf("%s.%s", name, apv1beta2.GroupName)). WithErrorCallback(common.RetryWatchErrors(logrus.Infof)). 
- Until(ctx, func(item *crd) (bool, error) { + Until(ctx, func(item *extensionsv1.CustomResourceDefinition) (bool, error) { for _, cond := range item.Status.Conditions { if cond.Type == extensionsv1.Established { return cond.Status == extensionsv1.ConditionTrue, nil diff --git a/pkg/autopilot/controller/setup.go b/pkg/autopilot/controller/setup.go index 0f8dc30c21ba..3ab9fc8a6227 100644 --- a/pkg/autopilot/controller/setup.go +++ b/pkg/autopilot/controller/setup.go @@ -212,12 +212,6 @@ func getControllerAPIAddress() (string, error) { // waitForControlNodesCRD waits until the controlnodes CRD is established for // max 2 minutes. func (sc *setupController) waitForControlNodesCRD(ctx context.Context, cf apcli.FactoryInterface) error { - // Some shortcuts for very long type names. - type ( - crd = extensionsv1.CustomResourceDefinition - crdList = extensionsv1.CustomResourceDefinitionList - ) - extClient, err := cf.GetExtensionClient() if err != nil { return fmt.Errorf("unable to obtain extensions client: %w", err) @@ -225,7 +219,7 @@ func (sc *setupController) waitForControlNodesCRD(ctx context.Context, cf apcli. ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) defer cancel() - return watch.FromClient[*crdList, crd](extClient.CustomResourceDefinitions()). + return watch.CRDs(extClient.CustomResourceDefinitions()). WithObjectName(fmt.Sprintf("controlnodes.%s", apv1beta2.GroupName)). WithErrorCallback(func(err error) (time.Duration, error) { if retryDelay, e := watch.IsRetryable(err); e == nil { @@ -237,7 +231,7 @@ func (sc *setupController) waitForControlNodesCRD(ctx context.Context, cf apcli. } return 0, err }). 
- Until(ctx, func(item *crd) (bool, error) { + Until(ctx, func(item *extensionsv1.CustomResourceDefinition) (bool, error) { for _, cond := range item.Status.Conditions { if cond.Type == extensionsv1.Established { return cond.Status == extensionsv1.ConditionTrue, nil diff --git a/pkg/component/controller/etcd_member_reconciler.go b/pkg/component/controller/etcd_member_reconciler.go index ea384017c055..171758c40e4d 100644 --- a/pkg/component/controller/etcd_member_reconciler.go +++ b/pkg/component/controller/etcd_member_reconciler.go @@ -125,11 +125,6 @@ func (e *EtcdMemberReconciler) Stop() error { return nil } -type ( - crd = extensionsv1.CustomResourceDefinition - crdList = extensionsv1.CustomResourceDefinitionList -) - func (e *EtcdMemberReconciler) waitForCRD(ctx context.Context) error { rc := e.clientFactory.GetRESTConfig() @@ -140,7 +135,7 @@ func (e *EtcdMemberReconciler) waitForCRD(ctx context.Context) error { var lastObservedVersion string log := logrus.WithField("component", "etcdMemberReconciler") log.Info("waiting to see EtcdMember CRD ready") - return watch.FromClient[*crdList, crd](ec.CustomResourceDefinitions()). + return watch.CRDs(ec.CustomResourceDefinitions()). WithObjectName(fmt.Sprintf("%s.%s", "etcdmembers", "etcd.k0sproject.io")). WithErrorCallback(func(err error) (time.Duration, error) { if retryAfter, e := watch.IsRetryable(err); e == nil { @@ -162,7 +157,7 @@ func (e *EtcdMemberReconciler) waitForCRD(ctx context.Context) error { ) return retryAfter, nil }). 
- Until(ctx, func(item *crd) (bool, error) { + Until(ctx, func(item *extensionsv1.CustomResourceDefinition) (bool, error) { lastObservedVersion = item.ResourceVersion for _, cond := range item.Status.Conditions { if cond.Type == extensionsv1.Established { diff --git a/pkg/kubernetes/watch/apiextensions.go b/pkg/kubernetes/watch/apiextensions.go new file mode 100644 index 000000000000..a6f1bf4e7dfd --- /dev/null +++ b/pkg/kubernetes/watch/apiextensions.go @@ -0,0 +1,25 @@ +/* +Copyright 2024 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" +) + +func CRDs(client Provider[*v1.CustomResourceDefinitionList]) *Watcher[v1.CustomResourceDefinition] { + return FromClient[*v1.CustomResourceDefinitionList, v1.CustomResourceDefinition](client) +} From 2f44ff0b1f9e742f0d21c7f7cade4aa4c10aa02a Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 29 May 2024 13:12:09 +0200 Subject: [PATCH 6/8] Wait for CRDs to be established when applying stacks As a result, the Applier requires fewer retries when applying stacks that contain both custom resources and their definitions. 
Signed-off-by: Tom Wieczorek --- pkg/applier/stack.go | 57 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 48 insertions(+), 9 deletions(-) diff --git a/pkg/applier/stack.go b/pkg/applier/stack.go index 75750796a4e5..c54371f6859e 100644 --- a/pkg/applier/stack.go +++ b/pkg/applier/stack.go @@ -27,7 +27,10 @@ import ( "time" "github.com/k0sproject/k0s/pkg/kubernetes" + "github.com/k0sproject/k0s/pkg/kubernetes/watch" + extensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + extensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" apiErrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -95,10 +98,14 @@ func (s *Stack) Apply(ctx context.Context, prune bool) error { } serverResource, err := drClient.Get(ctx, resource.GetName(), metav1.GetOptions{}) if apiErrors.IsNotFound(err) { - _, err := drClient.Create(ctx, resource, metav1.CreateOptions{}) + created, err := drClient.Create(ctx, resource, metav1.CreateOptions{}) if err != nil { return fmt.Errorf("cannot create resource %s: %w", resource.GetName(), err) } + if isCRD(created) { + s.waitForCRD(ctx, created.GetName()) + mapper.Reset() // so that the created CRD gets rediscovered + } } else if err != nil { return fmt.Errorf("unknown api error: %w", err) } else { // The resource already exists, we need to update/patch it @@ -111,14 +118,18 @@ func (s *Stack) Apply(ctx context.Context, prune bool) error { if serverResource.GetAnnotations()[LastConfigAnnotation] == "" { s.log.Debug("doing plain update as no last-config label present") resource.SetResourceVersion(serverResource.GetResourceVersion()) - _, err = drClient.Update(ctx, resource, metav1.UpdateOptions{}) + resource, err = drClient.Update(ctx, resource, metav1.UpdateOptions{}) } else { s.log.Debug("patching resource") - err = s.patchResource(ctx, drClient, serverResource, resource) + resource, err = 
s.patchResource(ctx, drClient, serverResource, resource) } if err != nil { return fmt.Errorf("can't update resource: %w", err) } + if isCRD(resource) { + s.waitForCRD(ctx, resource.GetName()) + mapper.Reset() // so that the changed CRD gets rediscovered + } } s.keepResource(resource) } @@ -130,6 +141,29 @@ func (s *Stack) Apply(ctx context.Context, prune bool) error { return err } +// waitForCRD waits 5 seconds for a CRD to become established on a best-effort basis. +func (s *Stack) waitForCRD(ctx context.Context, crdName string) { + client, err := extensionsclient.NewForConfig(s.Clients.GetRESTConfig()) + if err != nil { + return + } + + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + _ = watch.CRDs(client.CustomResourceDefinitions()). + WithObjectName(crdName). + WithErrorCallback(watch.IsRetryable). + Until(ctx, func(item *extensionsv1.CustomResourceDefinition) (bool, error) { + for _, cond := range item.Status.Conditions { + if cond.Type == extensionsv1.Established { + return cond.Status == extensionsv1.ConditionTrue, nil + } + } + return false, nil + }) +} + func (s *Stack) keepResource(resource *unstructured.Unstructured) { resourceID := generateResourceID(*resource) logrus.WithField("stack", s.Name).Debugf("marking resource to be kept: %s", resourceID) @@ -359,23 +393,23 @@ func (s *Stack) isInStack(resource unstructured.Unstructured) bool { return false } -func (s *Stack) patchResource(ctx context.Context, drClient dynamic.ResourceInterface, serverResource *unstructured.Unstructured, localResource *unstructured.Unstructured) error { +func (s *Stack) patchResource(ctx context.Context, drClient dynamic.ResourceInterface, serverResource *unstructured.Unstructured, localResource *unstructured.Unstructured) (*unstructured.Unstructured, error) { original := serverResource.GetAnnotations()[LastConfigAnnotation] if original == "" { - return fmt.Errorf("%s does not have last-applied-configuration", localResource.GetSelfLink()) + return 
nil, fmt.Errorf("%s does not have last-applied-configuration", localResource.GetSelfLink()) } modified, _ := localResource.MarshalJSON() patch, err := jsonpatch.CreateMergePatch([]byte(original), modified) if err != nil { - return fmt.Errorf("failed to create jsonpatch data: %w", err) + return nil, fmt.Errorf("failed to create jsonpatch data: %w", err) } - _, err = drClient.Patch(ctx, localResource.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}) + resource, err := drClient.Patch(ctx, localResource.GetName(), types.MergePatchType, patch, metav1.PatchOptions{}) if err != nil { - return fmt.Errorf("failed to patch resource: %w", err) + return nil, fmt.Errorf("failed to patch resource: %w", err) } - return nil + return resource, nil } func (s *Stack) prepareResource(resource *unstructured.Unstructured) { @@ -398,6 +432,11 @@ func (s *Stack) prepareResource(resource *unstructured.Unstructured) { resource.SetAnnotations(annotations) } +func isCRD(resource *unstructured.Unstructured) bool { + gvk := resource.GroupVersionKind() + return gvk.Group == extensionsv1.GroupName && gvk.Kind == "CustomResourceDefinition" +} + func generateResourceID(resource unstructured.Unstructured) string { return fmt.Sprintf("%s/%s:%s@%s", resource.GetObjectKind().GroupVersionKind().Group, resource.GetKind(), resource.GetName(), resource.GetNamespace()) } From 970417b040cfd9e47c0bdc84f2c27bd4d755be8f Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 29 May 2024 13:32:59 +0200 Subject: [PATCH 7/8] Collect resource errors when applying stacks This allows k0s to apply stacks that are not well ordered, i.e. that don't declare dependencies before their dependents. By attempting to apply each resource, the dependencies will eventually be applied, allowing the dependents to be applied on the next retry. 
Signed-off-by: Tom Wieczorek --- pkg/applier/stack.go | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/pkg/applier/stack.go b/pkg/applier/stack.go index c54371f6859e..a873f084ddc6 100644 --- a/pkg/applier/stack.go +++ b/pkg/applier/stack.go @@ -84,11 +84,13 @@ func (s *Stack) Apply(ctx context.Context, prune bool) error { } } + var errs []error for _, resource := range sortedResources { s.prepareResource(resource) mapping, err := getRESTMapping(mapper, ptr.To(resource.GroupVersionKind())) if err != nil { - return err + errs = append(errs, err) + continue } var drClient dynamic.ResourceInterface if mapping.Scope.Name() == meta.RESTScopeNameNamespace { @@ -100,14 +102,18 @@ func (s *Stack) Apply(ctx context.Context, prune bool) error { if apiErrors.IsNotFound(err) { created, err := drClient.Create(ctx, resource, metav1.CreateOptions{}) if err != nil { - return fmt.Errorf("cannot create resource %s: %w", resource.GetName(), err) + err = fmt.Errorf("cannot create resource %s: %w", resource.GetName(), err) + errs = append(errs, err) + continue } if isCRD(created) { s.waitForCRD(ctx, created.GetName()) mapper.Reset() // so that the created CRD gets rediscovered } } else if err != nil { - return fmt.Errorf("unknown api error: %w", err) + err = fmt.Errorf("unknown api error: %w", err) + errs = append(errs, err) + continue } else { // The resource already exists, we need to update/patch it localChecksum := resource.GetAnnotations()[ChecksumAnnotation] if serverResource.GetAnnotations()[ChecksumAnnotation] == localChecksum { @@ -124,7 +130,9 @@ func (s *Stack) Apply(ctx context.Context, prune bool) error { resource, err = s.patchResource(ctx, drClient, serverResource, resource) } if err != nil { - return fmt.Errorf("can't update resource: %w", err) + err = fmt.Errorf("can't update resource: %w", err) + errs = append(errs, err) + continue } if isCRD(resource) { s.waitForCRD(ctx, resource.GetName()) @@ -134,11 +142,15 @@ func (s 
*Stack) Apply(ctx context.Context, prune bool) error { s.keepResource(resource) } - if prune { - err = s.prune(ctx, mapper) + if len(errs) > 0 { + return errors.Join(errs...) + } + + if !prune { + return nil } - return err + return s.prune(ctx, mapper) } // waitForCRD waits 5 seconds for a CRD to become established on a best-effort basis. From f33d15ffff7dd61388dff49088d787b3a0da365a Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Wed, 29 May 2024 13:52:58 +0200 Subject: [PATCH 8/8] Add an inttest for stack application Make sure that stacks that are badly ordered are eventually applied in their entirety. Signed-off-by: Tom Wieczorek --- inttest/Makefile.variables | 1 + inttest/stackapplier/rings.yaml | 82 ++++++++++++++ inttest/stackapplier/stackapplier_test.go | 132 ++++++++++++++++++++++ pkg/kubernetes/watch/unstructured.go | 25 ++++ 4 files changed, 240 insertions(+) create mode 100644 inttest/stackapplier/rings.yaml create mode 100644 inttest/stackapplier/stackapplier_test.go create mode 100644 pkg/kubernetes/watch/unstructured.go diff --git a/inttest/Makefile.variables b/inttest/Makefile.variables index 0f2582b89210..2573c91b69b5 100644 --- a/inttest/Makefile.variables +++ b/inttest/Makefile.variables @@ -59,5 +59,6 @@ smoketests := \ check-psp \ check-reset \ check-singlenode \ + check-stackapplier \ check-statussocket \ check-upgrade \ diff --git a/inttest/stackapplier/rings.yaml b/inttest/stackapplier/rings.yaml new file mode 100644 index 000000000000..b7985019b575 --- /dev/null +++ b/inttest/stackapplier/rings.yaml @@ -0,0 +1,82 @@ +# Have the wrong order: First the custom resources, then their definitions. Let +# one of the the resource be a cluster resource, so that the Stack's resource +# reordering won't let the CRD go before the CR. 
+ +--- +apiVersion: k0s.example.com/v1 +kind: Character +metadata: + name: frodo + namespace: shire +spec: + speciesRef: + name: hobbit + +--- +apiVersion: k0s.example.com/v1 +kind: Species +metadata: + name: hobbit +spec: + characteristics: hairy feet + +--- +apiVersion: v1 +kind: Namespace +metadata: + name: shire + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: species.k0s.example.com +spec: + group: k0s.example.com + names: + kind: Species + singular: species + plural: species + scope: Cluster + versions: + - name: v1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + characteristics: + type: string + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: characters.k0s.example.com +spec: + group: k0s.example.com + names: + kind: Character + singular: character + plural: characters + scope: Namespaced + versions: + - name: v1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + speciesRef: + type: object + properties: + name: + type: string diff --git a/inttest/stackapplier/stackapplier_test.go b/inttest/stackapplier/stackapplier_test.go new file mode 100644 index 000000000000..aa2b5251e153 --- /dev/null +++ b/inttest/stackapplier/stackapplier_test.go @@ -0,0 +1,132 @@ +/* +Copyright 2024 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package stackapplier
+
+import (
+	"context"
+	_ "embed"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/k0sproject/k0s/inttest/common"
+	"github.com/k0sproject/k0s/pkg/apis/k0s/v1beta1"
+	"github.com/k0sproject/k0s/pkg/kubernetes/watch"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
+
+	"github.com/stretchr/testify/assert"
+	testifysuite "github.com/stretchr/testify/suite"
+	"sigs.k8s.io/yaml"
+)
+
+type suite struct {
+	common.BootlooseSuite
+}
+
+//go:embed rings.yaml
+var rings string
+
+func (s *suite) TestStackApplier() {
+	ctx, cancel := context.WithCancelCause(s.Context())
+	s.T().Cleanup(func() { cancel(nil) })
+
+	k0sConfig, err := yaml.Marshal(&v1beta1.ClusterConfig{
+		Spec: &v1beta1.ClusterSpec{
+			Storage: &v1beta1.StorageSpec{Type: v1beta1.KineStorageType},
+		},
+	})
+	s.Require().NoError(err)
+
+	s.WriteFileContent(s.ControllerNode(0), "/tmp/k0s.yaml", k0sConfig)
+	s.Require().NoError(s.InitController(0, "--config=/tmp/k0s.yaml", "--disable-components=control-api,konnectivity-server,kube-controller-manager,kube-scheduler"))
+	s.MakeDir(s.ControllerNode(0), "/var/lib/k0s/manifests/rings")
+	s.PutFile(s.ControllerNode(0), "/var/lib/k0s/manifests/rings/rings.yaml", rings)
+
+	kubeconfig, err := s.GetKubeConfig(s.ControllerNode(0))
+	s.Require().NoError(err)
+	client, err := dynamic.NewForConfig(kubeconfig)
+	s.Require().NoError(err)
+
+	sgv := schema.GroupVersion{Group: "k0s.example.com", Version: "v1"}
+
+	s.T().Run("hobbit", func(t *testing.T) {
+		t.Cleanup(func() {
+			if t.Failed() {
+				cancel(fmt.Errorf("%s failed", t.Name()))
+			}
+		})
+		t.Parallel()
+		species := client.Resource(sgv.WithResource("species"))
+		assert.NoError(t, watch.Unstructured(species).
+			WithObjectName("hobbit").
+			WithErrorCallback(retryWatchErrors(s.T().Logf)).
+ Until(ctx, func(item *unstructured.Unstructured) (bool, error) { + speciesName, found, err := unstructured.NestedString(item.Object, "spec", "characteristics") + if assert.NoError(t, err) && assert.True(t, found, "no characteristics found: %v", item.Object) { + assert.Equal(t, "hairy feet", speciesName) + } + return true, nil + })) + }) + + s.T().Run("frodo", func(t *testing.T) { + t.Cleanup(func() { + if t.Failed() { + cancel(fmt.Errorf("%s failed", t.Name())) + } + }) + t.Parallel() + characters := client.Resource(sgv.WithResource("characters")) + assert.NoError(t, watch.Unstructured(characters.Namespace("shire")). + WithObjectName("frodo"). + WithErrorCallback(retryWatchErrors(s.T().Logf)). + Until(ctx, func(item *unstructured.Unstructured) (bool, error) { + speciesName, found, err := unstructured.NestedString(item.Object, "spec", "speciesRef", "name") + if assert.NoError(t, err) && assert.True(t, found, "no species found: %v", item.Object) { + assert.Equal(t, "hobbit", speciesName) + } + return true, nil + })) + }) +} + +func retryWatchErrors(logf common.LogfFn) watch.ErrorCallback { + commonRetry := common.RetryWatchErrors(logf) + return func(err error) (time.Duration, error) { + if retryDelay, err := commonRetry(err); err == nil { + return retryDelay, nil + } + if apierrors.IsNotFound(err) { + return 350 * time.Millisecond, nil + } + return 0, err + } +} + +func TestStackApplierSuite(t *testing.T) { + s := suite{ + common.BootlooseSuite{ + ControllerCount: 1, + WorkerCount: 0, + }, + } + testifysuite.Run(t, &s) +} diff --git a/pkg/kubernetes/watch/unstructured.go b/pkg/kubernetes/watch/unstructured.go new file mode 100644 index 000000000000..85db5460ea89 --- /dev/null +++ b/pkg/kubernetes/watch/unstructured.go @@ -0,0 +1,25 @@ +/* +Copyright 2024 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +func Unstructured(client Provider[*unstructured.UnstructuredList]) *Watcher[unstructured.Unstructured] { + return FromClient[*unstructured.UnstructuredList, unstructured.Unstructured](client) +}