From 204f359c9b9e75af95cad1850c77688138f3ff80 Mon Sep 17 00:00:00 2001 From: Lan Luo Date: Mon, 1 Aug 2022 17:41:14 +0800 Subject: [PATCH] Refine Node controller to support Gateway HA In order to support Gateway active-standby mode HA, the Node controller has been refined to handle Node readiness changes. There will be at most one Gateway in a given Namespace, and the Gateway controller is updated correspondingly. 1. Check Node readiness and create a Gateway CR if Node is ready and there is no existing Gateway. 2. Initialize active Gateway and Gateway candidate Nodes with annotation 'multicluster.antrea.io/gateway'. 3. Add Gateway webhook to allow at most one Gateway in a given Namespace. Signed-off-by: Lan Luo --- .../antrea-multicluster-leader-namespaced.yml | 1 - .../yamls/antrea-multicluster-member.yml | 21 ++ .../gateway_webhook.go | 68 +++++ .../gateway_webhook_test.go | 123 ++++++++ .../cmd/multicluster-controller/member.go | 7 + .../overlays/leader-ns/kustomization.yaml | 1 + .../overlays/leader-ns/webhook_patch.yaml | 9 + multicluster/config/webhook/manifests.yaml | 21 ++ .../multicluster/gateway_controller.go | 53 +--- .../multicluster/gateway_controller_test.go | 87 +----- .../multicluster/node_controller.go | 264 +++++++++++++++--- .../multicluster/node_controller_test.go | 246 +++++++++++++--- 12 files changed, 697 insertions(+), 204 deletions(-) create mode 100644 multicluster/cmd/multicluster-controller/gateway_webhook.go create mode 100644 multicluster/cmd/multicluster-controller/gateway_webhook_test.go create mode 100644 multicluster/config/overlays/leader-ns/webhook_patch.yaml diff --git a/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml b/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml index 61b05535d18..cd95698d6bd 100644 --- a/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml +++ b/multicluster/build/yamls/antrea-multicluster-leader-namespaced.yml @@ -451,7 +451,6 @@ webhooks: apiVersion: 
admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration metadata: - creationTimestamp: null labels: app: antrea name: antrea-multicluster-antrea-mc-validating-webhook-configuration diff --git a/multicluster/build/yamls/antrea-multicluster-member.yml b/multicluster/build/yamls/antrea-multicluster-member.yml index 45cf629373b..a6469d35f81 100644 --- a/multicluster/build/yamls/antrea-multicluster-member.yml +++ b/multicluster/build/yamls/antrea-multicluster-member.yml @@ -1098,3 +1098,24 @@ webhooks: resources: - clustersets sideEffects: None +- admissionReviewVersions: + - v1 + - v1beta1 + clientConfig: + service: + name: antrea-mc-webhook-service + namespace: kube-system + path: /validate-multicluster-crd-antrea-io-v1alpha1-gateway + failurePolicy: Fail + name: vgateway.kb.io + rules: + - apiGroups: + - multicluster.crd.antrea.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - gateways + sideEffects: None diff --git a/multicluster/cmd/multicluster-controller/gateway_webhook.go b/multicluster/cmd/multicluster-controller/gateway_webhook.go new file mode 100644 index 00000000000..b22377c3164 --- /dev/null +++ b/multicluster/cmd/multicluster-controller/gateway_webhook.go @@ -0,0 +1,68 @@ +/* +Copyright 2022 Antrea Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "context" + "fmt" + "net/http" + + admissionv1 "k8s.io/api/admission/v1" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" +) + +//+kubebuilder:webhook:path=/validate-multicluster-crd-antrea-io-v1alpha1-gateway,mutating=false,failurePolicy=fail,sideEffects=None,groups=multicluster.crd.antrea.io,resources=gateways,verbs=create;update,versions=v1alpha1,name=vgateway.kb.io,admissionReviewVersions={v1,v1beta1} + +// Gateway validator +type gatewayValidator struct { + Client client.Client + decoder *admission.Decoder + namespace string +} + +// Handle handles admission requests. +func (v *gatewayValidator) Handle(ctx context.Context, req admission.Request) admission.Response { + gateway := &mcv1alpha1.Gateway{} + err := v.decoder.Decode(req, gateway) + if err != nil { + klog.ErrorS(err, "Error while decoding Gateway", "Gateway", req.Namespace+"/"+req.Name) + return admission.Errored(http.StatusBadRequest, err) + } + + // Check if there is any existing Gateway. 
+ gatewayList := &mcv1alpha1.GatewayList{} + if err := v.Client.List(context.TODO(), gatewayList, client.InNamespace(v.namespace)); err != nil { + klog.ErrorS(err, "Error reading Gateway", "Namespace", v.namespace) + return admission.Errored(http.StatusPreconditionFailed, err) + } + + if req.Operation == admissionv1.Create && len(gatewayList.Items) > 0 { + err := fmt.Errorf("multiple Gateways in a Namespace are not allowed") + klog.ErrorS(err, "failed to create Gateway", "Gateway", klog.KObj(gateway), "Namespace", v.namespace) + return admission.Errored(http.StatusPreconditionFailed, err) + } + return admission.Allowed("") +} + +func (v *gatewayValidator) InjectDecoder(d *admission.Decoder) error { + v.decoder = d + return nil +} diff --git a/multicluster/cmd/multicluster-controller/gateway_webhook_test.go b/multicluster/cmd/multicluster-controller/gateway_webhook_test.go new file mode 100644 index 00000000000..f36bea4a186 --- /dev/null +++ b/multicluster/cmd/multicluster-controller/gateway_webhook_test.go @@ -0,0 +1,123 @@ +/* +Copyright 2022 Antrea Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/admission/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" + k8smcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" + + mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" +) + +var gatewayWebhookUnderTest *gatewayValidator + +func TestWebhookGatewayEvents(t *testing.T) { + newGateway := &mcsv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "node-1", + }, + GatewayIP: "1.2.3.4", + InternalIP: "172.168.3.4", + } + existingGateway := &mcsv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "node-2", + }, + } + + newGW, _ := json.Marshal(newGateway) + + newReq := admission.Request{ + AdmissionRequest: v1.AdmissionRequest{ + UID: "07e52e8d-4513-11e9-a716-42010a800270", + Kind: metav1.GroupVersionKind{ + Group: "multicluster.crd.antrea.io", + Version: "v1alpha1", + Kind: "Gateway", + }, + Resource: metav1.GroupVersionResource{ + Group: "multicluster.crd.antrea.io", + Version: "v1alpha1", + Resource: "Gateways", + }, + Name: "node-1", + Namespace: "default", + Operation: v1.Create, + Object: runtime.RawExtension{ + Raw: newGW, + }, + }, + } + + tests := []struct { + name string + req admission.Request + existingGateway *mcsv1alpha1.Gateway + newGateway *mcsv1alpha1.Gateway + isAllowed bool + }{ + { + name: "create a new Gateway successfully", + req: newReq, + isAllowed: true, + }, + { + name: "failed to create a Gateway when there is an existing one", + existingGateway: existingGateway, + req: newReq, + isAllowed: false, + }, + } + + newScheme := runtime.NewScheme() + 
utilruntime.Must(clientgoscheme.AddToScheme(newScheme)) + utilruntime.Must(k8smcsv1alpha1.AddToScheme(newScheme)) + utilruntime.Must(mcsv1alpha1.AddToScheme(newScheme)) + decoder, err := admission.NewDecoder(newScheme) + if err != nil { + klog.ErrorS(err, "Error constructing a decoder") + } + for _, tt := range tests { + fakeClient := fake.NewClientBuilder().WithScheme(newScheme).WithObjects().Build() + if tt.existingGateway != nil { + fakeClient = fake.NewClientBuilder().WithScheme(newScheme).WithObjects(tt.existingGateway).Build() + } + gatewayWebhookUnderTest = &gatewayValidator{ + Client: fakeClient, + namespace: "default"} + gatewayWebhookUnderTest.InjectDecoder(decoder) + + t.Run(tt.name, func(t *testing.T) { + response := gatewayWebhookUnderTest.Handle(context.Background(), tt.req) + assert.Equal(t, tt.isAllowed, response.Allowed) + }) + } +} diff --git a/multicluster/cmd/multicluster-controller/member.go b/multicluster/cmd/multicluster-controller/member.go index 5315d72a98f..f72508c5e09 100644 --- a/multicluster/cmd/multicluster-controller/member.go +++ b/multicluster/cmd/multicluster-controller/member.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/cobra" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/webhook" multiclustercontrollers "antrea.io/antrea/multicluster/controllers/multicluster" "antrea.io/antrea/pkg/log" @@ -54,6 +55,12 @@ func runMember(o *Options) error { return err } + hookServer := mgr.GetWebhookServer() + hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha1-gateway", + &webhook.Admission{Handler: &gatewayValidator{ + Client: mgr.GetClient(), + namespace: env.GetPodNamespace()}}) + clusterSetReconciler := multiclustercontrollers.NewMemberClusterSetReconciler(mgr.GetClient(), mgr.GetScheme(), env.GetPodNamespace(), diff --git a/multicluster/config/overlays/leader-ns/kustomization.yaml b/multicluster/config/overlays/leader-ns/kustomization.yaml index a85f54bd96c..7ef94f1cd1e 100644 
--- a/multicluster/config/overlays/leader-ns/kustomization.yaml +++ b/multicluster/config/overlays/leader-ns/kustomization.yaml @@ -54,3 +54,4 @@ resources: patchesStrategicMerge: - manager_command_patch.yaml + - webhook_patch.yaml diff --git a/multicluster/config/overlays/leader-ns/webhook_patch.yaml b/multicluster/config/overlays/leader-ns/webhook_patch.yaml new file mode 100644 index 00000000000..92bd74c1649 --- /dev/null +++ b/multicluster/config/overlays/leader-ns/webhook_patch.yaml @@ -0,0 +1,9 @@ +--- +apiVersion: admissionregistration.k8s.io/v1 +kind: ValidatingWebhookConfiguration +metadata: + name: validating-webhook-configuration +webhooks: +- admissionReviewVersions: + name: vgateway.kb.io + $patch: delete diff --git a/multicluster/config/webhook/manifests.yaml b/multicluster/config/webhook/manifests.yaml index c3d94e305e4..20c7d16f2b0 100644 --- a/multicluster/config/webhook/manifests.yaml +++ b/multicluster/config/webhook/manifests.yaml @@ -75,6 +75,27 @@ webhooks: resources: - clustersets sideEffects: None +- admissionReviewVersions: + - v1 + - v1beta1 + clientConfig: + service: + name: webhook-service + namespace: system + path: /validate-multicluster-crd-antrea-io-v1alpha1-gateway + failurePolicy: Fail + name: vgateway.kb.io + rules: + - apiGroups: + - multicluster.crd.antrea.io + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - gateways + sideEffects: None - admissionReviewVersions: - v1 - v1beta1 diff --git a/multicluster/controllers/multicluster/gateway_controller.go b/multicluster/controllers/multicluster/gateway_controller.go index 247e03de917..5efe37cad63 100644 --- a/multicluster/controllers/multicluster/gateway_controller.go +++ b/multicluster/controllers/multicluster/gateway_controller.go @@ -99,7 +99,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct }, } - createOrUpdate := func(gwIP string, gwInfo *mcsv1alpha1.GatewayInfo) error { + createOrUpdate := func(gwIP string) 
error { existingResExport := &mcsv1alpha1.ResourceExport{} if err := commonArea.Get(ctx, resExportNamespacedName, existingResExport); err != nil { if !apierrors.IsNotFound(err) { @@ -110,7 +110,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } return nil } - if err = r.updateResourceExport(ctx, req, commonArea, existingResExport, gwInfo); err != nil { + if err = r.updateResourceExport(ctx, req, commonArea, existingResExport, &mcsv1alpha1.GatewayInfo{GatewayIP: gwIP}); err != nil { return err } return nil @@ -121,52 +121,18 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct if !apierrors.IsNotFound(err) { return ctrl.Result{}, err } - gwInfo, err := r.getLastCreatedGateway() - if err != nil { - klog.ErrorS(err, "Failed to get Gateways") - return ctrl.Result{}, err - } - if gwInfo == nil { - // When the last Gateway is deleted, we will remove the ClusterInfo kind of ResourceExport - if err := commonArea.Delete(ctx, resExport, &client.DeleteOptions{}); err != nil { - return ctrl.Result{}, client.IgnoreNotFound(err) - } - return ctrl.Result{}, nil - } - // When there are still Gateways exist, we should create or update existing ResourceExport - // with the latest Gateway in remaining Gateways. 
- if err := createOrUpdate(gwInfo.GatewayIP, gwInfo); err != nil { - return ctrl.Result{}, err + if err := commonArea.Delete(ctx, resExport, &client.DeleteOptions{}); err != nil { + return ctrl.Result{}, client.IgnoreNotFound(err) } return ctrl.Result{}, nil } - if err := createOrUpdate(gw.GatewayIP, nil); err != nil { + + if err := createOrUpdate(gw.GatewayIP); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil } -func (r *GatewayReconciler) getLastCreatedGateway() (*mcsv1alpha1.GatewayInfo, error) { - gws := &mcsv1alpha1.GatewayList{} - if err := r.Client.List(ctx, gws, &client.ListOptions{}); err != nil { - return nil, err - } - if len(gws.Items) == 0 { - return nil, nil - } - - // Comparing Gateway's CreationTimestamp to get the last created Gateway. - lastCreatedGW := gws.Items[0] - for _, gw := range gws.Items { - if lastCreatedGW.CreationTimestamp.Before(&gw.CreationTimestamp) { - lastCreatedGW = gw - } - } - - // Make sure we only return the last created Gateway for now. 
- return &mcsv1alpha1.GatewayInfo{GatewayIP: lastCreatedGW.GatewayIP}, nil -} - func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.Request, commonArea commonarea.RemoteCommonArea, existingResExport *mcsv1alpha1.ResourceExport, gwInfo *mcsv1alpha1.GatewayInfo) error { resExportSpec := mcsv1alpha1.ResourceExportSpec{ @@ -175,13 +141,6 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R Name: r.localClusterID, Namespace: r.namespace, } - var err error - if gwInfo == nil { - gwInfo, err = r.getLastCreatedGateway() - if err != nil { - return err - } - } resExportSpec.ClusterInfo = &mcsv1alpha1.ClusterInfo{ ClusterID: r.localClusterID, ServiceCIDR: r.serviceCIDR, diff --git a/multicluster/controllers/multicluster/gateway_controller_test.go b/multicluster/controllers/multicluster/gateway_controller_test.go index 630634b28cd..fec78b698dc 100644 --- a/multicluster/controllers/multicluster/gateway_controller_test.go +++ b/multicluster/controllers/multicluster/gateway_controller_test.go @@ -17,7 +17,6 @@ limitations under the License. 
package multicluster import ( - "fmt" "reflect" "testing" "time" @@ -39,7 +38,6 @@ var ( clusterID = "cluster-a" gw1CreationTime = metav1.NewTime(time.Now()) - gw2CreationTime = metav1.NewTime(time.Now().Add(10 * time.Minute)) gwNode1 = mcsv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ @@ -50,15 +48,7 @@ var ( GatewayIP: "10.10.10.10", InternalIP: "172.11.10.1", } - gwNode2 = mcsv1alpha1.Gateway{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node-2", - Namespace: "default", - CreationTimestamp: gw2CreationTime, - }, - GatewayIP: "10.8.8.8", - InternalIP: "172.11.10.1", - } + existingResExport = &mcsv1alpha1.ResourceExport{ ObjectMeta: metav1.ObjectMeta{ Name: "cluster-a-clusterinfo", @@ -79,26 +69,6 @@ var ( }, }, } - existingResExport2 = &mcsv1alpha1.ResourceExport{ - ObjectMeta: metav1.ObjectMeta{ - Name: "cluster-a-clusterinfo", - Namespace: leaderNamespace, - }, - Spec: mcsv1alpha1.ResourceExportSpec{ - Name: clusterID, - Namespace: "default", - Kind: common.ClusterInfoKind, - ClusterInfo: &mcsv1alpha1.ClusterInfo{ - ServiceCIDR: serviceCIDR, - ClusterID: clusterID, - GatewayInfos: []mcsv1alpha1.GatewayInfo{ - { - GatewayIP: "101.101.101.101", - }, - }, - }, - }, - } ) func TestGatewayReconciler(t *testing.T) { @@ -130,38 +100,6 @@ func TestGatewayReconciler(t *testing.T) { }, }, }, - { - name: "update a ResourceExport successfully by creating a new Gateway", - namespacedName: types.NamespacedName{ - Namespace: "default", - Name: "node-2", - }, - gateway: []mcsv1alpha1.Gateway{ - gwNode1, gwNode2, - }, - resExport: existingResExport, - expectedInfo: []mcsv1alpha1.GatewayInfo{ - { - GatewayIP: "10.8.8.8", - }, - }, - }, - { - name: "update a ResourceExport successfully by deleting a Gateway", - namespacedName: types.NamespacedName{ - Namespace: "default", - Name: "node-2", - }, - gateway: []mcsv1alpha1.Gateway{ - gwNode1, - }, - resExport: existingResExport2, - expectedInfo: []mcsv1alpha1.GatewayInfo{ - { - GatewayIP: "10.10.10.10", - }, - }, - }, { name: "update 
a ResourceExport successfully by updating an existing Gateway", namespacedName: types.NamespacedName{ @@ -210,27 +148,20 @@ func TestGatewayReconciler(t *testing.T) { if _, err := r.Reconcile(ctx, req); err != nil { t.Errorf("Gateway Reconciler should handle ResourceExports events successfully but got error = %v", err) } else { - gws := &mcsv1alpha1.GatewayList{} - _ = fakeClient.List(ctx, gws, &client.ListOptions{}) - fmt.Printf("output list: %v", gws) ciExport := mcsv1alpha1.ResourceExport{} ciExportName := types.NamespacedName{ Namespace: leaderNamespace, Name: newClusterInfoResourceExportName(localClusterID), } err := fakeRemoteClient.Get(ctx, ciExportName, &ciExport) - if err == nil { - if !reflect.DeepEqual(ciExport.Spec.ClusterInfo.GatewayInfos, tt.expectedInfo) { - t.Errorf("Expected GatewayInfos are %v but got %v", tt.expectedInfo, ciExport.Spec.ClusterInfo.GatewayInfos) - } - } else { - if tt.isDelete { - if !apierrors.IsNotFound(err) { - t.Errorf("Gateway Reconciler expects not found error but got error = %v", err) - } - } else { - t.Errorf("Expected a ClusterInfo kind of ResourceExport but got error = %v", err) - } + if tt.isDelete && !apierrors.IsNotFound(err) { + t.Errorf("Gateway Reconciler expects not found error but got error = %v", err) + } + if err == nil && !reflect.DeepEqual(ciExport.Spec.ClusterInfo.GatewayInfos, tt.expectedInfo) { + t.Errorf("Expected GatewayInfos are %v but got %v", tt.expectedInfo, ciExport.Spec.ClusterInfo.GatewayInfos) + } + if !tt.isDelete && apierrors.IsNotFound(err) { + t.Errorf("Expected a ClusterInfo kind of ResourceExport but got error = %v", err) } } }) diff --git a/multicluster/controllers/multicluster/node_controller.go b/multicluster/controllers/multicluster/node_controller.go index dc03f8ecc77..9682d75115f 100644 --- a/multicluster/controllers/multicluster/node_controller.go +++ b/multicluster/controllers/multicluster/node_controller.go @@ -37,18 +37,22 @@ import ( type ( // NodeReconciler is for member cluster 
only. - // It will create a Gateway object if a Node has an annotation `multicluster.antrea.io/gateway:true` - // and update corresponding Gateway if any subnets changes. NodeReconciler struct { client.Client - Scheme *runtime.Scheme - namespace string - precedence mcsv1alpha1.Precedence + Scheme *runtime.Scheme + namespace string + precedence mcsv1alpha1.Precedence + gatewayCandidates map[string]bool + activeGateway string + initialized bool } ) -// NewNodeReconciler creates a NodeReconciler to watch Node object changes and create a -// corresponding Gateway if the Node has the annotation `multicluster.antrea.io/gateway:true`. +// NewNodeReconciler creates a NodeReconciler to watch Node resource changes. +// It's responsible for creating a Gateway for the first ready Node with +// annotation `multicluster.antrea.io/gateway:true` if there is no existing Gateway. +// It guarantees there is always only one Gateway CR when there are multiple Nodes +// with annotation `multicluster.antrea.io/gateway:true`. 
func NewNodeReconciler( client client.Client, scheme *runtime.Scheme, @@ -58,10 +62,11 @@ func NewNodeReconciler( precedence = mcsv1alpha1.PrecedenceInternal } reconciler := &NodeReconciler{ - Client: client, - Scheme: scheme, - namespace: namespace, - precedence: precedence, + Client: client, + Scheme: scheme, + namespace: namespace, + precedence: precedence, + gatewayCandidates: make(map[string]bool), } return reconciler } @@ -73,65 +78,215 @@ func NewNodeReconciler( func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { klog.V(2).InfoS("Reconciling Node", "node", req.Name) + if !r.initialized { + if err := r.initialize(); err != nil { + return ctrl.Result{}, err + } + r.initialized = true + } gw := &mcsv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ Name: req.Name, Namespace: r.namespace, }, } - // When the Node is annotated with 'multicluster.antrea.io/gateway=true' as a Gateway: - // - Delete the Gateway if the Node is deleted - // - Update the Gateway if Node's InternalIP or GatewayIP is updated - // - Create a new Gateway if there is no existing Gateway + node := &corev1.Node{} if err := r.Client.Get(ctx, req.NamespacedName, node); err != nil { if apierrors.IsNotFound(err) { - err := r.Client.Delete(ctx, gw, &client.DeleteOptions{}) - return ctrl.Result{}, client.IgnoreNotFound(err) + if r.activeGateway == req.Name { + if err := r.recreateActiveGateway(ctx, gw); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } } klog.ErrorS(err, "Failed to get Node", "node", req.Name) - return ctrl.Result{}, err + return ctrl.Result{}, client.IgnoreNotFound(err) } - _, isGW := node.Annotations[common.GatewayAnnotation] - var err error - gwNamespacedName := types.NamespacedName{ - Name: node.Name, - Namespace: r.namespace, + _, hasGWAnnotation := node.Annotations[common.GatewayAnnotation] + nodeIsReady := isReadyNode(node) + noActiveGateway := r.activeGateway == "" + isActiveGateway := r.activeGateway == 
node.Name + toRecreate := false + if hasGWAnnotation { + r.gatewayCandidates[req.Name] = true } - // TODO: cache might be stale. Need to revisit here and other reconcilers to - // check if we can improve this with 'Owns' or other methods. + if !nodeIsReady && isActiveGateway { + toRecreate = true + } + + var err error var gwIP, internalIP string - if isGW { - if internalIP, gwIP, err = r.getGatawayNodeIP(node); err != nil { - klog.ErrorS(err, "There is no valid Gateway IP for Node, will retry later when there is any new Node update", "node", node.Name) - return ctrl.Result{}, nil + // When Node is ready without Gateway annotation + if !hasGWAnnotation && isActiveGateway { + // The annotation is removed from the Node, we need recreate a new Gateway. + toRecreate = true + } + + // When the Node is ready with the Gateway annotation + internalIP, gwIP, err = r.getGatawayNodeIP(node) + if err != nil && isActiveGateway { + klog.ErrorS(err, "There is no valid Gateway IP for Node", "node", node.Name) + toRecreate = true + } + + if toRecreate { + if err := r.recreateActiveGateway(ctx, gw); err != nil { + return ctrl.Result{}, err } + return ctrl.Result{}, nil } - if err := r.Client.Get(ctx, gwNamespacedName, gw); err != nil { - if apierrors.IsNotFound(err) && isGW { - gw.GatewayIP = gwIP - gw.InternalIP = internalIP - if err := r.Client.Create(ctx, gw, &client.CreateOptions{}); err != nil { - return ctrl.Result{}, err + + gw.InternalIP = internalIP + gw.GatewayIP = gwIP + if isActiveGateway { + if err := r.updateActiveGateway(ctx, gw); err != nil { + return ctrl.Result{}, err + } + } + if noActiveGateway { + if err := r.createGateway(gw); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil +} + +// initialize initializes 'activeGateway' and 'gatewayCandidates' and removes +// stale Gateway during controller startup. 
+func (r *NodeReconciler) initialize() error { + ctx := context.Background() + nodeList := &corev1.NodeList{} + if err := r.Client.List(ctx, nodeList, &client.ListOptions{}); err != nil { + return err + } + + initCandidates := func() { + for _, n := range nodeList.Items { + if _, isGW := n.Annotations[common.GatewayAnnotation]; isGW { + r.gatewayCandidates[n.Name] = true } - return ctrl.Result{}, nil } - return ctrl.Result{}, client.IgnoreNotFound(err) } - if !isGW { - err := r.Client.Delete(ctx, gw, &client.DeleteOptions{}) - return ctrl.Result{}, client.IgnoreNotFound(err) + gwList := &mcsv1alpha1.GatewayList{} + if err := r.Client.List(ctx, gwList, &client.ListOptions{}); err != nil { + return err } + // Gateway webhook guarantees that there is at most one Gateway in the member cluster. + size := len(gwList.Items) + if size > 0 { + existingGWName := gwList.Items[0].Name + // Check existing Gateway Node's health + var nodeNotFound bool + node := &corev1.Node{} + if err := r.Client.Get(ctx, types.NamespacedName{Name: existingGWName}, node); err != nil { + if !apierrors.IsNotFound(err) { + return err + } + nodeNotFound = true + } + _, _, err := r.getGatawayNodeIP(node) + // Delete the existing Gateway if it's no longer valid. 
+ if !isReadyGateway(node) || err != nil || nodeNotFound { + if err != nil { + klog.ErrorS(err, "There is no valid Gateway IP for Node", "node", node.Name) + } + staleGateway := &mcsv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: r.namespace, + Name: existingGWName}, + } + err := r.Client.Delete(ctx, staleGateway, &client.DeleteOptions{}) + if err == nil || apierrors.IsNotFound(err) { + initCandidates() + return nil + } + return err + } + r.activeGateway = existingGWName + } + initCandidates() + return nil +} - gw.GatewayIP = gwIP - gw.InternalIP = internalIP - if err := r.Client.Update(ctx, gw, &client.UpdateOptions{}); err != nil { - return ctrl.Result{}, err +func (r *NodeReconciler) updateActiveGateway(ctx context.Context, newGateway *mcsv1alpha1.Gateway) error { + existingGW := &mcsv1alpha1.Gateway{} + // TODO: cache might be stale. Need to revisit here and other reconcilers to + // check if we can improve this with 'Owns' or other methods. + if err := r.Client.Get(ctx, types.NamespacedName{Name: newGateway.Name, Namespace: r.namespace}, existingGW); err != nil { + return client.IgnoreNotFound(err) } - return ctrl.Result{}, nil + existingGW.GatewayIP = newGateway.GatewayIP + existingGW.InternalIP = newGateway.InternalIP + if err := r.Client.Update(ctx, existingGW, &client.UpdateOptions{}); err != nil { + return err + } + return nil +} + +// recreateActiveGateway will delete the existing Gateway CR and create a new Gateway +// from the pool of Gateway candidates. +func (r *NodeReconciler) recreateActiveGateway(ctx context.Context, gateway *mcsv1alpha1.Gateway) error { + err := r.Client.Delete(ctx, gateway, &client.DeleteOptions{}) + if err == nil || apierrors.IsNotFound(err) { + r.activeGateway = "" + delete(r.gatewayCandidates, gateway.Name) + // Check the remaining Gateway candidates and create a new Gateway. 
+ return r.newActiveGatewayFromCandidates() + } + return err +} + +// newActiveGatewayFromCandidates picks a valid Node from Gateway candidates and +// creates a Gateway. It returns no error if no good Gateway candidate. +func (r *NodeReconciler) newActiveGatewayFromCandidates() error { + var activeGateway *mcsv1alpha1.Gateway + var internalIP, gwIP string + var err error + gatewayNode := &corev1.Node{} + if len(r.gatewayCandidates) == 0 { + return nil + } + for name := range r.gatewayCandidates { + if err = r.Client.Get(ctx, types.NamespacedName{Name: name}, gatewayNode); err == nil { + if !isReadyNode(gatewayNode) { + continue + } + if internalIP, gwIP, err = r.getGatawayNodeIP(gatewayNode); err != nil { + klog.V(2).ErrorS(err, "Node has no valid IP", "node", gatewayNode.Name) + continue + } + activeGateway = &mcsv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: gatewayNode.Name, + Namespace: r.namespace, + }, + GatewayIP: gwIP, + InternalIP: internalIP, + } + klog.InfoS("Found available Gateway candidate, creating Gateway", "node", gatewayNode.Name) + return r.createGateway(activeGateway) + } + if apierrors.IsNotFound(err) { + delete(r.gatewayCandidates, name) + continue + } + return err + } + return nil +} + +func (r *NodeReconciler) createGateway(gateway *mcsv1alpha1.Gateway) error { + if err := r.Client.Create(ctx, gateway, &client.CreateOptions{}); err != nil { + return err + } + r.activeGateway = gateway.Name + r.gatewayCandidates[gateway.Name] = true + return nil } func (r *NodeReconciler) getGatawayNodeIP(node *corev1.Node) (string, string, error) { @@ -172,3 +327,22 @@ func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error { }). 
Complete(r) } + +func isReadyNode(node *corev1.Node) bool { + var nodeIsReady bool + for _, s := range node.Status.Conditions { + if s.Type == corev1.NodeReady && s.Status == corev1.ConditionTrue { + nodeIsReady = true + break + } + } + return nodeIsReady +} + +func isReadyGateway(node *corev1.Node) bool { + _, isGW := node.Annotations[common.GatewayAnnotation] + if isGW && isReadyNode(node) { + return true + } + return false +} diff --git a/multicluster/controllers/multicluster/node_controller_test.go b/multicluster/controllers/multicluster/node_controller_test.go index b3ed403f0ed..1dfed0310c4 100644 --- a/multicluster/controllers/multicluster/node_controller_test.go +++ b/multicluster/controllers/multicluster/node_controller_test.go @@ -32,8 +32,17 @@ import ( "antrea.io/antrea/multicluster/controllers/multicluster/common" ) -func TestNodeReconciler(t *testing.T) { - node1 := &corev1.Node{ +var ( + node1 *corev1.Node + node2 *corev1.Node + node3 *corev1.Node + node4 *corev1.Node + updatedGateway2 *mcsv1alpha1.Gateway + gateway3 *mcsv1alpha1.Gateway +) + +func initializeCommonData() { + node1 = &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "node-1", Annotations: map[string]string{ @@ -51,10 +60,59 @@ func TestNodeReconciler(t *testing.T) { Address: "172.11.10.1", }, }, + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + + node2 = node1.DeepCopy() + node2.Name = "node-2" + node2.Status.Addresses = []corev1.NodeAddress{ + { + Type: corev1.NodeExternalIP, + Address: "10.10.10.12", + }, + { + Type: corev1.NodeInternalIP, + Address: "172.11.10.2", + }, + } + + node3 = node1.DeepCopy() + node3.Name = "node-3" + node3.Status.Conditions = []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionFalse, + }, + } + + node4 = node1.DeepCopy() + node4.Name = "node-4" + node4.Annotations = map[string]string{ + common.GatewayAnnotation: "true", + common.GatewayIPAnnotation: 
"invalid-gatewayip", + } + + updatedGateway2 = &mcsv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-2", + Namespace: "default", }, + GatewayIP: "10.10.10.12", + InternalIP: "172.11.10.2", } - node1NoValidUpdate := *node1 - node1NoValidUpdate.Labels = map[string]string{"hostname.k8s.io": "node-1"} + + gateway3 = gwNode1.DeepCopy() + gateway3.Name = "node-3" +} + +func TestNodeReconciler(t *testing.T) { + initializeCommonData() node1NoAnnotation := *node1 node1NoAnnotation.Annotations = map[string]string{} node1WithIPAnnotation := *node1 @@ -62,26 +120,30 @@ func TestNodeReconciler(t *testing.T) { common.GatewayAnnotation: "true", common.GatewayIPAnnotation: "11.11.10.10", } - + gateway4 := gwNode1.DeepCopy() + gateway4.Name = "node-4" tests := []struct { - name string - nodes []*corev1.Node - req reconcile.Request - existingGW *mcsv1alpha1.Gateway - expectedGW *mcsv1alpha1.Gateway - isDelete bool - expectedErr string + name string + nodes []*corev1.Node + req reconcile.Request + existingGW *mcsv1alpha1.Gateway + expectedGW *mcsv1alpha1.Gateway + activeGateway string + candidates map[string]bool + isDelete bool + expectedErr string }{ { name: "create a Gateway successfully", nodes: []*corev1.Node{node1}, - req: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "", Name: node1.Name}}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node1.Name}}, expectedGW: &gwNode1, + candidates: make(map[string]bool), }, { name: "update a Gateway successfully by changing GatewayIP", nodes: []*corev1.Node{&node1WithIPAnnotation}, - req: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "", Name: node1.Name}}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node1.Name}}, existingGW: &gwNode1, expectedGW: &mcsv1alpha1.Gateway{ ObjectMeta: metav1.ObjectMeta{ @@ -91,20 +153,65 @@ func TestNodeReconciler(t *testing.T) { GatewayIP: "11.11.10.10", InternalIP: "172.11.10.1", }, + activeGateway: "node-1", 
+ candidates: make(map[string]bool), }, { - name: "remove a Gateway Node to delete a Gateway successfully", - nodes: []*corev1.Node{}, - req: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "", Name: node1.Name}}, - existingGW: &gwNode1, - isDelete: true, + name: "remove a Gateway Node to delete a Gateway successfully", + nodes: []*corev1.Node{}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node1.Name}}, + existingGW: &gwNode1, + activeGateway: "node-1", + isDelete: true, }, { - name: "remove a Gateway Node's annotation to delete a Gateway successfully", - nodes: []*corev1.Node{&node1NoAnnotation}, - req: reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "", Name: node1.Name}}, - existingGW: &gwNode1, - isDelete: true, + name: "remove a Gateway Node's annotation to delete a Gateway successfully", + nodes: []*corev1.Node{&node1NoAnnotation}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node1.Name}}, + existingGW: &gwNode1, + activeGateway: "node-1", + isDelete: true, + }, + { + name: "remove a Gateway Node to create a new Gateway from candidates successfully", + nodes: []*corev1.Node{node2, node4}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node1.Name}}, + existingGW: &gwNode1, + expectedGW: updatedGateway2, + activeGateway: "node-1", + candidates: map[string]bool{ + "node-0": true, + "node-2": true, + "node-4": true, + }, + }, + { + name: "create a new Gateway successfully when active Gateway Node is not ready", + nodes: []*corev1.Node{node2, node3}, + req: reconcile.Request{NamespacedName: types.NamespacedName{Name: node3.Name}}, + existingGW: gateway3, + expectedGW: updatedGateway2, + activeGateway: "node-3", + candidates: map[string]bool{ + "node-0": true, + "node-2": true, + "node-3": true, + "node-4": true, + }, + }, + { + name: "create a new Gateway successfully when active Gateway Node has no valid IP", + nodes: []*corev1.Node{node2, node4}, + req: 
reconcile.Request{NamespacedName: types.NamespacedName{Name: node4.Name}}, + existingGW: gateway4, + expectedGW: updatedGateway2, + activeGateway: "node-4", + candidates: map[string]bool{ + "node-0": true, + "node-2": true, + "node-3": true, + "node-4": true, + }, }, } for _, tt := range tests { @@ -118,6 +225,8 @@ func TestNodeReconciler(t *testing.T) { } fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(obj...).Build() r := NewNodeReconciler(fakeClient, scheme, "default", mcsv1alpha1.PrecedencePublic) + r.activeGateway = tt.activeGateway + r.gatewayCandidates = tt.candidates if _, err := r.Reconcile(ctx, tt.req); err != nil { if tt.expectedErr != "" { assert.Contains(t, err.Error(), tt.expectedErr) @@ -126,18 +235,89 @@ func TestNodeReconciler(t *testing.T) { } } else { newGW := &mcsv1alpha1.Gateway{} - gwNamespcedName := types.NamespacedName{Name: "node-1", Namespace: "default"} + gwNamespcedName := types.NamespacedName{Name: tt.req.Name, Namespace: "default"} + if tt.expectedGW != nil { + gwNamespcedName = types.NamespacedName{Name: tt.expectedGW.Name, Namespace: "default"} + } err := fakeClient.Get(ctx, gwNamespcedName, newGW) - if err != nil { - if tt.isDelete { - if !apierrors.IsNotFound(err) { - t.Errorf("Expected to get not found error but got err: %v", err) - } - } else { + if tt.isDelete { + if err == nil || err != nil && !apierrors.IsNotFound(err) { + t.Errorf("Expected to get not found error but got err: %v", err) + } + } else { + if err != nil { t.Errorf("Expected to get Gateway but got err: %v", err) + } else { + if tt.expectedGW.GatewayIP != newGW.GatewayIP || tt.expectedGW.InternalIP != newGW.InternalIP { + t.Errorf("Expected Gateway %v but got: %v", tt.expectedGW, newGW) + } + } + } + } + }) + } +} + +func TestInitialize(t *testing.T) { + initializeCommonData() + node5 := node1.DeepCopy() + node5.Name = "node-5" + node5.Annotations = map[string]string{} + tests := []struct { + name string + nodes []*corev1.Node + req 
reconcile.Request + existingGW *mcsv1alpha1.Gateway + expectedActiveGateway string + isDelete bool + candidatesSize int + }{ + { + name: "initialize and set active Gateway successfully", + nodes: []*corev1.Node{node1, node2, node5}, + existingGW: &gwNode1, + expectedActiveGateway: "node-1", + candidatesSize: 2, + }, + { + name: "initialize successfully without Gateway", + nodes: []*corev1.Node{node3, node4, node5}, + expectedActiveGateway: "", + candidatesSize: 2, + }, + { + name: "initialize and delete not ready Gateway successfully", + nodes: []*corev1.Node{node1, node3, node5}, + existingGW: gateway3, + isDelete: true, + expectedActiveGateway: "", + candidatesSize: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var obj []client.Object + for _, n := range tt.nodes { + obj = append(obj, n) + } + if tt.existingGW != nil { + obj = append(obj, tt.existingGW) + } + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(obj...).Build() + r := NewNodeReconciler(fakeClient, scheme, "default", mcsv1alpha1.PrecedencePublic) + if err := r.initialize(); err != nil { + t.Errorf("Expected initialize() successfully but got err: %v", err) + } else { + assert.Equal(t, tt.expectedActiveGateway, r.activeGateway) + assert.Equal(t, tt.candidatesSize, len(r.gatewayCandidates)) + if tt.isDelete { + deletedGW := &mcsv1alpha1.Gateway{} + gwNamespcedName := types.NamespacedName{Name: tt.existingGW.Name, Namespace: "default"} + err := fakeClient.Get(ctx, gwNamespcedName, deletedGW) + if !apierrors.IsNotFound(err) { + t.Errorf("Expected to get not found error but got err: %v", err) } - } else if tt.expectedGW.GatewayIP != newGW.GatewayIP || tt.expectedGW.InternalIP != newGW.InternalIP { - t.Errorf("Expected Gateway %v but got: %v", tt.expectedGW, newGW) } } })