From f5ff0839c0886e2563e42adee51d6b2e5ad2eba7 Mon Sep 17 00:00:00 2001 From: wenyingd Date: Fri, 9 Sep 2022 16:09:08 +0800 Subject: [PATCH] [ExternalNode] Implement support bundle collection status on Controller Signed-off-by: wenyingd --- cmd/antrea-controller/controller.go | 11 +- pkg/apiserver/apiserver.go | 65 +- .../supportbundlecollection/subresources.go | 63 ++ .../subresources_test.go | 109 ++++ .../fake_supportbundlecollection_expansion.go | 25 + .../v1beta2/generated_expansion.go | 2 - .../supportbundlecollection_expansion.go | 29 + .../supportbundlecollection/controller.go | 8 +- .../controller_test.go | 2 +- .../status_controller.go | 399 ++++++++++++ .../status_controller_test.go | 615 ++++++++++++++++++ 11 files changed, 1291 insertions(+), 37 deletions(-) create mode 100644 pkg/apiserver/registry/controlplane/supportbundlecollection/subresources.go create mode 100644 pkg/apiserver/registry/controlplane/supportbundlecollection/subresources_test.go create mode 100644 pkg/client/clientset/versioned/typed/controlplane/v1beta2/fake/fake_supportbundlecollection_expansion.go create mode 100644 pkg/client/clientset/versioned/typed/controlplane/v1beta2/supportbundlecollection_expansion.go create mode 100644 pkg/controller/supportbundlecollection/status_controller.go create mode 100644 pkg/controller/supportbundlecollection/status_controller_test.go diff --git a/cmd/antrea-controller/controller.go b/cmd/antrea-controller/controller.go index 76525364629..279eb76b44a 100644 --- a/cmd/antrea-controller/controller.go +++ b/cmd/antrea-controller/controller.go @@ -176,10 +176,12 @@ func run(o *Options) error { } var bundleCollectionController *supportbundlecollection.Controller + var bundleCollectionStatusController *supportbundlecollection.StatusController bundleCollectionStore := supportbundlecollectionstore.NewSupportBundleCollectionStore() if features.DefaultFeatureGate.Enabled(features.SupportBundleCollection) { bundleCollectionInformer := crdInformerFactory.Crd().V1alpha1().SupportBundleCollections() - bundleCollectionController = supportbundlecollection.NewSupportBundleCollectionController(client, crdClient, bundleCollectionInformer, nodeInformer, externalNodeInformer, bundleCollectionStore) + bundleCollectionStatusController = supportbundlecollection.NewStatusController(crdClient, bundleCollectionInformer, bundleCollectionStore) + bundleCollectionController = supportbundlecollection.NewSupportBundleCollectionController(client, crdClient, bundleCollectionInformer, nodeInformer, externalNodeInformer, bundleCollectionStore, bundleCollectionStatusController) } var networkPolicyStatusController *networkpolicy.StatusController @@ -270,6 +272,7 @@ func run(o *Options) error { egressController, statsAggregator, bundleCollectionController, + bundleCollectionStatusController, *o.config.EnablePrometheusMetrics, cipherSuites, cipher.TLSVersionMap[o.config.TLSMinVersion]) @@ -443,6 +446,7 @@ func createAPIServerConfig(kubeconfig string, egressController *egress.EgressController, statsAggregator *stats.Aggregator, bundleCollectionStore *supportbundlecollection.Controller, + bundleCollectionStatusController *supportbundlecollection.StatusController, enableMetrics bool, cipherSuites []uint16, tlsMinVersion uint16) (*apiserver.Config, error) { @@ -457,7 +461,7 @@ func createAPIServerConfig(kubeconfig string, secureServing.BindPort = bindPort secureServing.BindAddress = net.IPv4zero - // kubeconfig file is useful when antrea-controller isn't not running as a pod, like during development. + // kubeconfig file is useful when antrea-controller is not running as a pod, like during development. if len(kubeconfig) > 0 { authentication.RemoteKubeConfigFile = kubeconfig authorization.RemoteKubeConfigFile = kubeconfig @@ -505,5 +509,6 @@ func createAPIServerConfig(kubeconfig string, endpointQuerier, npController, egressController, - bundleCollectionStore), nil + bundleCollectionStore, + bundleCollectionStatusController), nil } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 1e29ec0a3b2..c11ab7c92ae 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -90,21 +90,22 @@ func init() { // ExtraConfig holds custom apiserver config. type ExtraConfig struct { - k8sClient kubernetes.Interface - addressGroupStore storage.Interface - appliedToGroupStore storage.Interface - networkPolicyStore storage.Interface - egressGroupStore storage.Interface - bundleCollectionStore storage.Interface - controllerQuerier querier.ControllerQuerier - endpointQuerier controllernetworkpolicy.EndpointQuerier - networkPolicyController *controllernetworkpolicy.NetworkPolicyController - egressController *egress.EgressController - externalIPPoolController *externalippool.ExternalIPPoolController - caCertController *certificate.CACertController - statsAggregator *stats.Aggregator - networkPolicyStatusController *controllernetworkpolicy.StatusController - bundleCollectionController *controllerbundlecollection.Controller + k8sClient kubernetes.Interface + addressGroupStore storage.Interface + appliedToGroupStore storage.Interface + networkPolicyStore storage.Interface + egressGroupStore storage.Interface + bundleCollectionStore storage.Interface + controllerQuerier querier.ControllerQuerier + endpointQuerier controllernetworkpolicy.EndpointQuerier + networkPolicyController *controllernetworkpolicy.NetworkPolicyController + egressController *egress.EgressController + externalIPPoolController *externalippool.ExternalIPPoolController + caCertController *certificate.CACertController + statsAggregator *stats.Aggregator + networkPolicyStatusController *controllernetworkpolicy.StatusController + bundleCollectionController *controllerbundlecollection.Controller + bundleCollectionStatusController *controllerbundlecollection.StatusController } // Config defines the config for Antrea apiserver. @@ -145,24 +146,26 @@ func NewConfig( endpointQuerier controllernetworkpolicy.EndpointQuerier, npController *controllernetworkpolicy.NetworkPolicyController, egressController *egress.EgressController, - bundleCollectionController *controllerbundlecollection.Controller) *Config { + bundleCollectionController *controllerbundlecollection.Controller, + bundleCollectionStatusController *controllerbundlecollection.StatusController) *Config { return &Config{ genericConfig: genericConfig, extraConfig: ExtraConfig{ - k8sClient: k8sClient, - addressGroupStore: addressGroupStore, - appliedToGroupStore: appliedToGroupStore, - networkPolicyStore: networkPolicyStore, - egressGroupStore: egressGroupStore, - bundleCollectionStore: supportBundleCollectionStore, - caCertController: caCertController, - statsAggregator: statsAggregator, - controllerQuerier: controllerQuerier, - endpointQuerier: endpointQuerier, - networkPolicyController: npController, - networkPolicyStatusController: networkPolicyStatusController, - egressController: egressController, - bundleCollectionController: bundleCollectionController, + k8sClient: k8sClient, + addressGroupStore: addressGroupStore, + appliedToGroupStore: appliedToGroupStore, + networkPolicyStore: networkPolicyStore, + egressGroupStore: egressGroupStore, + bundleCollectionStore: supportBundleCollectionStore, + caCertController: caCertController, + statsAggregator: statsAggregator, + controllerQuerier: controllerQuerier, + endpointQuerier: endpointQuerier, + networkPolicyController: npController, + networkPolicyStatusController: networkPolicyStatusController, + egressController: egressController, + bundleCollectionController: bundleCollectionController, + bundleCollectionStatusController: bundleCollectionStatusController, }, } } @@ -181,6 +184,7 @@ func installAPIGroup(s *APIServer, c completedConfig) error { nodeStatsSummaryStorage := nodestatssummary.NewREST(c.extraConfig.statsAggregator) egressGroupStorage := egressgroup.NewREST(c.extraConfig.egressGroupStore) bundleCollectionStorage := supportbundlecollection.NewREST(c.extraConfig.bundleCollectionStore) + bundleCollectionStatusStorage := supportbundlecollection.NewStatusREST(c.extraConfig.bundleCollectionStatusController) cpGroup := genericapiserver.NewDefaultAPIGroupInfo(controlplane.GroupName, Scheme, parameterCodec, Codecs) cpv1beta2Storage := map[string]rest.Storage{} cpv1beta2Storage["addressgroups"] = addressGroupStorage @@ -192,6 +196,7 @@ func installAPIGroup(s *APIServer, c completedConfig) error { cpv1beta2Storage["clustergroupmembers"] = clusterGroupMembershipStorage cpv1beta2Storage["egressgroups"] = egressGroupStorage cpv1beta2Storage["supportbundlecollections"] = bundleCollectionStorage + cpv1beta2Storage["supportbundlecollections/status"] = bundleCollectionStatusStorage cpGroup.VersionedResourcesStorageMap["v1beta2"] = cpv1beta2Storage systemGroup := genericapiserver.NewDefaultAPIGroupInfo(system.GroupName, Scheme, metav1.ParameterCodec, Codecs) diff --git a/pkg/apiserver/registry/controlplane/supportbundlecollection/subresources.go b/pkg/apiserver/registry/controlplane/supportbundlecollection/subresources.go new file mode 100644 index 00000000000..5d35094c195 --- /dev/null +++ b/pkg/apiserver/registry/controlplane/supportbundlecollection/subresources.go @@ -0,0 +1,63 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package supportbundlecollection + +import ( + "context" + "fmt" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/registry/rest" + + "antrea.io/antrea/pkg/apis/controlplane" +) + +// StatusREST implements the REST endpoint for getting NetworkPolicy's obj. +type StatusREST struct { + collector statusCollector +} + +// NewStatusREST returns a REST object that will work against API services. +func NewStatusREST(collector statusCollector) *StatusREST { + return &StatusREST{collector} +} + +// statusCollector is the interface required by the handler. +type statusCollector interface { + UpdateStatus(status *controlplane.SupportBundleCollectionStatus) error +} + +var _ rest.NamedCreater = &StatusREST{} + +func (s StatusREST) New() runtime.Object { + return &controlplane.SupportBundleCollectionStatus{} +} + +func (s StatusREST) Create(ctx context.Context, name string, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { + status, ok := obj.(*controlplane.SupportBundleCollectionStatus) + if !ok { + return nil, errors.NewBadRequest(fmt.Sprintf("not a SupportBundleCollectionStatus object: %T", obj)) + } + if name != status.Name { + return nil, errors.NewBadRequest("name in URL does not match name in SupportBundleCollectionStatus object") + } + err := s.collector.UpdateStatus(status) + if err != nil { + return nil, err + } + return &metav1.Status{Status: metav1.StatusSuccess}, nil +} diff --git a/pkg/apiserver/registry/controlplane/supportbundlecollection/subresources_test.go b/pkg/apiserver/registry/controlplane/supportbundlecollection/subresources_test.go new file mode 100644 index 00000000000..56d7823230a --- /dev/null +++ b/pkg/apiserver/registry/controlplane/supportbundlecollection/subresources_test.go @@ -0,0 +1,109 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package supportbundlecollection + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "antrea.io/antrea/pkg/apis/controlplane" +) + +func TestCreate(t *testing.T) { + for _, tc := range []struct { + name string + obj runtime.Object + expError error + }{ + { + name: "invalid-status-type", + obj: runtime.Object(&controlplane.SupportBundleCollection{ + ObjectMeta: metav1.ObjectMeta{ + Name: "invalid-status-type", + }, + }), + expError: errors.NewBadRequest("not a SupportBundleCollectionStatus object"), + }, + { + name: "invalid-status-name", + obj: runtime.Object(&controlplane.SupportBundleCollectionStatus{ + ObjectMeta: metav1.ObjectMeta{ + Name: "invalid-name", + }, + }), + expError: errors.NewBadRequest("name in URL does not match name in SupportBundleCollectionStatus object"), + }, + { + name: "unable-update", + obj: runtime.Object(&controlplane.SupportBundleCollectionStatus{ + ObjectMeta: metav1.ObjectMeta{ + Name: "unable-update", + }, + }), + expError: fmt.Errorf("no Nodes status is updated"), + }, + { + name: "valid-status-update", + obj: runtime.Object(&controlplane.SupportBundleCollectionStatus{ + ObjectMeta: metav1.ObjectMeta{ + Name: "valid-status-update", + }, + Nodes: []controlplane.SupportBundleCollectionNodeStatus{ + { + NodeName: "n1", + NodeType: "Node", + }, + }, + }), + expError: nil, + }, + } { + statusController := &fakeStatusCollector{ + bundleCollectionStatuses: make(map[string]*controlplane.SupportBundleCollectionStatus), + } + r := NewStatusREST(statusController) + rsp, err := r.Create(context.TODO(), tc.name, tc.obj, nil, &metav1.CreateOptions{}) + if tc.expError == nil { + assert.NoError(t, err) + assert.Equal(t, &metav1.Status{Status: metav1.StatusSuccess}, rsp) + expStatus := tc.obj.(*controlplane.SupportBundleCollectionStatus) + status := statusController.bundleCollectionStatuses[tc.name] + assert.Equal(t, expStatus, status) + } else { + assert.Error(t, err) + assert.True(t, strings.Contains(err.Error(), tc.expError.Error())) + } + } +} + +type fakeStatusCollector struct { + bundleCollectionStatuses map[string]*controlplane.SupportBundleCollectionStatus +} + +func (c *fakeStatusCollector) UpdateStatus(status *controlplane.SupportBundleCollectionStatus) error { + bundleCollectionName := status.Name + if len(status.Nodes) == 0 { + return fmt.Errorf("no Nodes status is updated") + } + c.bundleCollectionStatuses[bundleCollectionName] = status + return nil +} diff --git a/pkg/client/clientset/versioned/typed/controlplane/v1beta2/fake/fake_supportbundlecollection_expansion.go b/pkg/client/clientset/versioned/typed/controlplane/v1beta2/fake/fake_supportbundlecollection_expansion.go new file mode 100644 index 00000000000..7c129324fac --- /dev/null +++ b/pkg/client/clientset/versioned/typed/controlplane/v1beta2/fake/fake_supportbundlecollection_expansion.go @@ -0,0 +1,25 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fake + +import ( + "context" + + "antrea.io/antrea/pkg/apis/controlplane/v1beta2" +) + +func (c *FakeSupportBundleCollections) UpdateStatus(ctx context.Context, name string, status *v1beta2.SupportBundleCollectionStatus) error { + return nil +} diff --git a/pkg/client/clientset/versioned/typed/controlplane/v1beta2/generated_expansion.go b/pkg/client/clientset/versioned/typed/controlplane/v1beta2/generated_expansion.go index 1c4747d955e..76cbc12a2db 100644 --- a/pkg/client/clientset/versioned/typed/controlplane/v1beta2/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/controlplane/v1beta2/generated_expansion.go @@ -27,5 +27,3 @@ type EgressGroupExpansion interface{} type GroupAssociationExpansion interface{} type NodeStatsSummaryExpansion interface{} - -type SupportBundleCollectionExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/controlplane/v1beta2/supportbundlecollection_expansion.go b/pkg/client/clientset/versioned/typed/controlplane/v1beta2/supportbundlecollection_expansion.go new file mode 100644 index 00000000000..13b868159f8 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/controlplane/v1beta2/supportbundlecollection_expansion.go @@ -0,0 +1,29 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1beta2 + +import ( + "context" + + "antrea.io/antrea/pkg/apis/controlplane/v1beta2" +) + +type SupportBundleCollectionExpansion interface { + UpdateStatus(ctx context.Context, name string, status *v1beta2.SupportBundleCollectionStatus) error +} + +func (c *supportBundleCollections) UpdateStatus(ctx context.Context, name string, status *v1beta2.SupportBundleCollectionStatus) error { + return c.client.Post().Resource("supportbundlecollections").Name(name).SubResource("status").Body(status).Do(ctx).Error() +} diff --git a/pkg/controller/supportbundlecollection/controller.go b/pkg/controller/supportbundlecollection/controller.go index 8de1a535028..59f188f445c 100644 --- a/pkg/controller/supportbundlecollection/controller.go +++ b/pkg/controller/supportbundlecollection/controller.go @@ -115,6 +115,8 @@ type Controller struct { // supportBundleCollectionAppliedToStore is the storage where the required Nodes or ExternalNodes of a // SupportBundleCollection are stored. supportBundleCollectionAppliedToStore cache.Indexer + + StatusController *StatusController } func NewSupportBundleCollectionController( @@ -123,7 +125,8 @@ func NewSupportBundleCollectionController( supportBundleInformer crdinformers.SupportBundleCollectionInformer, nodeInformer coreinformers.NodeInformer, externalNodeInformer crdinformers.ExternalNodeInformer, - supportBundleCollectionStore storage.Interface) *Controller { + supportBundleCollectionStore storage.Interface, + statusController *StatusController) *Controller { c := &Controller{ kubeClient: kubeClient, crdClient: crdClient, @@ -141,6 +144,7 @@ func NewSupportBundleCollectionController( processingNodesIndex: processingNodesIndexFunc, processingExternalNodesIndex: processingExternalNodesIndexFunc, }), + StatusController: statusController, } c.supportBundleCollectionInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ @@ -169,6 +173,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) { go wait.Until(c.worker, time.Second, stopCh) + go c.StatusController.Run(stopCh) + <-stopCh } diff --git a/pkg/controller/supportbundlecollection/controller_test.go b/pkg/controller/supportbundlecollection/controller_test.go index 4c3a6480d1f..f26484d7d70 100644 --- a/pkg/controller/supportbundlecollection/controller_test.go +++ b/pkg/controller/supportbundlecollection/controller_test.go @@ -1148,7 +1148,7 @@ func newController(tc *testClient) *Controller { supportBundleInformer := tc.crdInformerFactory.Crd().V1alpha1().SupportBundleCollections() store := bundlecollectionstore.NewSupportBundleCollectionStore() - fakeController := NewSupportBundleCollectionController(tc.client, tc.crdClient, supportBundleInformer, nodeInformer, externalNodeInformer, store) + fakeController := NewSupportBundleCollectionController(tc.client, tc.crdClient, supportBundleInformer, nodeInformer, externalNodeInformer, store, nil) return fakeController } diff --git a/pkg/controller/supportbundlecollection/status_controller.go b/pkg/controller/supportbundlecollection/status_controller.go new file mode 100644 index 00000000000..f53ca771a5d --- /dev/null +++ b/pkg/controller/supportbundlecollection/status_controller.go @@ -0,0 +1,399 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package supportbundlecollection + +import ( + "context" + "fmt" + "sort" + "strings" + "sync" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/retry" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/apis/controlplane" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + "antrea.io/antrea/pkg/apiserver/storage" + clientset "antrea.io/antrea/pkg/client/clientset/versioned" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" + crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + "antrea.io/antrea/pkg/controller/types" +) + +const ( + statusControllerName = "SupportBundleCollectionStatusController" +) + +// StatusController is responsible for synchronizing the status of Support Bundle Collection. +type StatusController struct { + crdClient clientset.Interface + + // queue maintains the keys of the Support Bundle Collection objects that need to be synced. + queue workqueue.RateLimitingInterface + + // internalSupportBundleCollectionStore is the storage where the internal Support Bundle Collection are stored. + internalSupportBundleCollectionStore storage.Interface + + // statuses is a nested map that keeps the realization statuses reported by antrea-agents. + // The outer map's keys are the SupportBundleCollection names. The inner map's keys are the Node names. The inner + // map's values are statuses reported by each Node for a SupportBundleCollection. + statuses map[string]map[string]*controlplane.SupportBundleCollectionNodeStatus + statusesLock sync.RWMutex + + supportBundleCollectionLister crdlisters.SupportBundleCollectionLister + supportBundleCollectionListerSynced cache.InformerSynced +} + +func NewStatusController(crdClient clientset.Interface, + supportBundleInformer crdinformers.SupportBundleCollectionInformer, + supportBundleCollectionStore storage.Interface) *StatusController { + return &StatusController{ + crdClient: crdClient, + internalSupportBundleCollectionStore: supportBundleCollectionStore, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "supportBundleCollectionStatus"), + statuses: make(map[string]map[string]*controlplane.SupportBundleCollectionNodeStatus), + supportBundleCollectionLister: supportBundleInformer.Lister(), + supportBundleCollectionListerSynced: supportBundleInformer.Informer().HasSynced, + } +} + +func (c *StatusController) UpdateStatus(status *controlplane.SupportBundleCollectionStatus) error { + key := status.Name + _, found, _ := c.internalSupportBundleCollectionStore.Get(key) + if !found { + klog.InfoS("SupportBundleCollection has been deleted, skip updating its status", "supportBundleCollection", key) + return nil + } + func() { + c.statusesLock.Lock() + defer c.statusesLock.Unlock() + statusPerNode, exists := c.statuses[key] + if !exists { + statusPerNode = map[string]*controlplane.SupportBundleCollectionNodeStatus{} + c.statuses[key] = statusPerNode + } + for i := range status.Nodes { + nodeStatus := status.Nodes[i] + nodeStatusKey := getStatusKey(&nodeStatus) + statusPerNode[nodeStatusKey] = &nodeStatus + } + }() + c.queue.Add(key) + return nil +} + +// Run begins watching and syncing of a StatusController. +func (c *StatusController) Run(stopCh <-chan struct{}) { + defer c.queue.ShutDown() + + klog.InfoS("Starting", "controllerName", statusControllerName) + defer klog.InfoS("Shutting down", "controllerName", statusControllerName) + + if !cache.WaitForNamedCacheSync(statusControllerName, stopCh, c.supportBundleCollectionListerSynced) { + return + } + + go wait.NonSlidingUntil(c.watchInternalSupportBundleCollection, 5*time.Second, stopCh) + + go wait.Until(c.runWorker, time.Second, stopCh) + + <-stopCh +} + +func (c *StatusController) watchInternalSupportBundleCollection() { + watcher, err := c.internalSupportBundleCollectionStore.Watch(context.TODO(), "", labels.Everything(), fields.Everything()) + if err != nil { + klog.ErrorS(err, "Failed to start watch for internal SupportBundleCollection") + return + } + defer watcher.Stop() + resultCh := watcher.ResultChan() + for { + select { + case event, ok := <-resultCh: + if !ok { + return + } + // Skip handling Bookmark events. + if event.Type == watch.Bookmark { + continue + } + bundleCollection := event.Object.(*controlplane.SupportBundleCollection) + c.queue.Add(bundleCollection.Name) + } + } +} + +func (c *StatusController) runWorker() { + for c.processNextWorkItem() { + } +} + +// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. +func (c *StatusController) processNextWorkItem() bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + + err := c.syncHandler(key.(string)) + if err == nil { + c.queue.Forget(key) + return true + } + klog.ErrorS(err, "Failed to sync SupportBundleCollection status", "supportBundleCollection", key) + c.queue.AddRateLimited(key) + return true +} + +// syncHandler calculates the SupportBundleCollection status based on the desired state from the +// internalSupportBundleCollectionStore and the actual state from the statuses map, and syncs it with the Kubernetes API. +// Each status update from agents can trigger syncHandler, however, the status updates' arrival time should not differ +// too much so some of them can be merged in workqueue. Besides, there are a limited number of workers. If there are +// many policies need to sync, some policies will have to wait in workqueue, during which their status updates can be +// merged. Therefore, it shouldn't happen that each status update leads to one CR update. +func (c *StatusController) syncHandler(key string) error { + klog.V(2).InfoS("Syncing SupportBundleCollection status", "supportBundleCollection", key) + internalBundleCollectionObj, found, _ := c.internalSupportBundleCollectionStore.Get(key) + if !found { + // It has been deleted, cleaning its statuses. + c.clearStatuses(key) + return nil + } + internalBundleCollection := internalBundleCollectionObj.(*types.SupportBundleCollection) + + updateStatus := func(currentNodes, desiredNodes int, updatedConditions []v1alpha1.SupportBundleCollectionCondition) error { + status := &v1alpha1.SupportBundleCollectionStatus{ + SucceededNodes: int32(currentNodes), + DesiredNodes: int32(desiredNodes), + Conditions: updatedConditions, + } + klog.V(2).InfoS("Updating SupportBundleCollection status", "supportBundleCollection", internalBundleCollection.Name, "status", status) + return c.updateSupportBundleCollectionStatus(internalBundleCollection.Name, status) + } + + // It means the SupportBundleCollection hasn't been processed once. Set it to started failed to differentiate from + // SupportBundleCollection that spans 0 Node. + now := metav1.Now() + if internalBundleCollection.SpanMeta.NodeNames == nil { + return updateStatus(0, 0, []v1alpha1.SupportBundleCollectionCondition{ + { + Type: v1alpha1.CollectionStarted, + Status: metav1.ConditionFalse, + LastTransitionTime: now, + }, + }) + } + + desiredNodes := len(internalBundleCollection.SpanMeta.NodeNames) + succeededNodes := 0 + // TODO: Use a map to store the failed Node/ExternalNode and its error, and report the failed list when the + // collection is completed. + failedNodes := make(map[string][]string) + failedNodesCount := 0 + statuses := c.getNodeStatuses(key) + for _, status := range statuses { + statusNode := getStatusKey(status) + // The node is no longer in the span of this Support Bundle Collection, delete its status. + if !internalBundleCollection.NodeNames.Has(statusNode) { + c.deleteNodeStatus(key, statusNode) + continue + } + if status.Completed { + succeededNodes += 1 + } else { + failedNodesCount += 1 + failedReason := status.Error + if failedReason == "" { + failedReason = "unknown error" + } + _, exists := failedNodes[failedReason] + if !exists { + failedNodes[failedReason] = make([]string, 0) + } + failedNodes[failedReason] = append(failedNodes[failedReason], statusNode) + } + } + + newConditions := []v1alpha1.SupportBundleCollectionCondition{ + // Mark the support bundle collection as started since the internal resource successfully created. + // It will not be added as a duplication if it already exists. + {Type: v1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.Now()}, + } + if succeededNodes > 0 { + newConditions = append(newConditions, + v1alpha1.SupportBundleCollectionCondition{ + Type: v1alpha1.BundleCollected, + Status: metav1.ConditionTrue, + LastTransitionTime: now, + }, + ) + } + if failedNodesCount > 0 { + failedNodeErrors := make([]string, 0, len(failedNodes)) + for k, v := range failedNodes { + failedNodeErrors = append(failedNodeErrors, fmt.Sprintf(`"%s":[%s]`, k, strings.Join(v, ", "))) + } + sort.Strings(failedNodeErrors) + failedConditionMessage := fmt.Sprintf("Failed Agent count: %d, %s", failedNodesCount, strings.Join(failedNodeErrors, ", ")) + newConditions = append(newConditions, + v1alpha1.SupportBundleCollectionCondition{ + Type: v1alpha1.CollectionFailure, + Status: metav1.ConditionTrue, + Reason: string(metav1.StatusReasonInternalError), + Message: failedConditionMessage, + LastTransitionTime: now, + }, + ) + } + if succeededNodes == desiredNodes { + newConditions = append(newConditions, + v1alpha1.SupportBundleCollectionCondition{ + Type: v1alpha1.CollectionFailure, + Status: metav1.ConditionFalse, + LastTransitionTime: now, + }, + ) + } + if succeededNodes+failedNodesCount == desiredNodes { + newConditions = append(newConditions, + v1alpha1.SupportBundleCollectionCondition{ + Type: v1alpha1.CollectionCompleted, + Status: metav1.ConditionTrue, + LastTransitionTime: now, + }, + ) + } + return updateStatus(succeededNodes, desiredNodes, newConditions) +} + +func getStatusKey(status *controlplane.SupportBundleCollectionNodeStatus) string { + // TODO: use NodeNamespace/NodeName for ExternalNode after the Namespace is added in the connection key between + // antrea-agent and antrea-controller. + return status.NodeName +} + +func (c *StatusController) getNodeStatuses(key string) []*controlplane.SupportBundleCollectionNodeStatus { + c.statusesLock.RLock() + defer c.statusesLock.RUnlock() + statusPerNode, exists := c.statuses[key] + if !exists { + return nil + } + statuses := make([]*controlplane.SupportBundleCollectionNodeStatus, 0, len(c.statuses[key])) + for _, status := range statusPerNode { + statuses = append(statuses, status) + } + return statuses +} + +func (c *StatusController) clearStatuses(key string) { + c.statusesLock.Lock() + defer c.statusesLock.Unlock() + delete(c.statuses, key) +} + +func (c *StatusController) deleteNodeStatus(key string, nodeName string) { + c.statusesLock.Lock() + defer c.statusesLock.Unlock() + statusPerNode, exists := c.statuses[key] + if !exists { + return + } + delete(statusPerNode, nodeName) +} + +func (c *StatusController) updateSupportBundleCollectionStatus(name string, updatedStatus *v1alpha1.SupportBundleCollectionStatus) error { + bundleCollection, err := c.supportBundleCollectionLister.Get(name) + if err != nil { + klog.InfoS("Didn't find the original SupportBundleCollection, skip updating status", "supportBundleCollection", name) + return nil + } + toUpdate := bundleCollection.DeepCopy() + if err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + updatedConditions := appendConditions(toUpdate.Status.Conditions, updatedStatus.Conditions) + updatedStatus.Conditions = updatedConditions + // If the current status equals to the desired status, no need to update. + if supportBundleCollectionStatusEqual(toUpdate.Status, *updatedStatus) { + return nil + } + toUpdate.Status = *updatedStatus + klog.V(2).InfoS("Updating SupportBundleCollection", "supportBundleCollection", name, "status", klog.KObj(toUpdate)) + _, updateErr := c.crdClient.CrdV1alpha1().SupportBundleCollections().UpdateStatus(context.TODO(), toUpdate, metav1.UpdateOptions{}) + if updateErr != nil && errors.IsConflict(updateErr) { + var getErr error + if toUpdate, getErr = c.crdClient.CrdV1alpha1().SupportBundleCollections().Get(context.TODO(), name, metav1.GetOptions{}); getErr != nil { + return getErr + } + } + // Return the error from UPDATE. + return updateErr + }); err != nil { + return err + } + klog.V(2).InfoS("Updated SupportBundleCollection", "supportBundleCollection", name) + return nil +} + +// supportBundleCollectionStatusEqual compares two SupportBundleCollectionStatus objects. It disregards +// the LastTransitionTime field in the status Conditions. +func supportBundleCollectionStatusEqual(oldStatus, newStatus v1alpha1.SupportBundleCollectionStatus) bool { + semanticIgnoreLastTransitionTime := conversion.EqualitiesOrDie( + conditionSliceEqualsIgnoreLastTransitionTime, + ) + return semanticIgnoreLastTransitionTime.DeepEqual(oldStatus, newStatus) +} + +func conditionSliceEqualsIgnoreLastTransitionTime(as, bs []v1alpha1.SupportBundleCollectionCondition) bool { + sort.Slice(as, func(i, j int) bool { + a := as[i] + b := as[j] + if a.Type == b.Type { + return a.Status < b.Status + } + return a.Type < b.Type + }) + sort.Slice(bs, func(i, j int) bool { + a := bs[i] + b := bs[j] + if a.Type == b.Type { + return a.Status < b.Status + } + return a.Type < b.Type + }) + if len(as) != len(bs) { + return false + } + for i := range as { + a := as[i] + b := bs[i] + if !conditionEqualsIgnoreLastTransitionTime(a, b) { + return false + } + } + return true +} diff --git a/pkg/controller/supportbundlecollection/status_controller_test.go b/pkg/controller/supportbundlecollection/status_controller_test.go new file mode 100644 index 00000000000..81e027e2506 --- /dev/null +++ b/pkg/controller/supportbundlecollection/status_controller_test.go @@ -0,0 +1,615 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package supportbundlecollection + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" + + "antrea.io/antrea/pkg/apis/controlplane" + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + bundlecollectionstore "antrea.io/antrea/pkg/controller/supportbundlecollection/store" + "antrea.io/antrea/pkg/controller/types" +) + +func TestSupportBundleCollectionStatusEqual(t *testing.T) { + for _, tc := range []struct { + oldStatus crdv1alpha1.SupportBundleCollectionStatus + newStatus crdv1alpha1.SupportBundleCollectionStatus + equal bool + }{ + { + oldStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 100, + SucceededNodes: 4, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue}, + }, + }, + newStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 100, + SucceededNodes: 5, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue}, + }, + }, + equal: false, + }, + { + oldStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 100, + SucceededNodes: 4, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue}, + }, + }, + newStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 100, + SucceededNodes: 4, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue}, + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue}, + }, + }, + equal: false, + }, + { + oldStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 100, + SucceededNodes: 4, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue}, + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue}, + }, + }, + newStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 100, + SucceededNodes: 4, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue}, + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue}, + }, + }, + equal: true, + }, { + oldStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 100, + SucceededNodes: 4, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(time.Now())}, + }, + }, + newStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 100, + SucceededNodes: 4, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(time.Now().Add(time.Minute))}, + }, + }, + equal: true, + }, { + oldStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 100, + SucceededNodes: 4, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(time.Now())}, + }, + }, + newStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 100, + SucceededNodes: 4, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionFalse, LastTransitionTime: metav1.NewTime(time.Now())}, + }, + }, + equal: false, + }, + } { + equals := supportBundleCollectionStatusEqual(tc.oldStatus, tc.newStatus) + assert.Equal(t, tc.equal, equals) + } +} + +func TestUpdateSupportBundleCollectionStatus(t *testing.T) { + now := time.Now() + for _, tc := range []struct { + existingCollection *crdv1alpha1.SupportBundleCollection + updateStatus *crdv1alpha1.SupportBundleCollectionStatus + expectedStatus crdv1alpha1.SupportBundleCollectionStatus + }{ + { + existingCollection: &crdv1alpha1.SupportBundleCollection{ + ObjectMeta: metav1.ObjectMeta{ + Name: "b1", + }, + }, + updateStatus: &crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 0, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + }, + }, + expectedStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 0, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + }, + }, + }, + { + existingCollection: &crdv1alpha1.SupportBundleCollection{ + ObjectMeta: metav1.ObjectMeta{ + Name: "b1", + }, + Status: crdv1alpha1.SupportBundleCollectionStatus{ + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionFalse, LastTransitionTime: metav1.NewTime(now)}, + }, + }, + }, + updateStatus: &crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 0, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second))}, + }, + }, + expectedStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 0, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionFalse, LastTransitionTime: metav1.NewTime(now)}, + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second))}, + }, + }, + }, + { + existingCollection: &crdv1alpha1.SupportBundleCollection{ + ObjectMeta: metav1.ObjectMeta{ + Name: "b1", + }, + Status: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 0, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + }, + }, + }, + updateStatus: &crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 1, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 10))}, + }, + }, + expectedStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 1, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 10))}, + }, + }, + }, + { + existingCollection: &crdv1alpha1.SupportBundleCollection{ + ObjectMeta: metav1.ObjectMeta{ + Name: "b1", + }, + Status: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 1, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + }, + }, + }, + updateStatus: &crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 5, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 10))}, + }, + }, + expectedStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 5, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 10))}, + }, + }, + }, + { + existingCollection: &crdv1alpha1.SupportBundleCollection{ + ObjectMeta: metav1.ObjectMeta{ + Name: "b1", + }, + Status: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 5, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 10))}, + }, + }, + }, + updateStatus: &crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 8, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 10))}, + {Type: crdv1alpha1.CollectionFailure, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 20)), Reason: string(metav1.StatusReasonInternalError), Message: "Agent error"}, + {Type: crdv1alpha1.CollectionCompleted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 20))}, + }, + }, + expectedStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 8, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 10))}, + {Type: crdv1alpha1.CollectionFailure, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 20)), Reason: string(metav1.StatusReasonInternalError), Message: "Agent error"}, + {Type: crdv1alpha1.CollectionCompleted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 20))}, + }, + }, + }, + { + existingCollection: &crdv1alpha1.SupportBundleCollection{ + ObjectMeta: metav1.ObjectMeta{ + Name: "b1", + }, + Status: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 8, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 10))}, + }, + }, + }, + updateStatus: &crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 10, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 10))}, + {Type: crdv1alpha1.CollectionFailure, Status: metav1.ConditionFalse, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 20))}, + {Type: crdv1alpha1.CollectionCompleted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 20))}, + }, + }, + expectedStatus: crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 10, + SucceededNodes: 10, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now)}, + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 10))}, + {Type: crdv1alpha1.CollectionFailure, Status: metav1.ConditionFalse, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 20))}, + {Type: crdv1alpha1.CollectionCompleted, Status: metav1.ConditionTrue, LastTransitionTime: metav1.NewTime(now.Add(time.Second * 20))}, + }, + }, + }, + } { + testClient := newTestClient(nil, []runtime.Object{tc.existingCollection}) + statusController := newStatusController(testClient) + stopCh := make(chan struct{}) + testClient.start(stopCh) + testClient.waitForSync(stopCh) + collectionName := tc.existingCollection.Name + err := statusController.updateSupportBundleCollectionStatus(collectionName, tc.updateStatus) + require.NoError(t, err) + updatedCollection, err := statusController.crdClient.CrdV1alpha1().SupportBundleCollections().Get(context.TODO(), collectionName, metav1.GetOptions{}) + require.NoError(t, err) + assert.True(t, supportBundleCollectionStatusEqual(tc.expectedStatus, updatedCollection.Status)) + } +} + +func TestWorker(t *testing.T) { + name := "b1" + testBundleCollection := &crdv1alpha1.SupportBundleCollection{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: crdv1alpha1.SupportBundleCollectionSpec{ + Nodes: &crdv1alpha1.BundleNodes{ + NodeSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"test": "selected"}, + }, + }, + }, + } + testClient := newTestClient(nil, []runtime.Object{testBundleCollection}) + statusController := newStatusController(testClient) + stopCh := make(chan struct{}) + defer close(stopCh) + testClient.start(stopCh) + testClient.waitForSync(stopCh) + go statusController.Run(stopCh) + + checkStatus := func(targetStatus crdv1alpha1.SupportBundleCollectionStatus) error { + err := wait.PollImmediate(time.Millisecond*50, time.Second, func() (done bool, err error) { + collection, err := testClient.crdClient.CrdV1alpha1().SupportBundleCollections().Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return false, err + } + status := collection.Status + if status.DesiredNodes != targetStatus.DesiredNodes { + return false, nil + } + if status.SucceededNodes != targetStatus.SucceededNodes { + return false, nil + } + for _, c := range targetStatus.Conditions { + if !conditionExistsIgnoreLastTransitionTime(status.Conditions, c) { + return false, nil + } + } + return true, nil + }) + return err + } + err := statusController.internalSupportBundleCollectionStore.Create(&types.SupportBundleCollection{ + Name: name, + SpanMeta: types.SpanMeta{ + NodeNames: sets.NewString("n0", "n1", "n2", "n3"), + }, + }) + require.NoError(t, err) + assert.NoError(t, checkStatus(crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 4, + SucceededNodes: 0, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionStarted, Status: metav1.ConditionTrue}, + }, + })) + + updateNodeStatus := func(nodeName string, completed bool) { + status := &controlplane.SupportBundleCollectionStatus{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Nodes: []controlplane.SupportBundleCollectionNodeStatus{ + { + NodeName: nodeName, + NodeType: "Node", + Completed: completed, + }, + }, + } + err := statusController.UpdateStatus(status) + require.NoError(t, err) + } + + updateNodesFailures := func(nodesFailures map[string]string) { + status := &controlplane.SupportBundleCollectionStatus{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Nodes: make([]controlplane.SupportBundleCollectionNodeStatus, 0), + } + for node, failure := range nodesFailures { + status.Nodes = append(status.Nodes, controlplane.SupportBundleCollectionNodeStatus{ + NodeName: node, + NodeType: "Node", + Completed: false, + Error: failure, + }) + } + err := statusController.UpdateStatus(status) + require.NoError(t, err) + } + nodeStatusExists := func(name string, statuses []*controlplane.SupportBundleCollectionNodeStatus) bool { + for _, s := range statuses { + if s.NodeName == name { + return true + } + } + return false + } + updateNodeStatus("n0", true) + updateNodeStatus("n4", true) + assert.NoError(t, checkStatus(crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 4, + SucceededNodes: 1, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.BundleCollected, Status: metav1.ConditionTrue}, + }, + })) + statuses := statusController.getNodeStatuses(name) + assert.Equal(t, 1, len(statuses)) + assert.True(t, nodeStatusExists("n0", statuses)) + assert.False(t, nodeStatusExists("n4", statuses)) + + updateNodesFailures(map[string]string{"n1": "can not access file server", "n2": "", "n3": "can not access file server"}) + assert.NoError(t, checkStatus(crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 4, + SucceededNodes: 1, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionFailure, Status: metav1.ConditionTrue, Reason: string(metav1.StatusReasonInternalError), Message: fmt.Sprintf(`Failed Agent count: 3, "can not access file server":[n1, n3], "unknown error":[n2]`)}, + {Type: crdv1alpha1.CollectionCompleted, Status: metav1.ConditionTrue}, + }, + })) + updateNodeStatus("n1", true) + updateNodeStatus("n2", true) + updateNodeStatus("n3", true) + assert.NoError(t, checkStatus(crdv1alpha1.SupportBundleCollectionStatus{ + DesiredNodes: 4, + SucceededNodes: 4, + Conditions: []crdv1alpha1.SupportBundleCollectionCondition{ + {Type: crdv1alpha1.CollectionFailure, Status: metav1.ConditionFalse}, + {Type: crdv1alpha1.CollectionCompleted, Status: metav1.ConditionTrue}, + }, + })) + + err = statusController.internalSupportBundleCollectionStore.Delete(name) + require.NoError(t, err) + err = wait.PollImmediate(time.Millisecond*50, time.Second, func() (done bool, err error) { + statuses := statusController.getNodeStatuses(name) + if len(statuses) == 0 { + return true, nil + } + return false, nil + }) + assert.NoError(t, err) +} + +func TestWatchInternalSupportBundleCollection(t *testing.T) { + store := bundlecollectionstore.NewSupportBundleCollectionStore() + statusController := &StatusController{ + internalSupportBundleCollectionStore: store, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "supportBundleCollectionStatus"), + } + go statusController.watchInternalSupportBundleCollection() + enqueuedItems := make(map[string]int) + stopKey := "stop" + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + processNextWorkItem := func() bool { + key, quit := statusController.queue.Get() + if quit { + return false + } + keyStr := key.(string) + if keyStr == stopKey { + return false + } + _, ok := enqueuedItems[keyStr] + if !ok { + enqueuedItems[keyStr] = 0 + } + enqueuedItems[keyStr] += 1 + defer statusController.queue.Done(key) + return true + } + for processNextWorkItem() { + } + }() + collectionName := "b1" + collection := &types.SupportBundleCollection{ + SpanMeta: types.SpanMeta{ + NodeNames: sets.NewString("n1", "en1"), + }, + Name: collectionName, + } + err := store.Create(collection) + require.NoError(t, err) + time.Sleep(time.Millisecond * 10) + err = store.Delete(collectionName) + require.NoError(t, err) + err = store.Create(&types.SupportBundleCollection{ + Name: stopKey, + }) + require.NoError(t, err) + wg.Wait() + count, exists := enqueuedItems[collectionName] + assert.True(t, exists) + assert.Equal(t, 2, count) +} + +func TestUpdateStatus(t *testing.T) { + testClient := newTestClient(nil, nil) + statusController := newStatusController(testClient) + collectionName := "b1" + namespace := "ns1" + statusController.internalSupportBundleCollectionStore.Create(&types.SupportBundleCollection{ + Name: collectionName, + SpanMeta: types.SpanMeta{ + NodeNames: sets.NewString("n0", "n1", "n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9"), + }, + }) + nodesCount := 10 + wg := sync.WaitGroup{} + wg.Add(nodesCount) + for i := 0; i < nodesCount; i++ { + nodeName := fmt.Sprintf("n%d", i) + nodeType := controlplane.SupportBundleCollectionNodeTypeNode + if i%2 == 0 { + nodeType = controlplane.SupportBundleCollectionNodeTypeExternalNode + } + completed := true + if i%3 == 0 { + completed = false + } + go func() { + defer wg.Done() + nodeStatus := controlplane.SupportBundleCollectionNodeStatus{ + NodeName: nodeName, + NodeType: nodeType, + Completed: completed, + } + if nodeType == "ExternalNode" { + nodeStatus.NodeNamespace = namespace + } + status := &controlplane.SupportBundleCollectionStatus{ + ObjectMeta: metav1.ObjectMeta{ + Name: collectionName, + }, + Nodes: []controlplane.SupportBundleCollectionNodeStatus{ + nodeStatus, + }, + } + err := statusController.UpdateStatus(status) + require.NoError(t, err) + }() + } + wg.Wait() + statusPerNode, exists := statusController.statuses[collectionName] + assert.True(t, exists) + assert.Equal(t, nodesCount, len(statusPerNode)) + + // Test UpdateStatus with non-existing SupportBundleCollection + nonExistCollectionName := "no-existing-collection" + status := &controlplane.SupportBundleCollectionStatus{ + ObjectMeta: metav1.ObjectMeta{ + Name: nonExistCollectionName, + }, + Nodes: []controlplane.SupportBundleCollectionNodeStatus{ + { + NodeName: "n1", + NodeType: "Node", + Completed: true, + }, + }, + } + err := statusController.UpdateStatus(status) + require.NoError(t, err) + _, exists = statusController.statuses["no-existing-collection"] + assert.False(t, exists) +} + +func newStatusController(client *testClient) *StatusController { + supportBundleInformer := client.crdInformerFactory.Crd().V1alpha1().SupportBundleCollections() + store := bundlecollectionstore.NewSupportBundleCollectionStore() + return NewStatusController(client.crdClient, supportBundleInformer, store) +}