From 73ad77831f9eb357a498954a2153ed99fe1185bd Mon Sep 17 00:00:00 2001 From: Lan Luo Date: Mon, 1 Aug 2022 17:41:14 +0800 Subject: [PATCH] Refine Node controller to support Gateway HA In order to support Gateway active-standby mode HA, the Node controller has been refined to handle Node readiness changes. There will be at most one Gateway in a given Namespace, and the Gateway controller is updated correspondingly. 1. Check Node readiness and create a Gateway CR if the Node is ready and there is no existing Gateway. 2. Initialize active Gateway and Gateway candidate Nodes with annotation 'multicluster.antrea.io/gateway'. 3. Add Gateway webhook to allow at most one Gateway in a given Namespace. Signed-off-by: Lan Luo --- .../antrea-multicluster-leader-namespaced.yml | 1 - .../yamls/antrea-multicluster-member.yml | 21 ++ .../gateway_webhook.go | 68 +++++ .../gateway_webhook_test.go | 123 +++++++++ .../cmd/multicluster-controller/member.go | 7 + .../overlays/leader-ns/kustomization.yaml | 1 + .../overlays/leader-ns/webhook_patch.yaml | 9 + multicluster/config/webhook/manifests.yaml | 21 ++ .../multicluster/gateway_controller.go | 53 +--- .../multicluster/gateway_controller_test.go | 88 +----- .../multicluster/node_controller.go | 239 ++++++++++++---- .../multicluster/node_controller_test.go | 254 +++++++++++++++--- 12 files changed, 675 insertions(+), 210 deletions(-) create mode 100644 multicluster/cmd/multicluster-controller/gateway_webhook.go create mode 100644 multicluster/cmd/multicluster-controller/gateway_webhook_test.go create mode 100644 multicluster/config/overlays/leader-ns/webhook_patch.yaml diff --git a/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml b/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml index d57b67e0872..4d1c104004a 100644 --- a/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml +++ b/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml @@ -448,7 +448,6 @@ webhooks: apiVersion: 
admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration metadata: - creationTimestamp: null labels: app: antrea name: antrea-multicluster-antrea-mc-validating-webhook-configuration diff --git a/multicluster/build/yamls/antrea-multicluster-member.yml b/multicluster/build/yamls/antrea-multicluster-member.yml index 34e51aaf826..ba7f6676fe9 100644 --- a/multicluster/build/yamls/antrea-multicluster-member.yml +++ b/multicluster/build/yamls/antrea-multicluster-member.yml @@ -1100,3 +1100,24 @@ webhooks: resources: - clustersets sideEffects: None +- admissionReviewVersions: + - v1 + - v1beta1 + clientConfig: + service: + name: antrea-mc-webhook-service + namespace: kube-system + path: /validate-multicluster-crd-antrea-io-v1alpha1-gateway + failurePolicy: Fail + name: vgateway.kb.io + rules: + - apiGroups: + - multicluster.crd.antrea.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - gateways + sideEffects: None diff --git a/multicluster/cmd/multicluster-controller/gateway_webhook.go b/multicluster/cmd/multicluster-controller/gateway_webhook.go new file mode 100644 index 00000000000..b22377c3164 --- /dev/null +++ b/multicluster/cmd/multicluster-controller/gateway_webhook.go @@ -0,0 +1,68 @@ +/* +Copyright 2022 Antrea Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "context" + "fmt" + "net/http" + + admissionv1 "k8s.io/api/admission/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" +) + +//+kubebuilder:webhook:path=/validate-multicluster-crd-antrea-io-v1alpha1-gateway,mutating=false,failurePolicy=fail,sideEffects=None,groups=multicluster.crd.antrea.io,resources=gateways,verbs=create;update,versions=v1alpha1,name=vgateway.kb.io,admissionReviewVersions={v1,v1beta1} + +// Gateway validator +type gatewayValidator struct { + Client client.Client + decoder *admission.Decoder + namespace string +} + +// Handle handles admission requests. +func (v *gatewayValidator) Handle(ctx context.Context, req admission.Request) admission.Response { + gateway := &mcv1alpha1.Gateway{} + err := v.decoder.Decode(req, gateway) + if err != nil { + klog.ErrorS(err, "Error while decoding Gateway", "Gateway", req.Namespace+"/"+req.Name) + return admission.Errored(http.StatusBadRequest, err) + } + + // Check if there is any existing Gateway. 
+ gatewayList := &mcv1alpha1.GatewayList{} + if err := v.Client.List(context.TODO(), gatewayList, client.InNamespace(v.namespace)); err != nil { + klog.ErrorS(err, "Error reading Gateway", "Namespace", v.namespace) + return admission.Errored(http.StatusPreconditionFailed, err) + } + + if req.Operation == admissionv1.Create && len(gatewayList.Items) > 0 { + err := fmt.Errorf("multiple Gateways in a Namespace are not allowed") + klog.ErrorS(err, "failed to create Gateway", "Gateway", klog.KObj(gateway), "Namespace", v.namespace) + return admission.Errored(http.StatusPreconditionFailed, err) + } + return admission.Allowed("") +} + +func (v *gatewayValidator) InjectDecoder(d *admission.Decoder) error { + v.decoder = d + return nil +} diff --git a/multicluster/cmd/multicluster-controller/gateway_webhook_test.go b/multicluster/cmd/multicluster-controller/gateway_webhook_test.go new file mode 100644 index 00000000000..f36bea4a186 --- /dev/null +++ b/multicluster/cmd/multicluster-controller/gateway_webhook_test.go @@ -0,0 +1,123 @@ +/* +Copyright 2022 Antrea Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/admission/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + k8smcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" + + mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" +) + +var gatewayWebhookUnderTest *gatewayValidator + +func TestWebhookGatewayEvents(t *testing.T) { + newGateway := &mcsv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "node-1", + }, + GatewayIP: "1.2.3.4", + InternalIP: "172.168.3.4", + } + existingGateway := &mcsv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "node-2", + }, + } + + newGW, _ := json.Marshal(newGateway) + + newReq := admission.Request{ + AdmissionRequest: v1.AdmissionRequest{ + UID: "07e52e8d-4513-11e9-a716-42010a800270", + Kind: metav1.GroupVersionKind{ + Group: "multicluster.crd.antrea.io", + Version: "v1alpha1", + Kind: "Gateway", + }, + Resource: metav1.GroupVersionResource{ + Group: "multicluster.crd.antrea.io", + Version: "v1alpha1", + Resource: "Gateways", + }, + Name: "node-1", + Namespace: "default", + Operation: v1.Create, + Object: runtime.RawExtension{ + Raw: newGW, + }, + }, + } + + tests := []struct { + name string + req admission.Request + existingGateway *mcsv1alpha1.Gateway + newGateway *mcsv1alpha1.Gateway + isAllowed bool + }{ + { + name: "create a new Gateway successfully", + req: newReq, + isAllowed: true, + }, + { + name: "failed to create a Gateway when there is an existing one", + existingGateway: existingGateway, + req: newReq, + isAllowed: false, + }, + } + + newScheme := runtime.NewScheme() + 
utilruntime.Must(clientgoscheme.AddToScheme(newScheme)) + utilruntime.Must(k8smcsv1alpha1.AddToScheme(newScheme)) + utilruntime.Must(mcsv1alpha1.AddToScheme(newScheme)) + decoder, err := admission.NewDecoder(newScheme) + if err != nil { + klog.ErrorS(err, "Error constructing a decoder") + } + for _, tt := range tests { + fakeClient := fake.NewClientBuilder().WithScheme(newScheme).WithObjects().Build() + if tt.existingGateway != nil { + fakeClient = fake.NewClientBuilder().WithScheme(newScheme).WithObjects(tt.existingGateway).Build() + } + gatewayWebhookUnderTest = &gatewayValidator{ + Client: fakeClient, + namespace: "default"} + gatewayWebhookUnderTest.InjectDecoder(decoder) + + t.Run(tt.name, func(t *testing.T) { + response := gatewayWebhookUnderTest.Handle(context.Background(), tt.req) + assert.Equal(t, tt.isAllowed, response.Allowed) + }) + } +} diff --git a/multicluster/cmd/multicluster-controller/member.go b/multicluster/cmd/multicluster-controller/member.go index 04e16c10c4e..89341055caa 100644 --- a/multicluster/cmd/multicluster-controller/member.go +++ b/multicluster/cmd/multicluster-controller/member.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/cobra" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/webhook" multiclustercontrollers "antrea.io/antrea/multicluster/controllers/multicluster" "antrea.io/antrea/pkg/log" @@ -54,6 +55,12 @@ func runMember(o *Options) error { return err } + hookServer := mgr.GetWebhookServer() + hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha1-gateway", + &webhook.Admission{Handler: &gatewayValidator{ + Client: mgr.GetClient(), + namespace: env.GetPodNamespace()}}) + clusterSetReconciler := multiclustercontrollers.NewMemberClusterSetReconciler(mgr.GetClient(), mgr.GetScheme(), env.GetPodNamespace(), diff --git a/multicluster/config/overlays/leader-ns/kustomization.yaml b/multicluster/config/overlays/leader-ns/kustomization.yaml index a85f54bd96c..7ef94f1cd1e 100644 
--- a/multicluster/config/overlays/leader-ns/kustomization.yaml +++ b/multicluster/config/overlays/leader-ns/kustomization.yaml @@ -54,3 +54,4 @@ resources: patchesStrategicMerge: - manager_command_patch.yaml + - webhook_patch.yaml diff --git a/multicluster/config/overlays/leader-ns/webhook_patch.yaml b/multicluster/config/overlays/leader-ns/webhook_patch.yaml new file mode 100644 index 00000000000..92bd74c1649 --- /dev/null +++ b/multicluster/config/overlays/leader-ns/webhook_patch.yaml @@ -0,0 +1,9 @@ +--- +apiVersion: admissionregistration.k8s.io/v1 +kind: ValidatingWebhookConfiguration +metadata: + name: validating-webhook-configuration +webhooks: +- admissionReviewVersions: + name: vgateway.kb.io + $patch: delete diff --git a/multicluster/config/webhook/manifests.yaml b/multicluster/config/webhook/manifests.yaml index 7b3f6137d7b..d4952da4df3 100644 --- a/multicluster/config/webhook/manifests.yaml +++ b/multicluster/config/webhook/manifests.yaml @@ -76,6 +76,27 @@ webhooks: resources: - clustersets sideEffects: None +- admissionReviewVersions: + - v1 + - v1beta1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate-multicluster-crd-antrea-io-v1alpha1-gateway + failurePolicy: Fail + name: vgateway.kb.io + rules: + - apiGroups: + - multicluster.crd.antrea.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - gateways + sideEffects: None - admissionReviewVersions: - v1 - v1beta1 diff --git a/multicluster/controllers/multicluster/gateway_controller.go b/multicluster/controllers/multicluster/gateway_controller.go index 247e03de917..5efe37cad63 100644 --- a/multicluster/controllers/multicluster/gateway_controller.go +++ b/multicluster/controllers/multicluster/gateway_controller.go @@ -99,7 +99,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct }, } - createOrUpdate := func(gwIP string, gwInfo *mcsv1alpha1.GatewayInfo) error { + createOrUpdate := func(gwIP string) 
error { existingResExport := &mcsv1alpha1.ResourceExport{} if err := commonArea.Get(ctx, resExportNamespacedName, existingResExport); err != nil { if !apierrors.IsNotFound(err) { @@ -110,7 +110,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } return nil } - if err = r.updateResourceExport(ctx, req, commonArea, existingResExport, gwInfo); err != nil { + if err = r.updateResourceExport(ctx, req, commonArea, existingResExport, &mcsv1alpha1.GatewayInfo{GatewayIP: gwIP}); err != nil { return err } return nil @@ -121,52 +121,18 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct if !apierrors.IsNotFound(err) { return ctrl.Result{}, err } - gwInfo, err := r.getLastCreatedGateway() - if err != nil { - klog.ErrorS(err, "Failed to get Gateways") - return ctrl.Result{}, err - } - if gwInfo == nil { - // When the last Gateway is deleted, we will remove the ClusterInfo kind of ResourceExport - if err := commonArea.Delete(ctx, resExport, &client.DeleteOptions{}); err != nil { - return ctrl.Result{}, client.IgnoreNotFound(err) - } - return ctrl.Result{}, nil - } - // When there are still Gateways exist, we should create or update existing ResourceExport - // with the latest Gateway in remaining Gateways. 
- if err := createOrUpdate(gwInfo.GatewayIP, gwInfo); err != nil { - return ctrl.Result{}, err + if err := commonArea.Delete(ctx, resExport, &client.DeleteOptions{}); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) } return ctrl.Result{}, nil } - if err := createOrUpdate(gw.GatewayIP, nil); err != nil { + + if err := createOrUpdate(gw.GatewayIP); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil } -func (r *GatewayReconciler) getLastCreatedGateway() (*mcsv1alpha1.GatewayInfo, error) { - gws := &mcsv1alpha1.GatewayList{} - if err := r.Client.List(ctx, gws, &client.ListOptions{}); err != nil { - return nil, err - } - if len(gws.Items) == 0 { - return nil, nil - } - - // Comparing Gateway's CreationTimestamp to get the last created Gateway. - lastCreatedGW := gws.Items[0] - for _, gw := range gws.Items { - if lastCreatedGW.CreationTimestamp.Before(&gw.CreationTimestamp) { - lastCreatedGW = gw - } - } - - // Make sure we only return the last created Gateway for now. 
- return &mcsv1alpha1.GatewayInfo{GatewayIP: lastCreatedGW.GatewayIP}, nil -} - func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.Request, commonArea commonarea.RemoteCommonArea, existingResExport *mcsv1alpha1.ResourceExport, gwInfo *mcsv1alpha1.GatewayInfo) error { resExportSpec := mcsv1alpha1.ResourceExportSpec{ @@ -175,13 +141,6 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R Name: r.localClusterID, Namespace: r.namespace, } - var err error - if gwInfo == nil { - gwInfo, err = r.getLastCreatedGateway() - if err != nil { - return err - } - } resExportSpec.ClusterInfo = &mcsv1alpha1.ClusterInfo{ ClusterID: r.localClusterID, ServiceCIDR: r.serviceCIDR, diff --git a/multicluster/controllers/multicluster/gateway_controller_test.go b/multicluster/controllers/multicluster/gateway_controller_test.go index edad64867eb..d2f78d7ae46 100644 --- a/multicluster/controllers/multicluster/gateway_controller_test.go +++ b/multicluster/controllers/multicluster/gateway_controller_test.go @@ -17,7 +17,6 @@ limitations under the License. 
package multicluster import ( - "fmt" "reflect" "testing" "time" @@ -39,7 +38,6 @@ var ( clusterID = "cluster-a" gw1CreationTime = metav1.NewTime(time.Now()) - gw2CreationTime = metav1.NewTime(time.Now().Add(10 * time.Minute)) gwNode1 = mcsv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ @@ -50,15 +48,7 @@ var ( GatewayIP: "10.10.10.10", InternalIP: "172.11.10.1", } - gwNode2 = mcsv1alpha1.Gateway{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node-2", - Namespace: "default", - CreationTimestamp: gw2CreationTime, - }, - GatewayIP: "10.8.8.8", - InternalIP: "172.11.10.1", - } + existingResExport = &mcsv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Name: "cluster-a-clusterinfo", @@ -79,26 +69,6 @@ var ( }, }, } - existingResExport2 = &mcsv1alpha1.ResourceExport{ - ObjectMeta: metav1.ObjectMeta{ - Name: "cluster-a-clusterinfo", - Namespace: leaderNamespace, - }, - Spec: mcsv1alpha1.ResourceExportSpec{ - Name: clusterID, - Namespace: "default", - Kind: common.ClusterInfoKind, - ClusterInfo: &mcsv1alpha1.ClusterInfo{ - ServiceCIDR: serviceCIDR, - ClusterID: clusterID, - GatewayInfos: []mcsv1alpha1.GatewayInfo{ - { - GatewayIP: "101.101.101.101", - }, - }, - }, - }, - } ) func TestGatewayReconciler(t *testing.T) { @@ -123,39 +93,6 @@ func TestGatewayReconciler(t *testing.T) { gateway: []mcsv1alpha1.Gateway{ gwNode1, }, - resExport: existingResExport, - expectedInfo: []mcsv1alpha1.GatewayInfo{ - { - GatewayIP: "10.10.10.10", - }, - }, - }, - { - name: "update a ResourceExport successfully by creating a new Gateway", - namespacedName: types.NamespacedName{ - Namespace: "default", - Name: "node-2", - }, - gateway: []mcsv1alpha1.Gateway{ - gwNode1, gwNode2, - }, - resExport: existingResExport, - expectedInfo: []mcsv1alpha1.GatewayInfo{ - { - GatewayIP: "10.8.8.8", - }, - }, - }, - { - name: "update a ResourceExport successfully by deleting a Gateway", - namespacedName: types.NamespacedName{ - Namespace: "default", - Name: "node-2", - }, - gateway: []mcsv1alpha1.Gateway{ 
- gwNode1, - }, - resExport: existingResExport2, expectedInfo: []mcsv1alpha1.GatewayInfo{ { GatewayIP: "10.10.10.10", @@ -210,27 +147,20 @@ func TestGatewayReconciler(t *testing.T) { if _, err := r.Reconcile(ctx, req); err != nil { t.Errorf("Gateway Reconciler should handle ResourceExports events successfully but got error = %v", err) } else { - gws := &mcsv1alpha1.GatewayList{} - _ = fakeClient.List(ctx, gws, &client.ListOptions{}) - fmt.Printf("output list: %v", gws) ciExport := mcsv1alpha1.ResourceExport{} ciExportName := types.NamespacedName{ Namespace: leaderNamespace, Name: newClusterInfoResourceExportName(localClusterID), } err := fakeRemoteClient.Get(ctx, ciExportName, &ciExport) - if err == nil { - if !reflect.DeepEqual(ciExport.Spec.ClusterInfo.GatewayInfos, tt.expectedInfo) { - t.Errorf("Expected GatewayInfos are %v but got %v", tt.expectedInfo, ciExport.Spec.ClusterInfo.GatewayInfos) - } - } else { - if tt.isDelete { - if !apierrors.IsNotFound(err) { - t.Errorf("Gateway Reconciler expects not found error but got error = %v", err) - } - } else { - t.Errorf("Expected a ClusterInfo kind of ResourceExport but got error = %v", err) - } + if tt.isDelete && !apierrors.IsNotFound(err) { + t.Errorf("Gateway Reconciler expects not found error but got error = %v", err) + } + if err == nil && !reflect.DeepEqual(ciExport.Spec.ClusterInfo.GatewayInfos, tt.expectedInfo) { + t.Errorf("Expected GatewayInfos are %v but got %v", tt.expectedInfo, ciExport.Spec.ClusterInfo.GatewayInfos) + } + if !tt.isDelete && apierrors.IsNotFound(err) { + t.Errorf("Expected a ClusterInfo kind of ResourceExport but got error = %v", err) } } }) diff --git a/multicluster/controllers/multicluster/node_controller.go b/multicluster/controllers/multicluster/node_controller.go index dc03f8ecc77..8e836aafeed 100644 --- a/multicluster/controllers/multicluster/node_controller.go +++ b/multicluster/controllers/multicluster/node_controller.go @@ -37,18 +37,22 @@ import ( type ( // NodeReconciler is 
for member cluster only. - // It will create a Gateway object if a Node has an annotation `multicluster.antrea.io/gateway:true` - // and update corresponding Gateway if any subnets changes. NodeReconciler struct { client.Client - Scheme *runtime.Scheme - namespace string - precedence mcsv1alpha1.Precedence + Scheme *runtime.Scheme + namespace string + precedence mcsv1alpha1.Precedence + gatewayCandidates map[string]bool + activeGateway string + initialized bool } ) -// NewNodeReconciler creates a NodeReconciler to watch Node object changes and create a -// corresponding Gateway if the Node has the annotation `multicluster.antrea.io/gateway:true`. +// NewNodeReconciler creates a NodeReconciler to watch Node resource changes. +// It's responsible for creating a Gateway for the first ready Node with +// annotation `multicluster.antrea.io/gateway:true` if there is no existing Gateway. +// It guarantees there is always only one Gateway CR when there are multiple Nodes +// with annotation `multicluster.antrea.io/gateway:true`. 
func NewNodeReconciler( client client.Client, scheme *runtime.Scheme, @@ -58,10 +62,11 @@ func NewNodeReconciler( precedence = mcsv1alpha1.PrecedenceInternal } reconciler := &NodeReconciler{ - Client: client, - Scheme: scheme, - namespace: namespace, - precedence: precedence, + Client: client, + Scheme: scheme, + namespace: namespace, + precedence: precedence, + gatewayCandidates: make(map[string]bool), } return reconciler } @@ -73,65 +78,194 @@ func NewNodeReconciler( func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { klog.V(2).InfoS("Reconciling Node", "node", req.Name) + if !r.initialized { + if err := r.initialize(); err != nil { + return ctrl.Result{}, err + } + r.initialized = true + } gw := &mcsv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ Name: req.Name, Namespace: r.namespace, }, } - // When the Node is annotated with 'multicluster.antrea.io/gateway=true' as a Gateway: - // - Delete the Gateway if the Node is deleted - // - Update the Gateway if Node's InternalIP or GatewayIP is updated - // - Create a new Gateway if there is no existing Gateway + + noActiveGateway := r.activeGateway == "" + isActiveGateway := r.activeGateway == req.Name + stillGatewayNode := false + node := &corev1.Node{} if err := r.Client.Get(ctx, req.NamespacedName, node); err != nil { - if apierrors.IsNotFound(err) { - err := r.Client.Delete(ctx, gw, &client.DeleteOptions{}) - return ctrl.Result{}, client.IgnoreNotFound(err) + if !apierrors.IsNotFound(err) { + klog.ErrorS(err, "Failed to get Node", "node", req.Name) + return ctrl.Result{}, err } - klog.ErrorS(err, "Failed to get Node", "node", req.Name) - return ctrl.Result{}, err + } else { + _, hasGWAnnotation := node.Annotations[common.GatewayAnnotation] + stillGatewayNode = hasGWAnnotation } - _, isGW := node.Annotations[common.GatewayAnnotation] - var err error - gwNamespacedName := types.NamespacedName{ - Name: node.Name, - Namespace: r.namespace, + if stillGatewayNode { + 
r.gatewayCandidates[req.Name] = true + } else { + delete(r.gatewayCandidates, req.Name) } - // TODO: cache might be stale. Need to revisit here and other reconcilers to - // check if we can improve this with 'Owns' or other methods. - var gwIP, internalIP string - if isGW { - if internalIP, gwIP, err = r.getGatawayNodeIP(node); err != nil { - klog.ErrorS(err, "There is no valid Gateway IP for Node, will retry later when there is any new Node update", "node", node.Name) - return ctrl.Result{}, nil + var err error + var isValidGateway bool + + if stillGatewayNode { + gw.InternalIP, gw.GatewayIP, err = r.getGatawayNodeIP(node) + if err != nil { + klog.ErrorS(err, "There is no valid Gateway IP for Node", "node", node.Name) + } else { + isValidGateway = true } } - if err := r.Client.Get(ctx, gwNamespacedName, gw); err != nil { - if apierrors.IsNotFound(err) && isGW { - gw.GatewayIP = gwIP - gw.InternalIP = internalIP - if err := r.Client.Create(ctx, gw, &client.CreateOptions{}); err != nil { + + if isActiveGateway { + if !isValidGateway || !isReadyNode(node) { + if err := r.recreateActiveGateway(ctx, gw); err != nil { + return ctrl.Result{}, err + } + } else { + if err := r.updateActiveGateway(ctx, gw); err != nil { return ctrl.Result{}, err } - return ctrl.Result{}, nil } - return ctrl.Result{}, client.IgnoreNotFound(err) + return ctrl.Result{}, nil } - if !isGW { - err := r.Client.Delete(ctx, gw, &client.DeleteOptions{}) - return ctrl.Result{}, client.IgnoreNotFound(err) + if noActiveGateway && isValidGateway && isReadyNode(node) { + if err := r.createGateway(gw); err != nil { + return ctrl.Result{}, err + } } + return ctrl.Result{}, nil +} - gw.GatewayIP = gwIP - gw.InternalIP = internalIP - if err := r.Client.Update(ctx, gw, &client.UpdateOptions{}); err != nil { - return ctrl.Result{}, err +// initialize initializes 'activeGateway' and 'gatewayCandidates' and removes +// stale Gateway during controller startup. 
+func (r *NodeReconciler) initialize() error { + ctx := context.Background() + nodeList := &corev1.NodeList{} + if err := r.Client.List(ctx, nodeList, &client.ListOptions{}); err != nil { + return err } - return ctrl.Result{}, nil + + gwList := &mcsv1alpha1.GatewayList{} + if err := r.Client.List(ctx, gwList, &client.ListOptions{}); err != nil { + return err + } + // Gateway webhook guarantees that there is at most one Gateway in the member cluster. + if len(gwList.Items) > 0 { + existingGWName := gwList.Items[0].Name + node := &corev1.Node{} + if err := r.Client.Get(ctx, types.NamespacedName{Name: existingGWName}, node); err != nil { + if !apierrors.IsNotFound(err) { + return err + } + staleGateway := &mcsv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: r.namespace, + Name: existingGWName}, + } + err := r.Client.Delete(ctx, staleGateway, &client.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + } else { + r.activeGateway = existingGWName + } + } + for _, n := range nodeList.Items { + if _, isGW := n.Annotations[common.GatewayAnnotation]; isGW { + r.gatewayCandidates[n.Name] = true + } + } + return nil +} + +func (r *NodeReconciler) updateActiveGateway(ctx context.Context, newGateway *mcsv1alpha1.Gateway) error { + existingGW := &mcsv1alpha1.Gateway{} + // TODO: cache might be stale. Need to revisit here and other reconcilers to + // check if we can improve this with 'Owns' or other methods. + if err := r.Client.Get(ctx, types.NamespacedName{Name: newGateway.Name, Namespace: r.namespace}, existingGW); err != nil { + return err + } + if existingGW.GatewayIP == newGateway.GatewayIP && existingGW.InternalIP == newGateway.InternalIP { + return nil + } + existingGW.GatewayIP = newGateway.GatewayIP + existingGW.InternalIP = newGateway.InternalIP + // If the Gateway version in the client cache is stale, the update operation will fail, + // then the reconciler will retry with latest state again. 
+ if err := r.Client.Update(ctx, existingGW, &client.UpdateOptions{}); err != nil { + return err + } + return nil +} + +// recreateActiveGateway will delete the existing Gateway CR and create a new Gateway +// from the pool of Gateway candidates. +func (r *NodeReconciler) recreateActiveGateway(ctx context.Context, gateway *mcsv1alpha1.Gateway) error { + err := r.Client.Delete(ctx, gateway, &client.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + r.activeGateway = "" + // Check remaining Gateway candidates and create a new Gateway. + newGateway, err := r.getValidGatewayFromCandidates() + if err != nil { + return err + } + if newGateway != nil { + return r.createGateway(newGateway) + } + return nil +} + +// getValidGatewayFromCandidates picks a valid Node from Gateway candidates and +// creates a Gateway. It returns no error if no good Gateway candidate. +func (r *NodeReconciler) getValidGatewayFromCandidates() (*mcsv1alpha1.Gateway, error) { + var activeGateway *mcsv1alpha1.Gateway + var internalIP, gwIP string + var err error + gatewayNode := &corev1.Node{} + for name := range r.gatewayCandidates { + if err = r.Client.Get(ctx, types.NamespacedName{Name: name}, gatewayNode); err == nil { + if !isReadyNode(gatewayNode) { + continue + } + if internalIP, gwIP, err = r.getGatawayNodeIP(gatewayNode); err != nil { + klog.V(2).ErrorS(err, "Node has no valid IP", "node", gatewayNode.Name) + continue + } + activeGateway = &mcsv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: gatewayNode.Name, + Namespace: r.namespace, + }, + GatewayIP: gwIP, + InternalIP: internalIP, + } + klog.InfoS("Found good Gateway candidate", "node", gatewayNode.Name) + return activeGateway, nil + } + if !apierrors.IsNotFound(err) { + return nil, err + } + } + return nil, nil +} + +func (r *NodeReconciler) createGateway(gateway *mcsv1alpha1.Gateway) error { + if err := r.Client.Create(ctx, gateway, &client.CreateOptions{}); err != nil { + return err + } + 
r.activeGateway = gateway.Name + return nil } func (r *NodeReconciler) getGatawayNodeIP(node *corev1.Node) (string, string, error) { @@ -172,3 +306,14 @@ func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error { }). Complete(r) } + +func isReadyNode(node *corev1.Node) bool { + var nodeIsReady bool + for _, s := range node.Status.Conditions { + if s.Type == corev1.NodeReady && s.Status == corev1.ConditionTrue { + nodeIsReady = true + break + } + } + return nodeIsReady +} diff --git a/multicluster/controllers/multicluster/node_controller_test.go b/multicluster/controllers/multicluster/node_controller_test.go index b3ed403f0ed..8dfed6e7e18 100644 --- a/multicluster/controllers/multicluster/node_controller_test.go +++ b/multicluster/controllers/multicluster/node_controller_test.go @@ -32,8 +32,17 @@ import ( "antrea.io/antrea/multicluster/controllers/multicluster/common" ) -func TestNodeReconciler(t *testing.T) { - node1 := &corev1.Node{ +var ( + node1 *corev1.Node + node2 *corev1.Node + node3 *corev1.Node + node4 *corev1.Node + updatedGateway2 *mcsv1alpha1.Gateway + gateway3 *mcsv1alpha1.Gateway +) + +func initializeCommonData() { + node1 = &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "node-1", Annotations: map[string]string{ @@ -51,10 +60,59 @@ func TestNodeReconciler(t *testing.T) { Address: "172.11.10.1", }, }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + + node2 = node1.DeepCopy() + node2.Name = "node-2" + node2.Status.Addresses = []corev1.NodeAddress{ + { + Type: corev1.NodeExternalIP, + Address: "10.10.10.12", + }, + { + Type: corev1.NodeInternalIP, + Address: "172.11.10.2", + }, + } + + node3 = node1.DeepCopy() + node3.Name = "node-3" + node3.Status.Conditions = []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionFalse, + }, + } + + node4 = node1.DeepCopy() + node4.Name = "node-4" + node4.Annotations = map[string]string{ + 
common.GatewayAnnotation: "true", + common.GatewayIPAnnotation: "invalid-gatewayip", + } + + updatedGateway2 = &mcsv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-2", + Namespace: "default", }, + GatewayIP: "10.10.10.12", + InternalIP: "172.11.10.2", } - node1NoValidUpdate := *node1 - node1NoValidUpdate.Labels = map[string]string{"hostname.k8s.io": "node-1"} + + gateway3 = gwNode1.DeepCopy() + gateway3.Name = "node-3" +} + +func TestNodeReconciler(t *testing.T) { + initializeCommonData() node1NoAnnotation := *node1 node1NoAnnotation.Annotations = map[string]string{} node1WithIPAnnotation := *node1 @@ -62,26 +120,41 @@ func TestNodeReconciler(t *testing.T) { common.GatewayAnnotation: "true", common.GatewayIPAnnotation: "11.11.10.10", } + gateway4 := gwNode1.DeepCopy() + gateway4.Name = "node-4" + newGateway1 := gwNode1.DeepCopy() + newGateway1.GatewayIP = "172.11.10.1" + newNode1 := node1.DeepCopy() + newNode1.Name = "node-1" + newNode1.Status.Addresses = []corev1.NodeAddress{ + { + Type: corev1.NodeHostName, + Address: "node-1", + }, + } tests := []struct { - name string - nodes []*corev1.Node - req reconcile.Request - existingGW *mcsv1alpha1.Gateway - expectedGW *mcsv1alpha1.Gateway - isDelete bool - expectedErr string + name string + nodes []*corev1.Node + req reconcile.Request + precedence mcsv1alpha1.Precedence + existingGW *mcsv1alpha1.Gateway + expectedGW *mcsv1alpha1.Gateway + activeGateway string + candidates map[string]bool + isDelete bool }{ { name: "create a Gateway successfully", nodes: []*corev1.Node{node1}, - req: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "", Name: node1.Name}}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node1.Name}}, expectedGW: &gwNode1, + precedence: mcsv1alpha1.PrecedencePublic, }, { name: "update a Gateway successfully by changing GatewayIP", nodes: []*corev1.Node{&node1WithIPAnnotation}, - req: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "", 
Name: node1.Name}}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node1.Name}}, existingGW: &gwNode1, expectedGW: &mcsv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ @@ -91,20 +164,61 @@ func TestNodeReconciler(t *testing.T) { GatewayIP: "11.11.10.10", InternalIP: "172.11.10.1", }, + activeGateway: "node-1", + precedence: mcsv1alpha1.PrecedencePublic, }, { - name: "remove a Gateway Node to delete a Gateway successfully", - nodes: []*corev1.Node{}, - req: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "", Name: node1.Name}}, - existingGW: &gwNode1, - isDelete: true, + name: "remove a Gateway Node to delete a Gateway successfully", + nodes: []*corev1.Node{}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node1.Name}}, + existingGW: &gwNode1, + activeGateway: "node-1", + isDelete: true, + precedence: mcsv1alpha1.PrecedencePublic, }, { - name: "remove a Gateway Node's annotation to delete a Gateway successfully", - nodes: []*corev1.Node{&node1NoAnnotation}, - req: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "", Name: node1.Name}}, + name: "remove a Gateway Node's annotation to delete a Gateway successfully", + nodes: []*corev1.Node{&node1NoAnnotation}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node1.Name}}, + existingGW: &gwNode1, + activeGateway: "node-1", + isDelete: true, + precedence: mcsv1alpha1.PrecedencePublic, + }, + { + name: "remote a Gateway due to no IPs", + nodes: []*corev1.Node{newNode1}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: newNode1.Name}}, existingGW: &gwNode1, isDelete: true, + precedence: mcsv1alpha1.PrecedencePrivate, + }, + { + name: "remove a Gateway Node to create a new Gateway from candidates successfully", + nodes: []*corev1.Node{node2, node4}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node1.Name}}, + existingGW: &gwNode1, + expectedGW: updatedGateway2, + activeGateway: "node-1", 
+ precedence: mcsv1alpha1.PrecedencePublic, + }, + { + name: "create a new Gateway successfully when active Gateway Node is not ready", + nodes: []*corev1.Node{node2, node3}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node3.Name}}, + existingGW: gateway3, + expectedGW: updatedGateway2, + activeGateway: "node-3", + precedence: mcsv1alpha1.PrecedencePublic, + }, + { + name: "create a new Gateway successfully when active Gateway Node has no valid IP", + nodes: []*corev1.Node{node2, node4}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node4.Name}}, + existingGW: gateway4, + expectedGW: updatedGateway2, + activeGateway: "node-4", + precedence: mcsv1alpha1.PrecedencePublic, }, } for _, tt := range tests { @@ -117,27 +231,95 @@ func TestNodeReconciler(t *testing.T) { obj = append(obj, tt.existingGW) } fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(obj...).Build() - r := NewNodeReconciler(fakeClient, scheme, "default", mcsv1alpha1.PrecedencePublic) + r := NewNodeReconciler(fakeClient, scheme, "default", tt.precedence) + r.activeGateway = tt.activeGateway if _, err := r.Reconcile(ctx, tt.req); err != nil { - if tt.expectedErr != "" { - assert.Contains(t, err.Error(), tt.expectedErr) - } else { - t.Errorf("Node Reconciler should handle Node events successfully but got error = %v", err) - } + t.Errorf("Node Reconciler should handle Node events successfully but got error = %v", err) } else { newGW := &mcsv1alpha1.Gateway{} - gwNamespcedName := types.NamespacedName{Name: "node-1", Namespace: "default"} + gwNamespcedName := types.NamespacedName{Name: tt.req.Name, Namespace: "default"} + if tt.expectedGW != nil { + gwNamespcedName = types.NamespacedName{Name: tt.expectedGW.Name, Namespace: "default"} + } err := fakeClient.Get(ctx, gwNamespcedName, newGW) - if err != nil { - if tt.isDelete { - if !apierrors.IsNotFound(err) { - t.Errorf("Expected to get not found error but got err: %v", err) - } - } else { + if 
tt.isDelete { + if err == nil || (err != nil && !apierrors.IsNotFound(err)) { + t.Errorf("Expected to get not found error but got err: %v", err) + } + } else { + if err != nil { t.Errorf("Expected to get Gateway but got err: %v", err) + } else { + if tt.expectedGW.GatewayIP != newGW.GatewayIP || tt.expectedGW.InternalIP != newGW.InternalIP { + t.Errorf("Expected Gateway %v but got: %v", tt.expectedGW, newGW) + } + } + } + } + }) + } +} + +func TestInitialize(t *testing.T) { + initializeCommonData() + node5 := node1.DeepCopy() + node5.Name = "node-5" + node5.Annotations = map[string]string{} + tests := []struct { + name string + nodes []*corev1.Node + req reconcile.Request + existingGW *mcsv1alpha1.Gateway + expectedActiveGateway string + isDelete bool + candidatesSize int + }{ + { + name: "initialize and set active Gateway successfully", + nodes: []*corev1.Node{node1, node2, node5}, + existingGW: &gwNode1, + expectedActiveGateway: "node-1", + candidatesSize: 2, + }, + { + name: "initialize successfully without Gateway", + nodes: []*corev1.Node{node3, node4, node5}, + expectedActiveGateway: "", + candidatesSize: 2, + }, + { + name: "initialize and delete Gateway successfully", + nodes: []*corev1.Node{node1, node5}, + existingGW: gateway3, + isDelete: true, + expectedActiveGateway: "", + candidatesSize: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var obj []client.Object + for _, n := range tt.nodes { + obj = append(obj, n) + } + if tt.existingGW != nil { + obj = append(obj, tt.existingGW) + } + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(obj...).Build() + r := NewNodeReconciler(fakeClient, scheme, "default", mcsv1alpha1.PrecedencePublic) + if err := r.initialize(); err != nil { + t.Errorf("Expected initialize() successfully but got err: %v", err) + } else { + assert.Equal(t, tt.expectedActiveGateway, r.activeGateway) + assert.Equal(t, tt.candidatesSize, len(r.gatewayCandidates)) + if tt.isDelete { + 
deletedGW := &mcsv1alpha1.Gateway{} + gwNamespcedName := types.NamespacedName{Name: tt.existingGW.Name, Namespace: "default"} + err := fakeClient.Get(ctx, gwNamespcedName, deletedGW) + if !apierrors.IsNotFound(err) { + t.Errorf("Expected to get not found error but got err: %v", err) } - } else if tt.expectedGW.GatewayIP != newGW.GatewayIP || tt.expectedGW.InternalIP != newGW.InternalIP { - t.Errorf("Expected Gateway %v but got: %v", tt.expectedGW, newGW) } } })