Skip to content

Commit

Permalink
Refine Node controller to support Gateway HA
Browse files Browse the repository at this point in the history
In order to support Gateway active-standby mode HA, the Node
controller has been refined to handle Node readiness change.
There will be at most one Gateway in a given Namespace, and the
Gateway controller is updated correspondingly.

1. Check Node readiness and create a Gateway CR if Node is ready
and there is no existing Gateway.
2. Initialize active Gateway and Gateway candidate Nodes with annotation
'multicluster.antrea.io/gateway'.
3. Add Gateway webhook to allow at most one Gateway in a given
   Namespace.

Signed-off-by: Lan Luo <[email protected]>
  • Loading branch information
luolanzone committed Sep 1, 2022
1 parent 3714619 commit 49ceb32
Show file tree
Hide file tree
Showing 12 changed files with 686 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,6 @@ webhooks:
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
creationTimestamp: null
labels:
app: antrea
name: antrea-multicluster-antrea-mc-validating-webhook-configuration
Expand Down
21 changes: 21 additions & 0 deletions multicluster/build/yamls/antrea-multicluster-member.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1100,3 +1100,24 @@ webhooks:
resources:
- clustersets
sideEffects: None
- admissionReviewVersions:
- v1
- v1beta1
clientConfig:
service:
name: antrea-mc-webhook-service
namespace: kube-system
path: /validate-multicluster-crd-antrea-io-v1alpha1-gateway
failurePolicy: Fail
name: vgateway.kb.io
rules:
- apiGroups:
- multicluster.crd.antrea.io
apiVersions:
- v1alpha1
operations:
- CREATE
- UPDATE
resources:
- gateways
sideEffects: None
68 changes: 68 additions & 0 deletions multicluster/cmd/multicluster-controller/gateway_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2022 Antrea Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"fmt"
"net/http"

admissionv1 "k8s.io/api/admission/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
)

//+kubebuilder:webhook:path=/validate-multicluster-crd-antrea-io-v1alpha1-gateway,mutating=false,failurePolicy=fail,sideEffects=None,groups=multicluster.crd.antrea.io,resources=gateways,verbs=create;update,versions=v1alpha1,name=vgateway.kb.io,admissionReviewVersions={v1,v1beta1}

// gatewayValidator is the validating admission handler for Gateway
// resources.
type gatewayValidator struct {
	// Client is used to list the Gateways that already exist in namespace.
	Client client.Client
	// decoder decodes the Gateway carried by an admission request; it is
	// set through InjectDecoder.
	decoder *admission.Decoder
	// namespace is the Namespace whose existing Gateways are listed when a
	// request is validated.
	namespace string
}

// Handle validates an admission request for a Gateway.
//
// The Gateway is decoded from the request and the Gateways present in the
// validator's namespace (v.namespace) are listed. A CREATE request is
// rejected when that list is not empty, so that a second Gateway cannot be
// added; every other request that decodes successfully is allowed.
//
// The returned response is:
//   - an error response with http.StatusBadRequest when the request object
//     cannot be decoded into a Gateway;
//   - an error response with http.StatusPreconditionFailed when listing the
//     existing Gateways fails, or when a CREATE request arrives while a
//     Gateway already exists;
//   - an allowed response otherwise.
func (v *gatewayValidator) Handle(ctx context.Context, req admission.Request) admission.Response {
	gateway := &mcv1alpha1.Gateway{}
	if err := v.decoder.Decode(req, gateway); err != nil {
		klog.ErrorS(err, "Error while decoding Gateway", "Gateway", req.Namespace+"/"+req.Name)
		return admission.Errored(http.StatusBadRequest, err)
	}

	// Check if there is any existing Gateway. The caller's ctx is passed
	// through (rather than context.TODO()) so that the lookup honors the
	// cancellation and deadline of the admission request.
	gatewayList := &mcv1alpha1.GatewayList{}
	if err := v.Client.List(ctx, gatewayList, client.InNamespace(v.namespace)); err != nil {
		klog.ErrorS(err, "Error reading Gateway", "Namespace", v.namespace)
		return admission.Errored(http.StatusPreconditionFailed, err)
	}

	if req.Operation == admissionv1.Create && len(gatewayList.Items) > 0 {
		err := fmt.Errorf("multiple Gateways in a Namespace are not allowed")
		klog.ErrorS(err, "failed to create Gateway", "Gateway", klog.KObj(gateway), "Namespace", v.namespace)
		return admission.Errored(http.StatusPreconditionFailed, err)
	}
	return admission.Allowed("")
}

