Skip to content

Commit

Permalink
feat: support lb status sync with agent status
Browse files Browse the repository at this point in the history
Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye committed Jun 16, 2024
1 parent ed10e74 commit a904f69
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,20 @@ func (m *IPManager) Assign(ips []string) (IPVRID, error) {
if len(noConflictIPs) == 0 {
return m.Get()
}

for vrid := 0; vrid < VRIDMAXVALUE; vrid++ {
var vrid int
for ; vrid < VRIDMAXVALUE; vrid++ {
if _, ok := m.ipVRIDs[vrid]; !ok {
m.ipVRIDs[vrid] = noConflictIPs
m.ipVRIDs[vrid] = append(m.ipVRIDs[vrid], noConflictIPs...)

for _, ip := range noConflictIPs {
m.ipPools[ip] = vrid
}

return IPVRID{IPs: noConflictIPs, VRID: vrid}, nil
}
}

return IPVRID{}, errors.New("no available IPs and VRID combination")
// Get fully vrid-ips pair
return IPVRID{VRID: vrid, IPs: m.ipVRIDs[vrid]}, nil
}

// Release release ips from vrid, if vrid is not assigned, return error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package viploadbalancer
import (
"context"
"fmt"
"sort"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -51,19 +53,26 @@ var (

const (
AnnotationVipLoadBalancerVRID = "service.openyurt.io/vrid"
AnnotationVipLoadBalancerIPS = "service.openyurt.io/desired-vips"
VipLoadBalancerClass = "service.openyurt.io/viplb"
AnnotationServiceTopologyKey = "openyurt.io/topologyKeys"
AnnotationServiceTopologyValueNodePool = "openyurt.io/nodepool"
AnnotationNodePoolAddressPools = "openyurt.io/address-pools"
AnnotationServiceVIPAddress = "service.openyurt.io/vip"
AnnotationServiceVIPStatus = "service.openyurt.io/service-vip-ready"
AnnotationServiceVIPStatusReady = "true"
AnnotationServiceVIPStatusUnReady = "false"
AnnotationServiceVIPStatus = "service.openyurt.io/vip-status"
AnnotationServiceVIPStatusOnline = "online"
AnnotationServiceVIPStatusOffline = "offline"

VipLoadBalancerFinalizer = "viploadbalancer.openyurt.io/resources"
poolServiceVRIDExhaustedEventMsgFormat = "PoolService %s/%s in NodePool %s has exhausted all VRIDs"
)

const (
ServiceVIPUnknown int = iota
ServiceVIPOnline
ServiceVIPOffline
)

func Format(format string, args ...interface{}) string {
s := fmt.Sprintf(format, args...)
return fmt.Sprintf("%s: %s", names.VipLoadBalancerController, s)
Expand Down Expand Up @@ -187,6 +196,11 @@ func (r *ReconcileVipLoadBalancer) syncPoolService(ctx context.Context, poolServ
return err
}

if err := r.syncPoolServiceStatus(ctx, poolService); err != nil {
klog.Errorf(Format("Failed to sync PoolService %s/%s status: %v", poolService.Namespace, poolService.Name, err))
return err
}

return nil
}

Expand Down Expand Up @@ -233,6 +247,50 @@ func (r *ReconcileVipLoadBalancer) getCurrentPoolAddress(ctx context.Context, po
return np.Annotations[AnnotationNodePoolAddressPools], nil
}

func (r *ReconcileVipLoadBalancer) syncPoolServiceStatus(ctx context.Context, poolService *netv1alpha1.PoolService) error {
klog.V(4).Infof(Format("SyncPoolServiceStatus VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))

if !r.checkIfVipServiceOnline(ctx, poolService) {
klog.Infof(Format("SyncPoolServiceStatus VipLoadBalancer %s/%s is not online in the nodepool agent", poolService.Namespace, poolService.Name))
return nil
}

desiredLbStatus, err := r.desiredLbStatus(poolService)
if err != nil {
return fmt.Errorf("failed to calculate desire lb stattus for poolservice %s/%s: %v", poolService.Namespace, poolService.Name, err)
}

poolService.Status.LoadBalancer = desiredLbStatus
if err := r.Status().Update(ctx, poolService); err != nil {
klog.Errorf(Format("Failed to update PoolService %s/%s status: %v", poolService.Namespace, poolService.Name, err))
return err
}

return nil
}

func (r *ReconcileVipLoadBalancer) desiredLbStatus(poolService *netv1alpha1.PoolService) (corev1.LoadBalancerStatus, error) {
ips := strings.Split(poolService.Annotations[AnnotationVipLoadBalancerIPS], ",")
if len(ips) == 0 {
// not ready in assign, wait to have next reconclie
klog.Infof(Format("PoolService: %s/%s has no ips, please check vrid maybe out of limit", poolService.Namespace, poolService.Name))
return corev1.LoadBalancerStatus{}, fmt.Errorf("PoolService: %s/%s has no ips, please check vrid maybe out of limit", poolService.Namespace, poolService.Name)
}

var lbIngress []corev1.LoadBalancerIngress
for _, ip := range ips {
lbIngress = append(lbIngress, corev1.LoadBalancerIngress{IP: ip})
}

sort.Slice(lbIngress, func(i, j int) bool {
return lbIngress[i].IP < lbIngress[j].IP
})

return corev1.LoadBalancerStatus{
Ingress: lbIngress,
}, nil
}

func (r *ReconcileVipLoadBalancer) getCurrentIPVRIDs(ctx context.Context, poolService *netv1alpha1.PoolService) ([]IPVRID, error) {
// Get the poolservice list
listSelector := &client.ListOptions{
Expand Down Expand Up @@ -311,8 +369,10 @@ func (r *ReconcileVipLoadBalancer) isValidIPVRID(poolService *netv1alpha1.PoolSe
}

ips := []string{}
for _, ip := range poolService.Status.LoadBalancer.Ingress {
ips = append(ips, ip.IP)
if poolService.Status.LoadBalancer.Ingress != nil {
for _, ip := range poolService.Status.LoadBalancer.Ingress {
ips = append(ips, ip.IP)
}
}

ipvrid := NewIPVRID(ips, vrid)
Expand Down Expand Up @@ -370,10 +430,8 @@ func (r *ReconcileVipLoadBalancer) assignVRID(ctx context.Context, poolService *
poolService.Annotations = make(map[string]string)
}

for _, ip := range ipvrid.IPs {
poolService.Status.LoadBalancer.Ingress = append(poolService.Status.LoadBalancer.Ingress, corev1.LoadBalancerIngress{IP: ip})
}

// add ips and vrid in annotions for status sync
poolService.Annotations[AnnotationVipLoadBalancerIPS] = strings.Join(ipvrid.IPs, ",")
poolService.Annotations[AnnotationVipLoadBalancerVRID] = strconv.Itoa(ipvrid.VRID)

// Update the PoolService
Expand All @@ -395,22 +453,6 @@ func (r *ReconcileVipLoadBalancer) getReferenceService(ctx context.Context, ps *
return service, nil
}

func canRemoveFinalizer(poolService *netv1alpha1.PoolService) bool {
if poolService.Annotations == nil {
return false
}

if _, ok := poolService.Annotations[AnnotationServiceVIPStatus]; !ok {
return false
}

if poolService.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusReady {
return false
}

return true
}

func (r *ReconcileVipLoadBalancer) reconcileDelete(ctx context.Context, poolService *netv1alpha1.PoolService) (reconcile.Result, error) {
klog.V(4).Infof(Format("ReconcilDelete VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))
poolName := poolService.Labels[network.LabelNodePoolName]
Expand Down Expand Up @@ -443,7 +485,17 @@ func (r *ReconcileVipLoadBalancer) reconcileDelete(ctx context.Context, poolServ
}

func (r *ReconcileVipLoadBalancer) checkIfVipServiceOffline(ctx context.Context, poolService *netv1alpha1.PoolService) bool {
klog.V(4).Infof(Format("CheckIfVipServiceOffline VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))
klog.V(4).Infof(Format("checkIfVipServiceOffline VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))
return r.getVipServiceStatus(ctx, poolService) == ServiceVIPOffline
}

func (r *ReconcileVipLoadBalancer) checkIfVipServiceOnline(ctx context.Context, poolService *netv1alpha1.PoolService) bool {
klog.V(4).Infof(Format("checkIfVipServiceOnline VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))
return r.getVipServiceStatus(ctx, poolService) == ServiceVIPOnline
}

func (r *ReconcileVipLoadBalancer) getVipServiceStatus(ctx context.Context, poolService *netv1alpha1.PoolService) int {
klog.V(4).Infof(Format("getVipServiceStatus VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))
// Get the reference endpoint
listSelector := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
Expand All @@ -455,11 +507,11 @@ func (r *ReconcileVipLoadBalancer) checkIfVipServiceOffline(ctx context.Context,
endpointSlice := &corev1.EndpointsList{}
if err := r.List(ctx, endpointSlice, listSelector); err != nil {
klog.Errorf(Format("Failed to get Endpoints from PoolService %s/%s: %v", poolService.Namespace, poolService.Name, err))
return false
return ServiceVIPUnknown
}

ready := 0
target := len(endpointSlice.Items) / 2
target := len(endpointSlice.Items)/2 + 1
for _, ep := range endpointSlice.Items {
if ep.Annotations == nil {
continue
Expand All @@ -469,12 +521,16 @@ func (r *ReconcileVipLoadBalancer) checkIfVipServiceOffline(ctx context.Context,
continue
}

if ep.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusReady {
if ep.Annotations[AnnotationServiceVIPStatus] == AnnotationServiceVIPStatusOnline {
ready++
}
}

return ready >= target
if ready < target {
return ServiceVIPOffline
}

return ServiceVIPOnline
}

func (r *ReconcileVipLoadBalancer) addFinalizer(ctx context.Context, poolService *netv1alpha1.PoolService) error {
Expand Down

0 comments on commit a904f69

Please sign in to comment.