Skip to content

Commit

Permalink
Handle L4 ELB RBS controller race with legacy on service creation
Browse files Browse the repository at this point in the history
Clean up RBS resources if we found rbs finalizer and target pool
attached to the service
  • Loading branch information
panslava committed Jun 27, 2022
1 parent 4a8529a commit ac79093
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 31 deletions.
5 changes: 3 additions & 2 deletions pkg/firewalls/firewalls_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ limitations under the License.
package firewalls

import (
"strings"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/gce"
"strings"
)

// FirewallParams holds all data needed to create firewall for L4 LB
Expand Down Expand Up @@ -151,7 +152,7 @@ func EnsureL4LBFirewallForHc(svc *v1.Service, shared bool, params *FirewallParam
return ensureFirewall(svc, shared, params, cloud, recorder)
}

// EnsureFirewallForHc creates or updates firewall rule for LB traffic to nodes
// EnsureL4LBFirewallForNodes creates or updates firewall rule for LB traffic to nodes
func EnsureL4LBFirewallForNodes(svc *v1.Service, params *FirewallParams, cloud *gce.Cloud, recorder record.EventRecorder) error {
return ensureFirewall(svc /*shared = */, false, params, cloud, recorder)
}
4 changes: 2 additions & 2 deletions pkg/l4lb/l4controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Se
}
l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeNormal, "SyncLoadBalancerSuccessful",
"Successfully ensured load balancer resources")
if err = updateAnnotations(l4c.ctx, service, syncResult.Annotations); err != nil {
if err = updateL4ResourcesAnnotations(l4c.ctx, service, syncResult.Annotations); err != nil {
l4c.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncLoadBalancerFailed",
"Failed to update annotations for load balancer, err: %v", err)
syncResult.Error = fmt.Errorf("failed to set resource annotations, err: %w", err)
Expand All @@ -258,7 +258,7 @@ func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service) *lo
return result
}
// Also remove any ILB annotations from the service metadata
if err := updateAnnotations(l4c.ctx, svc, nil); err != nil {
if err := updateL4ResourcesAnnotations(l4c.ctx, svc, nil); err != nil {
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancer",
"Error resetting resource annotations for load balancer: %v", err)
result.Error = fmt.Errorf("failed to reset resource annotations, err: %w", err)
Expand Down
26 changes: 18 additions & 8 deletions pkg/l4lb/l4lbcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import (
// computeNewAnnotationsIfNeeded checks if new annotations should be added to service.
// If needed creates new service meta object.
// This function is used by External and Internal L4 LB controllers.
func computeNewAnnotationsIfNeeded(svc *v1.Service, newAnnotations map[string]string) *metav1.ObjectMeta {
func computeNewAnnotationsIfNeeded(svc *v1.Service, newAnnotations map[string]string, keysToClean []string) *metav1.ObjectMeta {
newObjectMeta := svc.ObjectMeta.DeepCopy()
newObjectMeta.Annotations = mergeAnnotations(newObjectMeta.Annotations, newAnnotations)
newObjectMeta.Annotations = mergeAnnotations(newObjectMeta.Annotations, newAnnotations, keysToClean)
if reflect.DeepEqual(svc.Annotations, newObjectMeta.Annotations) {
return nil
}
Expand All @@ -47,12 +47,12 @@ func computeNewAnnotationsIfNeeded(svc *v1.Service, newAnnotations map[string]st
// mergeAnnotations merges the new set of l4lb resource annotations with the pre-existing service annotations.
// Existing L4 resource annotation values will be replaced with the values in the new map.
// This function is used by External and Internal L4 LB controllers.
func mergeAnnotations(existing, lbAnnotations map[string]string) map[string]string {
func mergeAnnotations(existing, lbAnnotations map[string]string, keysToClean []string) map[string]string {
if existing == nil {
existing = make(map[string]string)
} else {
// Delete existing L4 annotations.
for _, key := range loadbalancers.L4LBResourceAnnotationKeys {
// Delete existing annotations.
for _, key := range keysToClean {
delete(existing, key)
}
}
Expand All @@ -63,16 +63,26 @@ func mergeAnnotations(existing, lbAnnotations map[string]string) map[string]stri
return existing
}

// updateAnnotations this function checks if new annotations should be added to service and patch service metadata if needed.
func updateAnnotations(ctx *context.ControllerContext, svc *v1.Service, newL4LBAnnotations map[string]string) error {
newObjectMeta := computeNewAnnotationsIfNeeded(svc, newL4LBAnnotations)
// updateL4ResourcesAnnotations this function checks if new annotations should be added to service and patch service metadata if needed.
func updateL4ResourcesAnnotations(ctx *context.ControllerContext, svc *v1.Service, newL4LBAnnotations map[string]string) error {
newObjectMeta := computeNewAnnotationsIfNeeded(svc, newL4LBAnnotations, loadbalancers.L4LBResourceAnnotationKeys)
if newObjectMeta == nil {
return nil
}
klog.V(3).Infof("Patching annotations of service %v/%v", svc.Namespace, svc.Name)
return patch.PatchServiceObjectMetadata(ctx.KubeClient.CoreV1(), svc, *newObjectMeta)
}

// deleteL4RBSAnnotations deletes all annotations which could be added by L4 ELB RBS controller
func deleteL4RBSAnnotations(ctx *context.ControllerContext, svc *v1.Service) error {
newObjectMeta := computeNewAnnotationsIfNeeded(svc, nil, loadbalancers.L4RBSAnnotations)
if newObjectMeta == nil {
return nil
}
klog.V(3).Infof("Deleting all annotations for L4 ELB RBS service %v/%v", svc.Namespace, svc.Name)
return patch.PatchServiceObjectMetadata(ctx.KubeClient.CoreV1(), svc, *newObjectMeta)
}

func deleteAnnotation(ctx *context.ControllerContext, svc *v1.Service, annotationKey string) error {
newObjectMeta := svc.ObjectMeta.DeepCopy()
if _, ok := newObjectMeta.Annotations[annotationKey]; !ok {
Expand Down
45 changes: 32 additions & 13 deletions pkg/l4lb/l4netlbcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,21 @@ func (lc *L4NetLBController) hasRBSAnnotation(service *v1.Service) bool {
return false
}

// Target Pool to RBS migration is NOT yet supported and causes service to break (for now).
// If we detect such case, we remove RBS annotation, so service stays with Legacy Target Pool implementation
func (lc *L4NetLBController) preventTargetPoolToRBSMigration(service *v1.Service) {
func (lc *L4NetLBController) preventLegacyServiceHandling(service *v1.Service, key string) error {
if lc.hasRBSAnnotation(service) && lc.hasTargetPoolForwardingRule(service) {
lc.revertToTargetPool(service)
if utils.HasL4NetLBFinalizerV2(service) {
// If we found that RBS finalizer was attached to service, it means that RBS controller
// had a race condition on Service creation with Legacy Controller.
//It should only happen during service creation and we should clean up RBS resources
return lc.preventTargetPoolRaceWithRBSOnCreation(service, key)
} else {
// Target Pool to RBS migration is NOT yet supported and causes service to break (for now).
// If we detect RBS annotation on legacy service, we remove RBS annotation,
// so service stays with Legacy Target Pool implementation
return lc.preventExistingTargetPoolToRBSMigration(service)
}
}
return nil
}

func (lc *L4NetLBController) hasTargetPoolForwardingRule(service *v1.Service) bool {
Expand All @@ -274,16 +283,26 @@ func (lc *L4NetLBController) hasTargetPoolForwardingRule(service *v1.Service) bo
return false
}

func (lc *L4NetLBController) revertToTargetPool(service *v1.Service) {
func (lc *L4NetLBController) preventTargetPoolRaceWithRBSOnCreation(service *v1.Service, key string) error {
lc.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "TargetPoolRaceWithRBS",
"Target Pool found on provisioned RBS service. Deleting RBS resources")

metrics.IncreaseL4NetLBTargetPoolRaceWithRBS()
result := lc.garbageCollectRBSNetLB(key, service)
if result.Error != nil {
lc.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "CleanRBSResourcesForLegacyService",
"Failed to clean RBS resources for load balancer with target pool, err: %v", result.Error)
return result.Error
}
return nil
}

func (lc *L4NetLBController) preventExistingTargetPoolToRBSMigration(service *v1.Service) error {
lc.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "CanNotMigrateTargetPoolToRBS",
"RBS annotation was attached to the Legacy Target Pool service. Migration is not supported. Removing annotation")
metrics.IncreaseL4NetLBLegacyToRBSMigrationAttempts()

err := lc.deleteRBSAnnotation(service)
if err != nil {
lc.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "RemoveRBSAnnotationOnTargetPoolServiceFailed",
"Failed to delete rbs annotation for load balancer with target pool, err: %v", err)
}
return lc.deleteRBSAnnotation(service)
}

func (lc *L4NetLBController) deleteRBSAnnotation(service *v1.Service) error {
Expand Down Expand Up @@ -348,7 +367,7 @@ func (lc *L4NetLBController) sync(key string) error {
return nil
}

lc.preventTargetPoolToRBSMigration(svc)
err = lc.preventLegacyServiceHandling(svc, key)

if lc.needsDeletion(svc) {
klog.V(3).Infof("Deleting L4 External LoadBalancer resources for service %s", key)
Expand Down Expand Up @@ -423,7 +442,7 @@ func (lc *L4NetLBController) syncInternal(service *v1.Service) *loadbalancers.L4
}
lc.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeNormal, "SyncLoadBalancerSuccessful",
"Successfully ensured L4 External LoadBalancer resources")
if err = updateAnnotations(lc.ctx, service, syncResult.Annotations); err != nil {
if err = updateL4ResourcesAnnotations(lc.ctx, service, syncResult.Annotations); err != nil {
lc.ctx.Recorder(service.Namespace).Eventf(service, v1.EventTypeWarning, "SyncExternalLoadBalancerFailed",
"Failed to update annotations for load balancer, err: %v", err)
syncResult.Error = fmt.Errorf("failed to set resource annotations, err: %w", err)
Expand Down Expand Up @@ -473,7 +492,7 @@ func (lc *L4NetLBController) garbageCollectRBSNetLB(key string, svc *v1.Service)
}

// Remove LB annotations from the Service when processing the finalizer.
if err := updateAnnotations(lc.ctx, svc, nil); err != nil {
if err := deleteL4RBSAnnotations(lc.ctx, svc); err != nil {
lc.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeWarning, "DeleteLoadBalancer",
"Error removing resource annotations: %v: %v", err)
result.Error = fmt.Errorf("Failed to reset resource annotations, err: %w", err)
Expand Down
15 changes: 11 additions & 4 deletions pkg/l4lb/l4netlbcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,23 +1157,30 @@ func TestPreventTargetPoolToRBSMigration(t *testing.T) {
controller.ctx.Cloud.Compute().(*cloud.MockGCE).MockForwardingRules.GetHook = testCase.frHook
addNetLBService(controller, svc)

// test only preventTargetPoolToRBSMigration
controller.preventTargetPoolToRBSMigration(svc)
key, err := common.KeyFunc(svc)
if err != nil {
t.Fatalf("common.KeyFunc(%v) returned error %v, want nil", svc, err)
}
// test only preventLegacyServiceHandling
err = controller.preventLegacyServiceHandling(svc, key)
if err != nil {
t.Fatalf("controller.preventLegacyServiceHandling(%v, %s) returned error %v, want nil", svc, key, err)
}

resultSvc, err := controller.ctx.KubeClient.CoreV1().Services(svc.Namespace).Get(context.TODO(), svc.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("controller.ctx.KubeClient.CoreV1().Services(%s).Get(_, %s, _) returned error %v, want nil", svc.Namespace, svc.Name, err)
}
hasRBSAnnotation := controller.hasRBSAnnotation(resultSvc)
if hasRBSAnnotation != testCase.expectRBSAnnotationAfterSync {
t.Errorf("After preventTargetPoolToRBSMigration, hasRBSAnnotation = %t, testCase.expectRBSAnnotationAfterSync = %t, want equal", hasRBSAnnotation, testCase.expectRBSAnnotationAfterSync)
t.Errorf("After preventLegacyServiceHandling, hasRBSAnnotation = %t, testCase.expectRBSAnnotationAfterSync = %t, want equal", hasRBSAnnotation, testCase.expectRBSAnnotationAfterSync)
}

// test that whole sync process is skipped
svc2 := test.NewL4NetLBRBSServiceMultiplePorts("test-2", []int32{30234})
addNetLBService(controller, svc2)

key, err := common.KeyFunc(svc2)
key, err = common.KeyFunc(svc2)
if err != nil {
t.Fatalf("common.KeyFunc(%v) returned error %v, want nil", svc2, err)
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/l4lb/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
L4netlbLatencyMetricName = "l4_netlb_sync_duration_seconds"
L4netlbErrorMetricName = "l4_netlb_sync_error_count"
L4netlbLegacyToRBSMigrationAttemptsMetricName = "l4_netlb_legacy_to_rbs_migration_attempts_count"
L4NetLBTargetPoolRaceWithRBSMetricName = "l4_netlb_target_pool_rbs_race"
l4failedHealthCheckName = "l4_failed_healthcheck_count"
)

Expand Down Expand Up @@ -95,6 +96,12 @@ var (
Help: "Count of customer attempts to migrate legacy service to RBS by adding rbs annotation",
},
)
l4NetLBTargetPoolRaceWithRBS = prometheus.NewCounter(
prometheus.CounterOpts{
Name: L4NetLBTargetPoolRaceWithRBSMetricName,
Help: "Count of times target pool attached to service with RBS finalizer",
},
)
)

// init registers l4 ilb nad netlb sync metrics.
Expand Down Expand Up @@ -149,3 +156,8 @@ func PublishL4FailedHealthCheckCount(controllerName string) {
func IncreaseL4NetLBLegacyToRBSMigrationAttempts() {
l4NetLBLegacyToRBSMigrationAttempts.Inc()
}

// IncreaseL4NetLBTargetPoolRaceWithRBS increases l4NetLBTargetPoolRaceWithRBS metric
func IncreaseL4NetLBTargetPoolRaceWithRBS() {
l4NetLBTargetPoolRaceWithRBS.Inc()
}
7 changes: 5 additions & 2 deletions pkg/loadbalancers/l4syncresult.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -31,4 +31,7 @@ var L4LBResourceAnnotationKeys = []string{
annotations.UDPForwardingRuleKey,
annotations.HealthcheckKey,
annotations.FirewallRuleKey,
annotations.FirewallRuleForHealthcheckKey}
annotations.FirewallRuleForHealthcheckKey,
}

var L4RBSAnnotations = append(L4LBResourceAnnotationKeys, annotations.RBSAnnotationKey)

0 comments on commit ac79093

Please sign in to comment.