// InjectDecoder stores the admission decoder that Handle later uses to
// decode the Gateway from incoming requests. It always returns nil.
func (v *gatewayValidator) InjectDecoder(d *admission.Decoder) error {
	v.decoder = d
	return nil
}
134 changes: 134 additions & 0 deletions multicluster/cmd/multicluster-controller/gateway_webhook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
Copyright 2022 Antrea Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/admission/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
k8smcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

mcsv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1"
)

// gatewayWebhookUnderTest is the validator exercised by the currently
// running test case; it is rebuilt for every case.
var gatewayWebhookUnderTest *gatewayValidator

// TestWebhookGatewayEvents checks the Gateway validating webhook against a
// fake client: a CREATE is allowed when no Gateway exists, rejected when one
// already exists, and a request with an undecodable object is rejected.
func TestWebhookGatewayEvents(t *testing.T) {
	newGateway := &mcsv1alpha1.Gateway{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default",
			Name:      "node-1",
		},
		GatewayIP:  "1.2.3.4",
		InternalIP: "172.168.3.4",
	}
	existingGateway := &mcsv1alpha1.Gateway{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default",
			Name:      "node-2",
		},
	}

	// A marshalling failure would make every case below meaningless, so
	// stop right away instead of sending an empty object.
	newGW, err := json.Marshal(newGateway)
	if err != nil {
		t.Fatalf("marshalling Gateway: %v", err)
	}

	newReq := admission.Request{
		AdmissionRequest: v1.AdmissionRequest{
			UID: "07e52e8d-4513-11e9-a716-42010a800270",
			Kind: metav1.GroupVersionKind{
				Group:   "multicluster.crd.antrea.io",
				Version: "v1alpha1",
				Kind:    "Gateway",
			},
			Resource: metav1.GroupVersionResource{
				Group:    "multicluster.crd.antrea.io",
				Version:  "v1alpha1",
				Resource: "gateways",
			},
			Name:      "node-1",
			Namespace: "default",
			Operation: v1.Create,
			Object: runtime.RawExtension{
				Raw: newGW,
			},
		},
	}

	// Same request, but with a payload that cannot be decoded as a Gateway.
	invalidReq := admission.Request{
		AdmissionRequest: *newReq.DeepCopy(),
	}
	invalidReq.Object = runtime.RawExtension{Raw: []byte("a")}

	tests := []struct {
		name            string
		req             admission.Request
		existingGateway *mcsv1alpha1.Gateway
		isAllowed       bool
	}{
		{
			name:      "create a new Gateway successfully",
			req:       newReq,
			isAllowed: true,
		},
		{
			name:            "failed to create a Gateway when there is an existing one",
			existingGateway: existingGateway,
			req:             newReq,
			isAllowed:       false,
		},
		{
			name:      "failed to decode request",
			req:       invalidReq,
			isAllowed: false,
		},
	}

	newScheme := runtime.NewScheme()
	utilruntime.Must(clientgoscheme.AddToScheme(newScheme))
	utilruntime.Must(k8smcsv1alpha1.AddToScheme(newScheme))
	utilruntime.Must(mcsv1alpha1.AddToScheme(newScheme))
	decoder, err := admission.NewDecoder(newScheme)
	if err != nil {
		// Without a decoder the validator cannot run; abort rather than
		// continue with a nil decoder.
		klog.ErrorS(err, "Error constructing a decoder")
		t.Fatalf("constructing admission decoder: %v", err)
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Seed the fake client only when the case needs a
			// pre-existing Gateway.
			builder := fake.NewClientBuilder().WithScheme(newScheme)
			if tt.existingGateway != nil {
				builder = builder.WithObjects(tt.existingGateway)
			}
			gatewayWebhookUnderTest = &gatewayValidator{
				Client:    builder.Build(),
				namespace: "default",
			}
			if err := gatewayWebhookUnderTest.InjectDecoder(decoder); err != nil {
				t.Fatalf("injecting decoder: %v", err)
			}

			response := gatewayWebhookUnderTest.Handle(context.Background(), tt.req)
			assert.Equal(t, tt.isAllowed, response.Allowed)
		})
	}
}
7 changes: 7 additions & 0 deletions multicluster/cmd/multicluster-controller/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/spf13/cobra"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"

multiclustercontrollers "antrea.io/antrea/multicluster/controllers/multicluster"
"antrea.io/antrea/pkg/log"
Expand Down Expand Up @@ -54,6 +55,12 @@ func runMember(o *Options) error {
return err
}

hookServer := mgr.GetWebhookServer()
hookServer.Register("/validate-multicluster-crd-antrea-io-v1alpha1-gateway",
&webhook.Admission{Handler: &gatewayValidator{
Client: mgr.GetClient(),
namespace: env.GetPodNamespace()}})

