Skip to content

Commit

Permalink
Multicluster Pod to Pod connectivity
Browse files Browse the repository at this point in the history
Signed-off-by: hujiajing <[email protected]>
  • Loading branch information
hjiajing committed Sep 13, 2022
1 parent 2ce2ef0 commit 0c694e9
Show file tree
Hide file tree
Showing 11 changed files with 99 additions and 4 deletions.
2 changes: 2 additions & 0 deletions multicluster/apis/multicluster/v1alpha1/gateway_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type ClusterInfo struct {
ServiceCIDR string `json:"serviceCIDR,omitempty"`
// GatewayInfos has information of Gateways
GatewayInfos []GatewayInfo `json:"gatewayInfos,omitempty"`
// PodCIDRs is the IP ranges used by each Node.
PodCIDRs []string `json:"podCIDRs,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
10 changes: 10 additions & 0 deletions multicluster/build/yamls/antrea-multicluster-leader-global.yml
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,11 @@ spec:
type: string
type: object
type: array
podCIDRs:
description: PodCIDRs is the IP ranges used by each Node.
items:
type: string
type: array
serviceCIDR:
description: ServiceCIDR is the IP ranges used by Service ClusterIP.
type: string
Expand Down Expand Up @@ -3268,6 +3273,11 @@ spec:
type: string
type: object
type: array
podCIDRs:
description: PodCIDRs is the IP ranges used by each Node.
items:
type: string
type: array
serviceCIDR:
description: ServiceCIDR is the IP ranges used by Service ClusterIP.
type: string
Expand Down
5 changes: 5 additions & 0 deletions multicluster/build/yamls/antrea-multicluster-member.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ spec:
type: string
type: object
type: array
podCIDRs:
description: PodCIDRs is the IP ranges used by each Node.
items:
type: string
type: array
serviceCIDR:
description: ServiceCIDR is the IP ranges used by Service ClusterIP.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ spec:
type: string
type: object
type: array
podCIDRs:
description: PodCIDRs is the IP ranges used by each Node.
items:
type: string
type: array
serviceCIDR:
description: ServiceCIDR is the IP ranges used by Service ClusterIP.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ spec:
type: string
type: object
type: array
podCIDRs:
description: PodCIDRs is the IP ranges used by each Node.
items:
type: string
type: array
serviceCIDR:
description: ServiceCIDR is the IP ranges used by Service ClusterIP.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ spec:
type: string
type: object
type: array
podCIDRs:
description: PodCIDRs is the IP ranges used by each Node.
items:
type: string
type: array
serviceCIDR:
description: ServiceCIDR is the IP ranges used by Service ClusterIP.
type: string
Expand Down
1 change: 1 addition & 0 deletions multicluster/controllers/multicluster/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
AntreaMCACNPAnnotation = "multicluster.antrea.io/imported-acnp"
GatewayAnnotation = "multicluster.antrea.io/gateway"
GatewayIPAnnotation = "multicluster.antrea.io/gateway-ip"
LocalNodesAnnotation = "multicluster.antrea.io/local-nodes"

AntreaMCSPrefix = "antrea-mc-"
ServiceKind = "Service"
Expand Down
25 changes: 25 additions & 0 deletions multicluster/controllers/multicluster/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"reflect"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -44,6 +45,7 @@ type (
namespace string
localClusterID string
serviceCIDR string
podCIDRs []string
leaderNamespace string
}
)
Expand All @@ -61,6 +63,7 @@ func NewGatewayReconciler(
Scheme: scheme,
namespace: namespace,
serviceCIDR: serviceCIDR,
podCIDRs: make([]string, 0),
commonAreaGetter: commonAreaGetter,
}
return reconciler
Expand Down Expand Up @@ -141,9 +144,13 @@ func (r *GatewayReconciler) updateResourceExport(ctx context.Context, req ctrl.R
Name: r.localClusterID,
Namespace: r.namespace,
}
if err := r.getPodCIDRs(ctx); err != nil {
return nil
}
resExportSpec.ClusterInfo = &mcsv1alpha1.ClusterInfo{
ClusterID: r.localClusterID,
ServiceCIDR: r.serviceCIDR,
PodCIDRs: r.podCIDRs,
GatewayInfos: []mcsv1alpha1.GatewayInfo{*gwInfo},
}
if reflect.DeepEqual(existingResExport.Spec, resExportSpec) {
Expand All @@ -168,9 +175,13 @@ func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.R
Name: r.localClusterID,
Namespace: r.namespace,
}
if err := r.getPodCIDRs(ctx); err != nil {
return err
}
resExportSpec.ClusterInfo = &mcsv1alpha1.ClusterInfo{
ClusterID: r.localClusterID,
ServiceCIDR: r.serviceCIDR,
PodCIDRs: r.podCIDRs,
GatewayInfos: []mcsv1alpha1.GatewayInfo{
{
GatewayIP: gatewayIP,
Expand All @@ -184,6 +195,7 @@ func (r *GatewayReconciler) createResourceExport(ctx context.Context, req ctrl.R
},
Spec: resExportSpec,
}

resExport.Finalizers = []string{common.ResourceExportFinalizer}
if err := commonArea.Create(ctx, resExport, &client.CreateOptions{}); err != nil {
return err
Expand Down Expand Up @@ -216,6 +228,19 @@ func (r *GatewayReconciler) getServiceCIDR(ctx context.Context) error {
return nil
}

func (r *GatewayReconciler) getPodCIDRs(ctx context.Context) error {
var podCIDRs []string
nodeList := &v1.NodeList{}
if err := r.Client.List(ctx, nodeList); err != nil {
return err
}
for _, node := range nodeList.Items {
podCIDRs = append(podCIDRs, node.Spec.PodCIDRs...)
}
r.podCIDRs = podCIDRs
return nil
}

func newClusterInfoResourceExportName(clusterID string) string {
return clusterID + "-clusterinfo"
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ var (
Name: "node-1",
Namespace: "default",
CreationTimestamp: gw1CreationTime,
Annotations: map[string]string{
common.LocalNodesAnnotation: "node-1",
},
},
GatewayIP: "10.10.10.10",
InternalIP: "172.11.10.1",
Expand Down
37 changes: 35 additions & 2 deletions multicluster/controllers/multicluster/node_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net"
"strings"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -142,7 +143,17 @@ func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil

// When a Node is added to a member cluster or removed from a member cluster.
// The active Gateway need to update to get the Pod CIDRs and resync the flows.
activeGateway := &mcsv1alpha1.Gateway{}
if err := r.Client.Get(ctx, types.NamespacedName{Namespace: r.namespace, Name: r.activeGateway}, activeGateway); err != nil {
return ctrl.Result{}, err
}
if err := r.setAnnotation(ctx, activeGateway); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, r.updateActiveGateway(ctx, activeGateway)
}

// initialize initializes 'activeGateway' and 'gatewayCandidates' and removes
Expand Down Expand Up @@ -194,11 +205,13 @@ func (r *NodeReconciler) updateActiveGateway(ctx context.Context, newGateway *mc
if err := r.Client.Get(ctx, types.NamespacedName{Name: newGateway.Name, Namespace: r.namespace}, existingGW); err != nil {
return err
}
if existingGW.GatewayIP == newGateway.GatewayIP && existingGW.InternalIP == newGateway.InternalIP {
if existingGW.GatewayIP == newGateway.GatewayIP && existingGW.InternalIP == newGateway.InternalIP &&
existingGW.Annotations[common.LocalNodesAnnotation] == newGateway.Annotations[common.LocalNodesAnnotation] {
return nil
}
existingGW.GatewayIP = newGateway.GatewayIP
existingGW.InternalIP = newGateway.InternalIP
existingGW.Annotations[common.GatewayAnnotation] = newGateway.Annotations[common.LocalNodesAnnotation]
// If the Gateway version in the client cache is stale, the update operation will fail,
// then the reconciler will retry with latest state again.
if err := r.Client.Update(ctx, existingGW, &client.UpdateOptions{}); err != nil {
Expand Down Expand Up @@ -261,6 +274,9 @@ func (r *NodeReconciler) getValidGatewayFromCandidates() (*mcsv1alpha1.Gateway,
}

func (r *NodeReconciler) createGateway(gateway *mcsv1alpha1.Gateway) error {
if err := r.setAnnotation(ctx, gateway); err != nil {
return err
}
if err := r.Client.Create(ctx, gateway, &client.CreateOptions{}); err != nil {
return err
}
Expand Down Expand Up @@ -307,6 +323,23 @@ func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}

func (r *NodeReconciler) setAnnotation(ctx context.Context, gw *mcsv1alpha1.Gateway) error {
nodeList := &corev1.NodeList{}
if err := r.Client.List(ctx, nodeList); err != nil {
return err
}

nodes := []string{}
for _, node := range nodeList.Items {
nodes = append(nodes, node.Name)
}

gw.Annotations = map[string]string{
common.LocalNodesAnnotation: strings.Join(nodes, ","),
}
return nil
}

func isReadyNode(node *corev1.Node) bool {
var nodeIsReady bool
for _, s := range node.Status.Conditions {
Expand Down
5 changes: 3 additions & 2 deletions pkg/agent/multicluster/mc_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,15 +331,16 @@ func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gatewa

if installedCIImp != nil {
oldTunnelPeerIPToRemoteGW := getPeerGatewayIP(installedCIImp.Spec)
if oldTunnelPeerIPToRemoteGW.Equal(tunnelPeerIPToRemoteGW) && installedCIImp.Spec.ServiceCIDR == ciImport.Spec.ServiceCIDR {
if oldTunnelPeerIPToRemoteGW.Equal(tunnelPeerIPToRemoteGW) && installedCIImp.Spec.ServiceCIDR == ciImport.Spec.ServiceCIDR &&
sets.NewString(installedCIImp.Spec.PodCIDRs...).Equal(sets.NewString(ciImport.Spec.PodCIDRs...)) {
klog.V(2).InfoS("No difference between new and installed ClusterInfoImports, skip updating", "clusterinfoimport", ciImport.Name)
return nil
}
}

klog.InfoS("Adding/updating remote Gateway Node flows for Multi-cluster", "gateway", klog.KObj(activeGW),
"node", c.nodeConfig.Name, "peer", tunnelPeerIPToRemoteGW)
allCIDRs := []string{ciImport.Spec.ServiceCIDR}
allCIDRs := append([]string{ciImport.Spec.ServiceCIDR}, ciImport.Spec.PodCIDRs...)
peerConfigs, err := generatePeerConfigs(allCIDRs, tunnelPeerIPToRemoteGW)
if err != nil {
klog.ErrorS(err, "Parse error for serviceCIDR from remote cluster", "clusterinfoimport", ciImport.Name, "gateway", activeGW.Name)
Expand Down

0 comments on commit 0c694e9

Please sign in to comment.