clusterSetReconciler := multiclustercontrollers.NewMemberClusterSetReconciler(mgr.GetClient(),
mgr.GetScheme(),
env.GetPodNamespace(),
Expand Down
1 change: 1 addition & 0 deletions multicluster/config/overlays/leader-ns/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,4 @@ resources:

patchesStrategicMerge:
- manager_command_patch.yaml
- webhook_patch.yaml
9 changes: 9 additions & 0 deletions multicluster/config/overlays/leader-ns/webhook_patch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
name: validating-webhook-configuration
webhooks:
- admissionReviewVersions:
name: vgateway.kb.io
$patch: delete
21 changes: 21 additions & 0 deletions multicluster/config/webhook/manifests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,27 @@ webhooks:
resources:
- clustersets
sideEffects: None
- admissionReviewVersions:
- v1
- v1beta1
clientConfig:
service:
name: webhook-service
namespace: system
path: /validate-multicluster-crd-antrea-io-v1alpha1-gateway
failurePolicy: Fail
name: vgateway.kb.io
rules:
- apiGroups:
- multicluster.crd.antrea.io
apiVersions:
- v1alpha1
operations:
- CREATE
- UPDATE
resources:
- gateways
sideEffects: None
- admissionReviewVersions:
- v1
- v1beta1
Expand Down
53 changes: 6 additions & 47 deletions multicluster/controllers/multicluster/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
},
}

createOrUpdate := func(gwIP string, gwInfo *mcsv1alpha1.GatewayInfo) error {
createOrUpdate := func(gwIP string) error {
existingResExport := &mcsv1alpha1.ResourceExport{}
if err := commonArea.Get(ctx, resExportNamespacedName, existingResExport); err != nil {
if !apierrors.IsNotFound(err) {
Expand All @@ -110,7 +110,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
return nil
}
if err = r.updateResourceExport(ctx, req, commonArea, existingResExport, gwInfo); err != nil {
if err = r.updateResourceExport(ctx, req, commonArea, existingResExport, &mcsv1alpha1.GatewayInfo{GatewayIP: gwIP}); err != nil {
return err
}
return nil
Expand All @@ -121,52 +121,18 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
if !apierrors.IsNotFound(err) {
return ctrl.Result{}, err
}
gwInfo, err := r.getLastCreatedGateway()
if err != nil {
klog.ErrorS(err, "Failed to get Gateways")
return ctrl.Result{}, err
}
if gwInfo == nil {
// When the last Gateway is deleted, we will remove the ClusterInfo kind of ResourceExport
if err := commonArea.Delete(ctx, resExport, &client.DeleteOptions{}); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
return ctrl.Result{}, nil
}
// When there are still Gateways exist, we should create or update existing ResourceExport
// with the latest Gateway in remaining Gateways.
if err := createOrUpdate(gwInfo.GatewayIP, gwInfo); err != nil {
return ctrl.Result{}, err
if err := commonArea.Delete(ctx, resExport, &client.DeleteOptions{}); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
return ctrl.Result{}, nil
}
if err := createOrUpdate(gw.GatewayIP, nil); err != nil {

if err := createOrUpdate(gw.GatewayIP); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

func (r *GatewayReconciler) getLastCreatedGateway() (*mcsv1alpha1.GatewayInfo, error) {
gws := &mcsv1alpha1.GatewayList{}
if err := r.Client.List(ctx, gws, &client.ListOptions{}); err != nil {
return nil, err
}
if len(gws.Items) == 0 {
return nil, nil
}

// Comparing Gateway's CreationTimestamp to get the last created Gateway.
lastCreatedGW := gws.Items[0]
for _, gw := range gws.Items {
if lastCreatedGW.CreationTimestamp.Before(&gw.CreationTimestamp) {
lastCreatedGW = gw
}
}

// Make sure we only return the last created Gateway for now.
return &mcsv1alpha1.GatewayInfo{GatewayIP: lastCreatedGW.GatewayIP}, nil
}

func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.Request,
commonArea commonarea.RemoteCommonArea, existingResExport *mcsv1alpha1.ResourceExport, gwInfo *mcsv1alpha1.GatewayInfo) error {
resExportSpec := mcsv1alpha1.ResourceExportSpec{
Expand All @@ -175,13 +141,6 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R
Name: r.localClusterID,
Namespace: r.namespace,
}
var err error
if gwInfo == nil {
gwInfo, err = r.getLastCreatedGateway()
if err != nil {
return err
}
}
resExportSpec.ClusterInfo = &mcsv1alpha1.ClusterInfo{
ClusterID: r.localClusterID,
ServiceCIDR: r.serviceCIDR,
Expand Down
Loading

0 comments on commit 49ceb32

Please sign in to comment.