From 3bebea91dcbda9f247a72defe88a947d1b538ba1 Mon Sep 17 00:00:00 2001
From: Cezary Zawadka
Date: Thu, 5 May 2022 17:38:20 +0200
Subject: [PATCH 1/2] Rewrite L4 healthchecks creation and deletion

- create a common singleton-like struct for l4 health checks
- the new struct holds a mutex for common resources (healthchecks and their firewall rules used for both ILB and NLB)
- [bugfix] delete shared healthcheck firewall rules safely
- cross-check between ILB and NLB healthchecks whether firewall rules are still in use

Logging
New log line indicating that a firewall rule was not deleted because of the cross-check:
"Failed to delete health check firewall rule %s: health check in use."

Testing
- healthcheck management is mostly covered by existing tests; they required only minor updates
- added test cases for shared firewall rule deletion (and the expected lack of it), named TestHealthCheckFirewallDeletionWithILB and TestHealthCheckFirewallDeletionWithNetLB
- ran manual tests
---
 cmd/glbc/main.go | 3 +
 pkg/firewalls/firewalls_l4.go | 28 ++--
 pkg/healthchecks/healthchecks_l4.go | 191 +++++++++++++++++++++++++---
 pkg/l4lb/l4controller.go | 4 +-
 pkg/l4lb/l4controller_test.go | 2 +
 pkg/l4lb/l4netlbcontroller.go | 13 +-
 pkg/l4lb/l4netlbcontroller_test.go | 7 +-
 pkg/loadbalancers/interfaces.go | 11 ++
 pkg/loadbalancers/l4.go | 95 +++++---------
 pkg/loadbalancers/l4_test.go | 155 +++++++++++++++++-----
 pkg/loadbalancers/l4netlb.go | 118 ++++++-----------
 pkg/loadbalancers/l4netlb_test.go | 75 ++++++++++-
 pkg/utils/utils.go | 7 +
 13 files changed, 480 insertions(+), 229 deletions(-)

diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go
index 8272064a82..17af41ed1d 100644
--- a/cmd/glbc/main.go
+++ b/cmd/glbc/main.go
@@ -26,6 +26,7 @@ import (
 	flag "github.com/spf13/pflag"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/ingress-gce/pkg/frontendconfig"
+	"k8s.io/ingress-gce/pkg/healthchecks"
 	"k8s.io/ingress-gce/pkg/ingparams"
 	"k8s.io/ingress-gce/pkg/l4lb"
 	"k8s.io/ingress-gce/pkg/psc"
@@ -274,6 +275,8 @@ func runControllers(ctx *ingctx.ControllerContext) {
 
 	fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values())
 
+	healthchecks.InitializeL4(ctx.Cloud, ctx)
+
 	if flags.F.RunL4Controller {
 		l4Controller := l4lb.NewILBController(ctx, stopCh)
 		go l4Controller.Run()
diff --git a/pkg/firewalls/firewalls_l4.go b/pkg/firewalls/firewalls_l4.go
index eef9293365..238095a250 100644
--- a/pkg/firewalls/firewalls_l4.go
+++ b/pkg/firewalls/firewalls_l4.go
@@ -17,9 +17,6 @@ limitations under the License.
package firewalls import ( - "strings" - "sync" - "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" "google.golang.org/api/compute/v1" v1 "k8s.io/api/core/v1" @@ -27,6 +24,7 @@ import ( "k8s.io/ingress-gce/pkg/utils" "k8s.io/klog" "k8s.io/legacy-cloud-providers/gce" + "strings" ) // FirewallParams holds all data needed to create firewall for L4 LB @@ -50,7 +48,7 @@ func EnsureL4FirewallRule(cloud *gce.Cloud, nsName string, params *FirewallParam if err != nil { return err } - fwDesc, err := utils.MakeL4LBServiceDescription(nsName, params.IP, meta.VersionGA, sharedRule, params.L4Type) + fwDesc, err := utils.MakeL4LBFirewallDescription(nsName, params.IP, meta.VersionGA, sharedRule) if err != nil { klog.Warningf("EnsureL4FirewallRule(%v): failed to generate description for L4 %s rule, err: %v", params.Name, params.L4Type.ToString(), err) } @@ -78,7 +76,9 @@ func EnsureL4FirewallRule(cloud *gce.Cloud, nsName string, params *FirewallParam } return err } - if firewallRuleEqual(expectedFw, existingFw) { + + // Don't compare the "description" field for shared firewall rules + if firewallRuleEqual(expectedFw, existingFw, sharedRule) { return nil } klog.V(2).Infof("EnsureL4FirewallRule(%v): updating L4 %s firewall", params.Name, params.L4Type.ToString()) @@ -103,13 +103,19 @@ func EnsureL4FirewallRuleDeleted(cloud *gce.Cloud, fwName string) error { return nil } -func firewallRuleEqual(a, b *compute.Firewall) bool { - return a.Description == b.Description && - len(a.Allowed) == 1 && len(a.Allowed) == len(b.Allowed) && +func firewallRuleEqual(a, b *compute.Firewall, skipDescription bool) bool { + fwrEqual := len(a.Allowed) == 1 && + len(a.Allowed) == len(b.Allowed) && a.Allowed[0].IPProtocol == b.Allowed[0].IPProtocol && utils.EqualStringSets(a.Allowed[0].Ports, b.Allowed[0].Ports) && utils.EqualStringSets(a.SourceRanges, b.SourceRanges) && utils.EqualStringSets(a.TargetTags, b.TargetTags) + + // Don't compare the "description" field for shared firewall rules + if skipDescription { + return fwrEqual + } + return fwrEqual && a.Description == b.Description } func ensureFirewall(svc *v1.Service, shared bool, params *FirewallParams, cloud *gce.Cloud, recorder record.EventRecorder) error { @@ -126,12 +132,8 @@ func ensureFirewall(svc *v1.Service, shared bool, params *FirewallParams, cloud } // EnsureL4LBFirewallForHc creates or updates firewall rule for shared or non-shared health check to nodes -func EnsureL4LBFirewallForHc(svc *v1.Service, shared bool, params *FirewallParams, cloud *gce.Cloud, sharedResourcesLock *sync.Mutex, recorder record.EventRecorder) error { +func EnsureL4LBFirewallForHc(svc *v1.Service, shared bool, params *FirewallParams, cloud *gce.Cloud, recorder record.EventRecorder) error { params.SourceRanges = gce.L4LoadBalancerSrcRanges() - if shared { - sharedResourcesLock.Lock() - defer sharedResourcesLock.Unlock() - } return ensureFirewall(svc, shared, params, cloud, recorder) } diff --git a/pkg/healthchecks/healthchecks_l4.go b/pkg/healthchecks/healthchecks_l4.go index 07dd7de2c1..fa5d6c2b82 100644 --- a/pkg/healthchecks/healthchecks_l4.go +++ b/pkg/healthchecks/healthchecks_l4.go @@ -18,13 +18,18 @@ package healthchecks import ( "fmt" + "strconv" + "sync" cloudprovider "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/cloud-provider/service/helpers" + "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/composite" + 
"k8s.io/ingress-gce/pkg/events" + "k8s.io/ingress-gce/pkg/firewalls" "k8s.io/ingress-gce/pkg/utils" "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/klog" @@ -32,7 +37,6 @@ import ( ) const ( - // L4 Load Balancer parameters gceHcCheckIntervalSeconds = int64(8) gceHcTimeoutSeconds = int64(1) @@ -42,26 +46,107 @@ const ( gceHcUnhealthyThreshold = int64(3) ) -// EnsureL4HealthCheck creates a new HTTP health check for an L4 LoadBalancer service, based on the parameters provided. +var ( + // instance is a sinngleton instance, created by InitializeL4 + instance *l4HealthChecks + // mutex for preventing multiple initialization + initLock = &sync.Mutex{} +) + +type l4HealthChecks struct { + mutex sync.Mutex + cloud *gce.Cloud + recorderFactory events.RecorderProducer +} + +// InitializeL4 creates singleton instance, must be run before GetL4() func +func InitializeL4(cloud *gce.Cloud, recorderFactory events.RecorderProducer) { + if instance == nil { + initLock.Lock() + defer initLock.Unlock() + + if instance == nil { + instance = &l4HealthChecks{ + cloud: cloud, + recorderFactory: recorderFactory, + } + } + } +} + +// FakeL4 creates instance of l4HealthChecks> USe for test only. +func FakeL4(cloud *gce.Cloud, recorderFactory events.RecorderProducer) *l4HealthChecks { + instance = &l4HealthChecks{ + cloud: cloud, + recorderFactory: recorderFactory, + } + return instance +} + +// GetL4 returns singleton instance, must be run after InitializeL4 +func GetL4() *l4HealthChecks { + return instance +} + +// EnsureL4HealthCheck creates a new HTTP health check for an L4 LoadBalancer service, and the firewall rule required +// for the healthcheck. If healthcheck is shared (external traffic policy 'cluster') then firewall rules will be shared +// regardless of scope param. // If the healthcheck already exists, it is updated as needed. 
-func EnsureL4HealthCheck(cloud *gce.Cloud, svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, string, int32, string, error) { +func (l4hc *l4HealthChecks) EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) (string, string, string, string, error) { hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC) - hcPath, hcPort := gce.GetNodesHealthCheckPath(), gce.GetNodesHealthCheckPort() - if !sharedHC { - hcPath, hcPort = helpers.GetServiceHealthCheckPathPort(svc) + hcPath, hcPort := helpers.GetServiceHealthCheckPathPort(svc) + if sharedHC { + hcPath, hcPort = gce.GetNodesHealthCheckPath(), gce.GetNodesHealthCheckPort() + // lock out entire EnsureL4HealthCheck process + l4hc.mutex.Lock() + defer l4hc.mutex.Unlock() + } + + namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace} + _, hcLink, err := l4hc.ensureL4HealthCheckInternal(hcName, namespacedName, sharedHC, hcPath, hcPort, scope, l4Type) + if err != nil { + return "", "", "", annotations.HealthcheckResource, err + } + err = l4hc.ensureFirewall(svc, hcFwName, hcPort, sharedHC, nodeNames) + if err != nil { + return "", "", "", annotations.FirewallForHealthcheckResource, err } - nn := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace} - _, hcLink, err := ensureL4HealthCheckInternal(cloud, hcName, nn, sharedHC, hcPath, hcPort, scope, l4Type) - return hcLink, hcFwName, hcPort, hcName, err + + return hcLink, hcFwName, hcName, "", err } -func ensureL4HealthCheckInternal(cloud *gce.Cloud, name string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) { +// DeleteHealthCheck deletes health check (and firewall rule) for l4 service. Checks if shared resources are safe to delete. +func (l4hc *l4HealthChecks) DeleteHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error) { + if sharedHC { + // lock out entire DeleteHealthCheck process + l4hc.mutex.Lock() + defer l4hc.mutex.Unlock() + } + + hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC) + namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace} + err := utils.IgnoreHTTPNotFound(l4hc.deleteHealthCheck(hcName, scope)) + if err != nil { + if !utils.IsInUsedByError(err) { + klog.Errorf("Failed to delete healthcheck for service %s - %v", namespacedName.String(), err) + return annotations.HealthcheckResource, err + } + // Ignore deletion error due to health check in use by another resource. + // This will be hit if this is a shared healthcheck. 
+ klog.V(2).Infof("Failed to delete healthcheck %s: health check in use.", hcName) + return "", nil + } + // Health check deleted, now delete the firewall rule + return l4hc.deleteHealthCheckFirewall(svc, hcName, hcFwName, sharedHC, l4Type) +} + +func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) { selfLink := "" - key, err := composite.CreateKey(cloud, name, scope) + key, err := composite.CreateKey(l4hc.cloud, name, scope) if err != nil { - return nil, selfLink, fmt.Errorf("Failed to create composite key for healthcheck %s - %w", name, err) + return nil, selfLink, fmt.Errorf("Failed to create key for healthcheck with name %s for service %s", name, svcName.String()) } - hc, err := composite.GetHealthCheck(cloud, key, meta.VersionGA) + hc, err := composite.GetHealthCheck(l4hc.cloud, key, meta.VersionGA) if err != nil { if !utils.IsNotFoundError(err) { return nil, selfLink, err @@ -69,17 +154,17 @@ func ensureL4HealthCheckInternal(cloud *gce.Cloud, name string, svcName types.Na } var region string if scope == meta.Regional { - region = cloud.Region() + region = l4hc.cloud.Region() } expectedHC := NewL4HealthCheck(name, svcName, shared, path, port, l4Type, scope, region) if hc == nil { // Create the healthcheck klog.V(2).Infof("Creating healthcheck %s for service %s, shared = %v", name, svcName, shared) - err = composite.CreateHealthCheck(cloud, key, expectedHC) + err = composite.CreateHealthCheck(l4hc.cloud, key, expectedHC) if err != nil { return nil, selfLink, err } - selfLink = cloudprovider.SelfLink(meta.VersionGA, cloud.ProjectID(), "healthChecks", key) + selfLink = cloudprovider.SelfLink(meta.VersionGA, l4hc.cloud.ProjectID(), "healthChecks", key) return expectedHC, selfLink, nil } selfLink = hc.SelfLink @@ -89,19 +174,85 @@ func ensureL4HealthCheckInternal(cloud *gce.Cloud, name string, svcName types.Na } mergeHealthChecks(hc, expectedHC) klog.V(2).Infof("Updating healthcheck %s for service %s", name, svcName) - err = composite.UpdateHealthCheck(cloud, key, expectedHC) + err = composite.UpdateHealthCheck(l4hc.cloud, key, expectedHC) if err != nil { return nil, selfLink, err } return expectedHC, selfLink, err } -func DeleteHealthCheck(cloud *gce.Cloud, name string, scope meta.KeyType) error { - key, err := composite.CreateKey(cloud, name, scope) +// ensureFirewall rule for L4 service. 
+// The firewall rules are the same for ILB and NetLB Services that use external traffic policy 'Cluster' (sharedHC == true),
+// even though the health checks have different scopes (global vs regional).
+func (l4hc *l4HealthChecks) ensureFirewall(svc *corev1.Service, hcFwName string, hcPort int32, sharedHC bool, nodeNames []string) error {
+	// Add firewall rule for healthchecks to nodes
+	hcFWRParams := firewalls.FirewallParams{
+		PortRanges:   []string{strconv.Itoa(int(hcPort))},
+		SourceRanges: gce.L4LoadBalancerSrcRanges(),
+		Protocol:     string(corev1.ProtocolTCP),
+		Name:         hcFwName,
+		NodeNames:    nodeNames,
+	}
+	return firewalls.EnsureL4LBFirewallForHc(svc, sharedHC, &hcFWRParams, l4hc.cloud, l4hc.recorderFactory.Recorder(svc.Namespace))
+}
+
+func (l4hc *l4HealthChecks) deleteHealthCheck(name string, scope meta.KeyType) error {
+	key, err := composite.CreateKey(l4hc.cloud, name, scope)
 	if err != nil {
 		return fmt.Errorf("Failed to create composite key for healthcheck %s - %w", name, err)
 	}
-	return composite.DeleteHealthCheck(cloud, key, meta.VersionGA)
+	return composite.DeleteHealthCheck(l4hc.cloud, key, meta.VersionGA)
+}
+
+func (l4hc *l4HealthChecks) deleteHealthCheckFirewall(svc *corev1.Service, hcName, hcFwName string, sharedHC bool, l4Type utils.L4LBType) (string, error) {
+	namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
+
+	safeToDelete, err := l4hc.healthCheckFirewallSafeToDelete(hcName, sharedHC, l4Type)
+	if err != nil {
+		klog.Errorf("Failed to delete health check firewall rule %s for service %s - %v", hcFwName, namespacedName.String(), err)
+		return annotations.HealthcheckResource, err
+	}
+	if !safeToDelete {
+		klog.V(2).Infof("Failed to delete health check firewall rule %s: health check in use.", hcName)
+		return "", nil
+	}
+	// Delete healthcheck firewall rule if no healthcheck uses the firewall rule.
+ err = l4hc.deleteFirewall(hcFwName, svc) + if err != nil { + klog.Errorf("Failed to delete firewall rule %s for internal loadbalancer service %s, err %v", hcFwName, namespacedName.String(), err) + return annotations.FirewallForHealthcheckResource, err + } + return "", nil +} + +func (l4hc *l4HealthChecks) healthCheckFirewallSafeToDelete(hcName string, sharedHC bool, l4Type utils.L4LBType) (bool, error) { + if !sharedHC { + return true, nil + } + var scopeToCheck meta.KeyType + scopeToCheck = meta.Regional + if l4Type == utils.XLB { + scopeToCheck = meta.Global + } + key, err := composite.CreateKey(l4hc.cloud, hcName, scopeToCheck) + if err != nil { + return false, fmt.Errorf("Failed to create composite key for healthcheck %s - %w", hcName, err) + } + _, err = composite.GetHealthCheck(l4hc.cloud, key, meta.VersionGA) + return utils.IsNotFoundError(err), nil +} + +func (l4hc *l4HealthChecks) deleteFirewall(name string, svc *corev1.Service) error { + err := firewalls.EnsureL4FirewallRuleDeleted(l4hc.cloud, name) + if err != nil { + if fwErr, ok := err.(*firewalls.FirewallXPNError); ok { + recorder := l4hc.recorderFactory.Recorder(svc.Namespace) + recorder.Eventf(svc, corev1.EventTypeNormal, "XPN", fwErr.Message) + return nil + } + return err + } + return nil } func NewL4HealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32, l4Type utils.L4LBType, scope meta.KeyType, region string) *composite.HealthCheck { diff --git a/pkg/l4lb/l4controller.go b/pkg/l4lb/l4controller.go index d1c36ff5cc..f179a7c4ad 100644 --- a/pkg/l4lb/l4controller.go +++ b/pkg/l4lb/l4controller.go @@ -188,7 +188,7 @@ func (l4c *L4Controller) shouldProcessService(service *v1.Service, l4 *loadbalan // processServiceCreateOrUpdate ensures load balancer resources for the given service, as needed. // Returns an error if processing the service update failed. 
func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Service) *loadbalancers.L4ILBSyncResult { - l4 := loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(service.Namespace), &l4c.sharedResourcesLock) + l4 := loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(service.Namespace)) if !l4c.shouldProcessService(service, l4) { return nil } @@ -241,7 +241,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Se } func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service) *loadbalancers.L4ILBSyncResult { - l4 := loadbalancers.NewL4Handler(svc, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(svc.Namespace), &l4c.sharedResourcesLock) + l4 := loadbalancers.NewL4Handler(svc, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(svc.Namespace)) l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer for %s", key) result := l4.EnsureInternalLoadBalancerDeleted(svc) if result.Error != nil { diff --git a/pkg/l4lb/l4controller_test.go b/pkg/l4lb/l4controller_test.go index a9819d175d..d49c989cb2 100644 --- a/pkg/l4lb/l4controller_test.go +++ b/pkg/l4lb/l4controller_test.go @@ -19,6 +19,7 @@ package l4lb import ( context2 "context" "fmt" + "k8s.io/ingress-gce/pkg/healthchecks" "testing" "time" @@ -70,6 +71,7 @@ func newServiceController(t *testing.T, fakeGCE *gce.Cloud) *L4Controller { for _, n := range nodes { ctx.NodeInformer.GetIndexer().Add(n) } + healthchecks.FakeL4(ctx.Cloud, ctx) return NewILBController(ctx, stopCh) } diff --git a/pkg/l4lb/l4netlbcontroller.go b/pkg/l4lb/l4netlbcontroller.go index fdb912a211..96d520ad51 100644 --- a/pkg/l4lb/l4netlbcontroller.go +++ b/pkg/l4lb/l4netlbcontroller.go @@ -18,9 +18,6 @@ package l4lb import ( "fmt" - "reflect" - "sync" - "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" v1 "k8s.io/api/core/v1" @@ -38,6 +35,7 @@ import ( "k8s.io/ingress-gce/pkg/utils/common" "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/klog" + "reflect" ) const l4NetLBControllerName = "l4netlb-controller" @@ -54,8 +52,7 @@ type L4NetLBController struct { // enqueueTracker tracks the latest time an update was enqueued enqueueTracker utils.TimeTracker // syncTracker tracks the latest time an enqueued service was synced - syncTracker utils.TimeTracker - sharedResourcesLock sync.Mutex + syncTracker utils.TimeTracker backendPool *backends.Backends instancePool instances.NodePool @@ -236,7 +233,7 @@ func (lc *L4NetLBController) hasForwardingRuleAnnotation(svc *v1.Service, frName // hasRBSForwardingRule checks if services loadbalancer has forwarding rule pointing to backend service func (lc *L4NetLBController) hasRBSForwardingRule(svc *v1.Service) bool { - l4netlb := loadbalancers.NewL4NetLB(svc, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(svc.Namespace), &lc.sharedResourcesLock) + l4netlb := loadbalancers.NewL4NetLB(svc, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(svc.Namespace)) frName := l4netlb.GetFRName() // to optimize number of api calls, at first, check if forwarding rule exists in annotation if lc.hasForwardingRuleAnnotation(svc, frName) { @@ -322,7 +319,7 @@ func (lc *L4NetLBController) sync(key string) error { // syncInternal ensures load balancer resources for the given service, as needed. // Returns an error if processing the service update failed. 
func (lc *L4NetLBController) syncInternal(service *v1.Service) *loadbalancers.L4NetLBSyncResult { - l4netlb := loadbalancers.NewL4NetLB(service, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(service.Namespace), &lc.sharedResourcesLock) + l4netlb := loadbalancers.NewL4NetLB(service, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(service.Namespace)) // check again that rbs is enabled. if !lc.isRBSBasedService(service) { klog.Infof("Skipping syncInternal. Service %s does not have RBS enabled", service.Name) @@ -400,7 +397,7 @@ func (lc *L4NetLBController) ensureInstanceGroups(service *v1.Service, nodeNames // garbageCollectRBSNetLB cleans-up all gce resources related to service and removes NetLB finalizer func (lc *L4NetLBController) garbageCollectRBSNetLB(key string, svc *v1.Service) *loadbalancers.L4NetLBSyncResult { - l4netLB := loadbalancers.NewL4NetLB(svc, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(svc.Namespace), &lc.sharedResourcesLock) + l4netLB := loadbalancers.NewL4NetLB(svc, lc.ctx.Cloud, meta.Regional, lc.namer, lc.ctx.Recorder(svc.Namespace)) lc.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting L4 External LoadBalancer for %s", key) diff --git a/pkg/l4lb/l4netlbcontroller_test.go b/pkg/l4lb/l4netlbcontroller_test.go index bb9d58f1e4..818f64948c 100644 --- a/pkg/l4lb/l4netlbcontroller_test.go +++ b/pkg/l4lb/l4netlbcontroller_test.go @@ -24,7 +24,6 @@ import ( "reflect" "sort" "strings" - "sync" "testing" "time" @@ -242,7 +241,7 @@ func newL4NetLBServiceController() *L4NetLBController { for _, n := range nodes { ctx.NodeInformer.GetIndexer().Add(n) } - + healthchecks.FakeL4(ctx.Cloud, ctx) return NewL4NetLBController(ctx, stopCh) } @@ -842,7 +841,7 @@ func TestHealthCheckWhenExternalTrafficPolicyWasUpdated(t *testing.T) { // delete shared health check if is created, update service to Cluster and // check that non-shared health check was created hcNameShared, _ := lc.namer.L4HealthCheck(svc.Namespace, svc.Name, true) - healthchecks.DeleteHealthCheck(lc.ctx.Cloud, hcNameShared, meta.Regional) + healthchecks.FakeL4(lc.ctx.Cloud, lc.ctx).DeleteHealthCheck(svc, lc.namer, true, meta.Regional, utils.XLB) // Update ExternalTrafficPolicy to Cluster check if shared HC was created err = updateAndAssertExternalTrafficPolicy(newSvc, lc, v1.ServiceExternalTrafficPolicyTypeCluster, hcNameShared) if err != nil { @@ -974,7 +973,7 @@ func TestIsRBSBasedService(t *testing.T) { func TestIsRBSBasedServiceWithILBServices(t *testing.T) { controller := newL4NetLBServiceController() ilbSvc := test.NewL4ILBService(false, 8080) - ilbFrName := loadbalancers.NewL4Handler(ilbSvc, controller.ctx.Cloud, meta.Regional, controller.namer, record.NewFakeRecorder(100), &sync.Mutex{}).GetFRName() + ilbFrName := loadbalancers.NewL4Handler(ilbSvc, controller.ctx.Cloud, meta.Regional, controller.namer, record.NewFakeRecorder(100)).GetFRName() ilbSvc.Annotations = map[string]string{ annotations.TCPForwardingRuleKey: ilbFrName, annotations.UDPForwardingRuleKey: ilbFrName, diff --git a/pkg/loadbalancers/interfaces.go b/pkg/loadbalancers/interfaces.go index c0312fb4bc..c6106cba3e 100644 --- a/pkg/loadbalancers/interfaces.go +++ b/pkg/loadbalancers/interfaces.go @@ -18,7 +18,10 @@ package loadbalancers import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/networking/v1" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/ingress-gce/pkg/utils/namer" ) // LoadBalancerPool is an interface to manage 
the cloud resources associated @@ -37,3 +40,11 @@ type LoadBalancerPool interface { // HasUrlMap returns true if an URL map exists in GCE for given ingress. HasUrlMap(ing *v1.Ingress) (bool, error) } + +// L4HealthChecks defines methods for creating and deleting health checks (and their firewall rules) for l4 services +type L4HealthChecks interface { + // EnsureL4HealthCheck creates health check (and firewall rule) for l4 service + EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) (string, string, string, string, error) + // DeleteHealthCheck deletes health check (and firewall rule) for l4 service + DeleteHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error) +} diff --git a/pkg/loadbalancers/l4.go b/pkg/loadbalancers/l4.go index f0b848487e..a0aba53651 100644 --- a/pkg/loadbalancers/l4.go +++ b/pkg/loadbalancers/l4.go @@ -18,9 +18,7 @@ package loadbalancers import ( "fmt" - "strconv" "strings" - "sync" "time" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" @@ -49,11 +47,11 @@ type L4 struct { scope meta.KeyType namer namer.L4ResourcesNamer // recorder is used to generate k8s Events. - recorder record.EventRecorder - Service *corev1.Service - ServicePort utils.ServicePort - NamespacedName types.NamespacedName - sharedResourcesLock *sync.Mutex + recorder record.EventRecorder + Service *corev1.Service + ServicePort utils.ServicePort + NamespacedName types.NamespacedName + l4HealthChecks L4HealthChecks } // L4ILBSyncResult contains information about the outcome of an L4 ILB sync. It stores the list of resource name annotations, @@ -69,8 +67,15 @@ type L4ILBSyncResult struct { } // NewL4Handler creates a new L4Handler for the given L4 service. -func NewL4Handler(service *corev1.Service, cloud *gce.Cloud, scope meta.KeyType, namer namer.L4ResourcesNamer, recorder record.EventRecorder, lock *sync.Mutex) *L4 { - l := &L4{cloud: cloud, scope: scope, namer: namer, recorder: recorder, Service: service, sharedResourcesLock: lock} +func NewL4Handler(service *corev1.Service, cloud *gce.Cloud, scope meta.KeyType, namer namer.L4ResourcesNamer, recorder record.EventRecorder) *L4 { + l := &L4{ + cloud: cloud, + scope: scope, + namer: namer, + recorder: recorder, + Service: service, + l4HealthChecks: healthchecks.GetL4(), + } l.NamespacedName = types.NamespacedName{Name: service.Name, Namespace: service.Namespace} l.backendPool = backends.NewPool(l.cloud, l.namer) l.ServicePort = utils.ServicePort{ID: utils.ServicePortID{Service: l.NamespacedName}, BackendNamer: l.namer, @@ -150,39 +155,28 @@ func (l *L4) EnsureInternalLoadBalancerDeleted(svc *corev1.Service) *L4ILBSyncRe // when externalTrafficPolicy is changed from Local to Cluster and a new health check was created. // When service is deleted we need to check both health checks shared and non-shared // and delete them if needed. 
- deleteHcFunc := func(sharedHC bool) { - hcName, hcFwName := l.namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC) - if sharedHC { - l.sharedResourcesLock.Lock() - defer l.sharedResourcesLock.Unlock() - } - err = utils.IgnoreHTTPNotFound(healthchecks.DeleteHealthCheck(l.cloud, hcName, meta.Global)) + for _, isShared := range []bool{true, false} { + resourceInError, err := l.l4HealthChecks.DeleteHealthCheck(svc, l.namer, isShared, meta.Global, utils.ILB) if err != nil { - if !utils.IsInUsedByError(err) { - klog.Errorf("Failed to delete healthcheck for internal loadbalancer service %s - %v", l.NamespacedName.String(), err) - result.GCEResourceInError = annotations.HealthcheckResource - result.Error = err - return - } - // Ignore deletion error due to health check in use by another resource. - // This will be hit if this is a shared healthcheck. - klog.V(2).Infof("Failed to delete healthcheck %s: health check in use.", hcName) - } else { - // Delete healthcheck firewall rule if healthcheck deletion is successful. - err = deleteFwFunc(hcFwName) - if err != nil { - klog.Errorf("Failed to delete firewall rule %s for internal loadbalancer service %s, err %v", hcFwName, l.NamespacedName.String(), err) - result.GCEResourceInError = annotations.FirewallForHealthcheckResource - result.Error = err - } + result.GCEResourceInError = resourceInError + result.Error = err } } - for _, isShared := range []bool{true, false} { - deleteHcFunc(isShared) - } return result } +func (l *L4) deleteFirewall(name string) error { + err := firewalls.EnsureL4FirewallRuleDeleted(l.cloud, name) + if err != nil { + if fwErr, ok := err.(*firewalls.FirewallXPNError); ok { + l.recorder.Eventf(l.Service, corev1.EventTypeNormal, "XPN", fwErr.Message) + return nil + } + return err + } + return nil +} + // GetFRName returns the name of the forwarding rule for the given ILB service. // This appends the protocol to the forwarding rule name, which will help supporting multiple protocols in the same ILB // service. 
@@ -221,18 +215,10 @@ func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service) // create healthcheck sharedHC := !helpers.RequestsOnlyLocalTraffic(l.Service) - ensureHCFunc := func() (string, string, int32, string, error) { - if sharedHC { - // Take the lock when creating the shared healthcheck - l.sharedResourcesLock.Lock() - defer l.sharedResourcesLock.Unlock() - } - return healthchecks.EnsureL4HealthCheck(l.cloud, l.Service, l.namer, sharedHC, meta.Global, utils.ILB) - } + hcLink, hcFwName, hcName, resourceInErr, err := l.l4HealthChecks.EnsureL4HealthCheck(l.Service, l.namer, sharedHC, meta.Global, utils.ILB, nodeNames) - hcLink, hcFwName, hcPort, hcName, err := ensureHCFunc() if err != nil { - result.GCEResourceInError = annotations.HealthcheckResource + result.GCEResourceInError = resourceInErr result.Error = err return result } @@ -253,7 +239,6 @@ func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service) Protocol: string(protocol), Name: name, NodeNames: nodeNames, - L4Type: utils.ILB, } if err := firewalls.EnsureL4LBFirewallForNodes(l.Service, &nodesFWRParams, l.cloud, l.recorder); err != nil { @@ -262,22 +247,6 @@ func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service) return result } result.Annotations[annotations.FirewallRuleKey] = name - - // Add firewall rule for healthchecks to nodes - hcFWRParams := firewalls.FirewallParams{ - PortRanges: []string{strconv.Itoa(int(hcPort))}, - SourceRanges: gce.L4LoadBalancerSrcRanges(), - Protocol: string(corev1.ProtocolTCP), - Name: hcFwName, - NodeNames: nodeNames, - L4Type: utils.ILB, - } - err = firewalls.EnsureL4LBFirewallForHc(l.Service, sharedHC, &hcFWRParams, l.cloud, l.sharedResourcesLock, l.recorder) - if err != nil { - result.GCEResourceInError = annotations.FirewallForHealthcheckResource - result.Error = err - return result - } result.Annotations[annotations.FirewallRuleForHealthcheckKey] = hcFwName // Check if protocol has changed for this service. In this case, forwarding rule should be deleted before diff --git a/pkg/loadbalancers/l4_test.go b/pkg/loadbalancers/l4_test.go index 2378f1b6cb..6c02d0ac9f 100644 --- a/pkg/loadbalancers/l4_test.go +++ b/pkg/loadbalancers/l4_test.go @@ -19,9 +19,9 @@ limitations under the License. 
import ( "context" "fmt" + "k8s.io/ingress-gce/pkg/healthchecks" "reflect" "strings" - "sync" "testing" "google.golang.org/api/compute/v1" @@ -37,7 +37,6 @@ import ( servicehelper "k8s.io/cloud-provider/service/helpers" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/composite" - "k8s.io/ingress-gce/pkg/healthchecks" "k8s.io/ingress-gce/pkg/test" namer_util "k8s.io/ingress-gce/pkg/utils/namer" "k8s.io/legacy-cloud-providers/gce" @@ -65,9 +64,12 @@ func getFakeGCECloud(vals gce.TestClusterValues) *gce.Cloud { func TestEnsureInternalBackendServiceUpdates(t *testing.T) { t.Parallel() fakeGCE := getFakeGCECloud(gce.DefaultTestClusterValues()) + svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + bsName, _ := l.namer.L4Backend(l.Service.Namespace, l.Service.Name) _, err := l.backendPool.EnsureL4BackendService(bsName, "", "TCP", string(svc.Spec.SessionAffinity), string(cloud.SchemeInternal), l.NamespacedName, meta.VersionGA) if err != nil { @@ -116,7 +118,9 @@ func TestEnsureInternalLoadBalancer(t *testing.T) { svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + if _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName); err != nil { t.Errorf("Unexpected error when adding nodes %v", err) } @@ -171,7 +175,9 @@ func TestEnsureInternalLoadBalancerTypeChange(t *testing.T) { svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + if _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName); err != nil { t.Errorf("Unexpected error when adding nodes %v", err) } @@ -202,7 +208,10 @@ func TestEnsureInternalLoadBalancerWithExistingResources(t *testing.T) { fakeGCE := getFakeGCECloud(vals) svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + if _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName); err != nil { t.Errorf("Unexpected error when adding nodes %v", err) } @@ -211,16 +220,8 @@ func TestEnsureInternalLoadBalancerWithExistingResources(t *testing.T) { // Create the expected resources necessary for an Internal Load Balancer sharedHC := !servicehelper.RequestsOnlyLocalTraffic(svc) - ensureHCFunc := func() (string, string, int32, string, error) { - if sharedHC { - // Take the lock when creating the shared healthcheck - l.sharedResourcesLock.Lock() - defer l.sharedResourcesLock.Unlock() - } - return healthchecks.EnsureL4HealthCheck(l.cloud, l.Service, 
l.namer, sharedHC, meta.Global, utils.ILB) - } + hcLink, _, _, _, err := l.l4HealthChecks.EnsureL4HealthCheck(l.Service, l.namer, sharedHC, meta.Global, utils.ILB, []string{}) - hcLink, _, _, _, err := ensureHCFunc() if err != nil { t.Errorf("Failed to create healthcheck, err %v", err) } @@ -248,9 +249,12 @@ func TestEnsureInternalLoadBalancerClearPreviousResources(t *testing.T) { nodeNames := []string{"test-node-1"} fakeGCE := getFakeGCECloud(vals) + svc := test.NewL4ILBService(true, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName) if err != nil { t.Errorf("Unexpected error when adding nodes %v", err) @@ -365,9 +369,12 @@ func TestUpdateResourceLinks(t *testing.T) { nodeNames := []string{"test-node-1"} fakeGCE := getFakeGCECloud(vals) + svc := test.NewL4ILBService(true, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName) if err != nil { t.Errorf("Unexpected error when adding nodes %v", err) @@ -440,9 +447,12 @@ func TestEnsureInternalLoadBalancerHealthCheckConfigurable(t *testing.T) { nodeNames := []string{"test-node-1"} fakeGCE := getFakeGCECloud(vals) + svc := test.NewL4ILBService(true, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName) if err != nil { t.Errorf("Unexpected error when adding nodes %v", err) @@ -479,10 +489,13 @@ func TestEnsureInternalLoadBalancerDeleted(t *testing.T) { vals := gce.DefaultTestClusterValues() fakeGCE := getFakeGCECloud(vals) + nodeNames := []string{"test-node-1"} svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + if _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName); err != nil { t.Errorf("Unexpected error when adding nodes %v", err) } @@ -508,10 +521,13 @@ func TestEnsureInternalLoadBalancerDeletedTwiceDoesNotError(t *testing.T) { vals := gce.DefaultTestClusterValues() fakeGCE := getFakeGCECloud(vals) + nodeNames := []string{"test-node-1"} svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + if _, err := 
test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName); err != nil { t.Errorf("Unexpected error when adding nodes %v", err) } @@ -544,6 +560,7 @@ func TestEnsureInternalLoadBalancerDeletedWithSharedHC(t *testing.T) { vals := gce.DefaultTestClusterValues() fakeGCE := getFakeGCECloud(vals) + (fakeGCE.Compute().(*cloud.MockGCE)).MockHealthChecks.DeleteHook = test.DeleteHealthCheckResourceInUseErrorHook nodeNames := []string{"test-node-1"} namer := namer_util.NewL4Namer(kubeSystemUID, nil) @@ -570,9 +587,64 @@ func TestEnsureInternalLoadBalancerDeletedWithSharedHC(t *testing.T) { } } +func TestHealthCheckFirewallDeletionWithNetLB(t *testing.T) { + t.Parallel() + vals := gce.DefaultTestClusterValues() + fakeGCE := getFakeGCECloud(vals) + + nodeNames := []string{"test-node-1"} + namer := namer_util.NewL4Namer(kubeSystemUID, nil) + + // Create ILB Service + ilbSvc, l, result := ensureService(fakeGCE, namer, nodeNames, vals.ZoneName, 8081, t) + if result != nil && result.Error != nil { + t.Fatalf("Error ensuring service err: %v", result.Error) + } + + // Create NetLB Service + netlbSvc := test.NewL4NetLBRBSService(8080) + l4NetLB := NewL4NetLB(netlbSvc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + // make sure both ilb and netlb use the same l4 healtcheck instance + l4NetLB.l4HealthChecks = l.l4HealthChecks + + // create netlb resources + xlbResult := l4NetLB.EnsureFrontend(nodeNames, netlbSvc) + if xlbResult.Error != nil { + t.Errorf("Failed to ensure loadBalancer, err %v", xlbResult.Error) + } + if len(xlbResult.Status.Ingress) == 0 { + t.Errorf("Got empty loadBalancer status using handler %v", l4NetLB) + } + assertNetLbResources(t, netlbSvc, l4NetLB, nodeNames) + + // Delete the ILB loadbalancer + result = l.EnsureInternalLoadBalancerDeleted(ilbSvc) + if result.Error != nil { + t.Errorf("Unexpected error %v", result.Error) + } + + // When NetLB health check uses the same firewall rules we expect that hc firewall rule will not be deleted. + haName, hcFwName := l.namer.L4HealthCheck(l.Service.Namespace, l.Service.Name, true) + firewall, err := l.cloud.GetFirewall(hcFwName) + if err != nil { + t.Errorf("Expected error: firewall exists, got %v", err) + } + if firewall == nil { + t.Error("Healthcheck Firewall should still exist, got nil") + } + + // The healthcheck itself should be deleted. 
+ healthcheck, err := l.cloud.GetHealthCheck(haName) + if err == nil || healthcheck != nil { + t.Errorf("Expected error when looking up shared healthcheck after deletion") + } +} + func ensureService(fakeGCE *gce.Cloud, namer *namer_util.L4Namer, nodeNames []string, zoneName string, port int, t *testing.T) (*v1.Service, *L4, *L4ILBSyncResult) { svc := test.NewL4ILBService(false, 8080) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + if _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, zoneName); err != nil { return nil, nil, &L4ILBSyncResult{Error: fmt.Errorf("Unexpected error when adding nodes %v", err)} } @@ -591,10 +663,13 @@ func ensureService(fakeGCE *gce.Cloud, namer *namer_util.L4Namer, nodeNames []st func TestEnsureInternalLoadBalancerWithSpecialHealthCheck(t *testing.T) { vals := gce.DefaultTestClusterValues() fakeGCE := getFakeGCECloud(vals) + nodeNames := []string{"test-node-1"} svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + if _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName); err != nil { t.Errorf("Unexpected error when adding nodes %v", err) } @@ -691,14 +766,17 @@ func TestEnsureInternalLoadBalancerErrors(t *testing.T) { }, } { t.Run(desc, func(t *testing.T) { - fakeGCE := getFakeGCECloud(gce.DefaultTestClusterValues()) nodeNames := []string{"test-node-1"} params = newEnsureILBParams() if tc.adjustParams != nil { tc.adjustParams(params) } namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(params.service, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + fakeGCE := getFakeGCECloud(gce.DefaultTestClusterValues()) + + l := NewL4Handler(params.service, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + //lbName := l.namer.L4Backend(params.service.Namespace, params.service.Name) frName := l.GetFRName() key, err := composite.CreateKey(l.cloud, frName, meta.Regional) @@ -779,7 +857,9 @@ func TestEnsureInternalLoadBalancerEnableGlobalAccess(t *testing.T) { nodeNames := []string{"test-node-1"} svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + if _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName); err != nil { t.Errorf("Unexpected error when adding nodes %v", err) } @@ -859,7 +939,9 @@ func TestEnsureInternalLoadBalancerCustomSubnet(t *testing.T) { svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, 
&test.FakeRecorderSource{}) + if _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName); err != nil { t.Errorf("Unexpected error when adding nodes %v", err) } @@ -952,9 +1034,12 @@ func TestEnsureInternalLoadBalancerCustomSubnet(t *testing.T) { func TestEnsureInternalFirewallPortRanges(t *testing.T) { vals := gce.DefaultTestClusterValues() fakeGCE := getFakeGCECloud(vals) + svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + fwName, _ := l.namer.L4Backend(l.Service.Namespace, l.Service.Name) tc := struct { Input []int @@ -980,7 +1065,6 @@ func TestEnsureInternalFirewallPortRanges(t *testing.T) { PortRanges: utils.GetPortRanges(tc.Input), NodeNames: nodeNames, Protocol: string(v1.ProtocolTCP), - L4Type: utils.ILB, IP: "1.2.3.4", } firewalls.EnsureL4FirewallRule(l.cloud, utils.ServiceKeyFunc(svc.Namespace, svc.Name), &fwrParams /*sharedRule = */, false) @@ -1002,11 +1086,14 @@ func TestEnsureInternalLoadBalancerModifyProtocol(t *testing.T) { vals := gce.DefaultTestClusterValues() fakeGCE := getFakeGCECloud(vals) + c := fakeGCE.Compute().(*cloud.MockGCE) nodeNames := []string{"test-node-1"} svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName) if err != nil { t.Errorf("Unexpected error when adding nodes %v", err) @@ -1092,10 +1179,13 @@ func TestEnsureInternalLoadBalancerAllPorts(t *testing.T) { vals := gce.DefaultTestClusterValues() fakeGCE := getFakeGCECloud(vals) + nodeNames := []string{"test-node-1"} svc := test.NewL4ILBService(false, 8080) namer := namer_util.NewL4Namer(kubeSystemUID, nil) - l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l := NewL4Handler(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + if _, err := test.CreateAndInsertNodes(l.cloud, nodeNames, vals.ZoneName); err != nil { t.Errorf("Unexpected error when adding nodes %v", err) } @@ -1185,6 +1275,8 @@ func assertInternalLbResources(t *testing.T, apiService *v1.Service, l *L4, node sharedHC := !servicehelper.RequestsOnlyLocalTraffic(apiService) resourceName, _ := l.namer.L4Backend(l.Service.Namespace, l.Service.Name) resourceDesc, err := utils.MakeL4LBServiceDescription(utils.ServiceKeyFunc(apiService.Namespace, apiService.Name), "", meta.VersionGA, false, utils.ILB) + // todo fix this shit. 
+ //firewallDesc, err := utils.MakeL4LBFirewallDescription(utils.ServiceKeyFunc(apiService.Namespace, apiService.Name), "", meta.VersionGA, false, utils.ILB) if err != nil { t.Errorf("Failed to create description for resources, err %v", err) } @@ -1200,6 +1292,7 @@ func assertInternalLbResources(t *testing.T, apiService *v1.Service, l *L4, node if sharedHC { hcDesc = sharedResourceDesc } + type nameAndDesc struct { fwName string fwDesc string @@ -1224,7 +1317,7 @@ func assertInternalLbResources(t *testing.T, apiService *v1.Service, l *L4, node if len(firewall.SourceRanges) == 0 { t.Fatalf("Unexpected empty source range for firewall rule %v", firewall) } - if firewall.Description != info.fwDesc { + if !sharedHC && firewall.Description != info.fwDesc { t.Errorf("Unexpected description in firewall %q - Expected %s, Got %s", info.fwName, firewall.Description, info.fwDesc) } } diff --git a/pkg/loadbalancers/l4netlb.go b/pkg/loadbalancers/l4netlb.go index d55ecccab0..8a0fd753ab 100644 --- a/pkg/loadbalancers/l4netlb.go +++ b/pkg/loadbalancers/l4netlb.go @@ -18,8 +18,6 @@ package loadbalancers import ( "fmt" - "strconv" - "sync" "time" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" @@ -47,11 +45,11 @@ type L4NetLB struct { scope meta.KeyType namer namer.L4ResourcesNamer // recorder is used to generate k8s Events. - recorder record.EventRecorder - Service *corev1.Service - ServicePort utils.ServicePort - NamespacedName types.NamespacedName - sharedResourcesLock *sync.Mutex + recorder record.EventRecorder + Service *corev1.Service + ServicePort utils.ServicePort + NamespacedName types.NamespacedName + l4HealthChecks L4HealthChecks } // L4NetLBSyncResult contains information about the outcome of an L4 NetLB sync. It stores the list of resource name annotations, @@ -67,15 +65,15 @@ type L4NetLBSyncResult struct { } // NewL4NetLB creates a new Handler for the given L4NetLB service. 
-func NewL4NetLB(service *corev1.Service, cloud *gce.Cloud, scope meta.KeyType, namer namer.L4ResourcesNamer, recorder record.EventRecorder, lock *sync.Mutex) *L4NetLB { +func NewL4NetLB(service *corev1.Service, cloud *gce.Cloud, scope meta.KeyType, namer namer.L4ResourcesNamer, recorder record.EventRecorder) *L4NetLB { l4netlb := &L4NetLB{cloud: cloud, - scope: scope, - namer: namer, - recorder: recorder, - Service: service, - sharedResourcesLock: lock, - NamespacedName: types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, - backendPool: backends.NewPool(cloud, namer), + scope: scope, + namer: namer, + recorder: recorder, + Service: service, + NamespacedName: types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, + backendPool: backends.NewPool(cloud, namer), + l4HealthChecks: healthchecks.GetL4(), } portId := utils.ServicePortID{Service: l4netlb.NamespacedName} l4netlb.ServicePort = utils.ServicePort{ @@ -108,25 +106,19 @@ func (l4netlb *L4NetLB) EnsureFrontend(nodeNames []string, svc *corev1.Service) } l4netlb.Service = svc + sharedHC := !helpers.RequestsOnlyLocalTraffic(svc) - ensureHCFunc := func() (string, string, int32, string, error) { - if sharedHC { - // Take the lock when creating the shared healthcheck - l4netlb.sharedResourcesLock.Lock() - defer l4netlb.sharedResourcesLock.Unlock() - } - return healthchecks.EnsureL4HealthCheck(l4netlb.cloud, l4netlb.Service, l4netlb.namer, sharedHC, l4netlb.scope, utils.XLB) - } - hcLink, hcFwName, hcPort, hcName, err := ensureHCFunc() + hcLink, hcFwName, hcName, resourceInErr, err := l4netlb.l4HealthChecks.EnsureL4HealthCheck(l4netlb.Service, l4netlb.namer, sharedHC, l4netlb.scope, utils.XLB, nodeNames) + if err != nil { - result.GCEResourceInError = annotations.HealthcheckResource + result.GCEResourceInError = resourceInErr result.Error = fmt.Errorf("Failed to ensure health check %s - %w", hcName, err) return result } result.Annotations[annotations.HealthcheckKey] = hcName name := l4netlb.ServicePort.BackendName() - protocol, res := l4netlb.createFirewalls(name, hcLink, hcFwName, hcPort, nodeNames, sharedHC) + protocol, res := l4netlb.createFirewalls(name, nodeNames) if res.Error != nil { return res } @@ -184,19 +176,7 @@ func (l4netlb *L4NetLB) EnsureLoadBalancerDeleted(svc *corev1.Service) *L4NetLBS result.GCEResourceInError = annotations.AddressResource } - deleteFwFunc := func(name string) error { - err := firewalls.EnsureL4FirewallRuleDeleted(l4netlb.cloud, name) - if err != nil { - if fwErr, ok := err.(*firewalls.FirewallXPNError); ok { - l4netlb.recorder.Eventf(l4netlb.Service, corev1.EventTypeNormal, "XPN", fwErr.Message) - return nil - } - return err - } - return nil - } - // delete firewall rule allowing load balancer source ranges - err = deleteFwFunc(name) + err = l4netlb.deleteFirewall(name) if err != nil { klog.Errorf("Failed to delete firewall rule %s for service %s - %v", name, l4netlb.NamespacedName.String(), err) result.GCEResourceInError = annotations.FirewallRuleResource @@ -209,45 +189,35 @@ func (l4netlb *L4NetLB) EnsureLoadBalancerDeleted(svc *corev1.Service) *L4NetLBS result.GCEResourceInError = annotations.BackendServiceResource result.Error = err } + // Delete healthcheck // We don't delete health check during service update so // it is possible that there might be some health check leak // when externalTrafficPolicy is changed from Local to Cluster and new a health check was created. 
// When service is deleted we need to check both health checks shared and non-shared // and delete them if needed. - deleteHcFunc := func(sharedHC bool) { - hcName, hcFwName := l4netlb.namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC) - if sharedHC { - l4netlb.sharedResourcesLock.Lock() - defer l4netlb.sharedResourcesLock.Unlock() - } - err = utils.IgnoreHTTPNotFound(healthchecks.DeleteHealthCheck(l4netlb.cloud, hcName, meta.Regional)) + for _, isShared := range []bool{true, false} { + resourceInError, err := l4netlb.l4HealthChecks.DeleteHealthCheck(svc, l4netlb.namer, isShared, meta.Regional, utils.XLB) if err != nil { - if !utils.IsInUsedByError(err) { - klog.Errorf("Failed to delete healthcheck for service %s - %v", l4netlb.NamespacedName.String(), err) - result.GCEResourceInError = annotations.HealthcheckResource - result.Error = err - return - } - // Ignore deletion error due to health check in use by another resource. - // This will be hit if this is a shared healthcheck. - klog.V(2).Infof("Failed to delete healthcheck %s: health check in use.", hcName) - } else { - // Delete healthcheck firewall rule if healthcheck deletion is successful. - err = deleteFwFunc(hcFwName) - if err != nil { - klog.Errorf("Failed to delete firewall rule %s for service %s - %v", hcFwName, l4netlb.NamespacedName.String(), err) - result.GCEResourceInError = annotations.FirewallForHealthcheckResource - result.Error = err - } + result.GCEResourceInError = resourceInError + result.Error = err } } - for _, isShared := range []bool{true, false} { - deleteHcFunc(isShared) - } return result } +func (l4netlb *L4NetLB) deleteFirewall(name string) error { + err := firewalls.EnsureL4FirewallRuleDeleted(l4netlb.cloud, name) + if err != nil { + if fwErr, ok := err.(*firewalls.FirewallXPNError); ok { + l4netlb.recorder.Eventf(l4netlb.Service, corev1.EventTypeNormal, "XPN", fwErr.Message) + return nil + } + return err + } + return nil +} + // GetFRName returns the name of the forwarding rule for the given L4 External LoadBalancer service. // This name should align with legacy forwarding rule name because we use forwarding rule to determine // which controller should process the service Ingress-GCE or k/k service controller. 
@@ -255,7 +225,7 @@ func (l4netlb *L4NetLB) GetFRName() string { return utils.LegacyForwardingRuleName(l4netlb.Service) } -func (l4netlb *L4NetLB) createFirewalls(name, hcLink, hcFwName string, hcPort int32, nodeNames []string, sharedHC bool) (string, *L4NetLBSyncResult) { +func (l4netlb *L4NetLB) createFirewalls(name string, nodeNames []string) (string, *L4NetLBSyncResult) { _, portRanges, _, protocol := utils.GetPortsAndProtocol(l4netlb.Service.Spec.Ports) result := &L4NetLBSyncResult{} sourceRanges, err := helpers.GetLoadBalancerSourceRanges(l4netlb.Service) @@ -272,7 +242,6 @@ func (l4netlb *L4NetLB) createFirewalls(name, hcLink, hcFwName string, hcPort in Name: name, IP: l4netlb.Service.Spec.LoadBalancerIP, NodeNames: nodeNames, - L4Type: utils.XLB, } result.Error = firewalls.EnsureL4LBFirewallForNodes(l4netlb.Service, &nodesFWRParams, l4netlb.cloud, l4netlb.recorder) if result.Error != nil { @@ -280,18 +249,5 @@ func (l4netlb *L4NetLB) createFirewalls(name, hcLink, hcFwName string, hcPort in result.Error = err return "", result } - // Add firewall rule for healthchecks to nodes - hcFWRParams := firewalls.FirewallParams{ - PortRanges: []string{strconv.Itoa(int(hcPort))}, - SourceRanges: gce.L4LoadBalancerSrcRanges(), - Protocol: string(corev1.ProtocolTCP), - Name: hcFwName, - NodeNames: nodeNames, - L4Type: utils.XLB, - } - result.Error = firewalls.EnsureL4LBFirewallForHc(l4netlb.Service, sharedHC, &hcFWRParams, l4netlb.cloud, l4netlb.sharedResourcesLock, l4netlb.recorder) - if result.Error != nil { - result.GCEResourceInError = annotations.FirewallForHealthcheckResource - } return string(protocol), result } diff --git a/pkg/loadbalancers/l4netlb_test.go b/pkg/loadbalancers/l4netlb_test.go index d5db33182a..4a7bcb0157 100644 --- a/pkg/loadbalancers/l4netlb_test.go +++ b/pkg/loadbalancers/l4netlb_test.go @@ -17,8 +17,8 @@ package loadbalancers import ( "fmt" + "k8s.io/ingress-gce/pkg/healthchecks" "strings" - "sync" "testing" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud" @@ -52,7 +52,8 @@ func TestEnsureL4NetLoadBalancer(t *testing.T) { svc := test.NewL4NetLBRBSService(8080) namer := namer_util.NewL4Namer(kubeSystemUID, namer_util.NewNamer(vals.ClusterName, "cluster-fw")) - l4netlb := NewL4NetLB(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l4netlb := NewL4NetLB(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l4netlb.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) if _, err := test.CreateAndInsertNodes(l4netlb.cloud, nodeNames, vals.ZoneName); err != nil { t.Errorf("Unexpected error when adding nodes %v", err) @@ -102,7 +103,8 @@ func TestDeleteL4NetLoadBalancer(t *testing.T) { svc := test.NewL4NetLBRBSService(8080) namer := namer_util.NewL4Namer(kubeSystemUID, namer_util.NewNamer(vals.ClusterName, "cluster-fw")) - l4NetLB := NewL4NetLB(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l4NetLB := NewL4NetLB(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l4NetLB.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) if _, err := test.CreateAndInsertNodes(l4NetLB.cloud, nodeNames, vals.ZoneName); err != nil { t.Errorf("Unexpected error when adding nodes %v", err) @@ -139,15 +141,73 @@ func TestDeleteL4NetLoadBalancerWithSharedHC(t *testing.T) { _, hcFwName := l4NetLB.namer.L4HealthCheck(svc.Namespace, svc.Name, true) firewall, err := l4NetLB.cloud.GetFirewall(hcFwName) if err != nil || firewall == nil { - 
t.Fatalf("Firewall rule should not be deleted err: %v", err) + t.Errorf("Expected firewall exists err: %v, fwR: %v", err, firewall) } } +func TestHealthCheckFirewallDeletionWithILB(t *testing.T) { + t.Parallel() + nodeNames := []string{"test-node-1"} + vals := gce.DefaultTestClusterValues() + + fakeGCE := getFakeGCECloud(vals) + namer := namer_util.NewL4Namer(kubeSystemUID, namer_util.NewNamer(vals.ClusterName, "cluster-fw")) + + // Create ILB service + _, l4ilb, ilbResult := ensureService(fakeGCE, namer, nodeNames, vals.ZoneName, 8081, t) + if ilbResult != nil && ilbResult.Error != nil { + t.Fatalf("Error ensuring service err: %v", ilbResult.Error) + } + + // Create NetLB Service + netlbSvc := test.NewL4NetLBRBSService(8080) + + l4NetLB := NewL4NetLB(netlbSvc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + + // make sure both ilb and netlb use the same l4 healtcheck instance + l4NetLB.l4HealthChecks = l4ilb.l4HealthChecks + + // create netlb resources + result := l4NetLB.EnsureFrontend(nodeNames, netlbSvc) + if result.Error != nil { + t.Errorf("Failed to ensure loadBalancer, err %v", result.Error) + } + if len(result.Status.Ingress) == 0 { + t.Errorf("Got empty loadBalancer status using handler %v", l4NetLB) + } + assertNetLbResources(t, netlbSvc, l4NetLB, nodeNames) + + // Delete the NetLB loadbalancer. + if err := l4NetLB.EnsureLoadBalancerDeleted(netlbSvc); err.Error != nil { + t.Errorf("UnexpectedError %v", err.Error) + } + + // When ILB health check uses the same firewall rules we expect that hc firewall rule will not be deleted. + hcName, hcFwName := l4NetLB.namer.L4HealthCheck(l4NetLB.Service.Namespace, l4NetLB.Service.Name, true) + firewall, err := l4NetLB.cloud.GetFirewall(hcFwName) + if err != nil { + t.Errorf("Expected error: firewall exists, got %v", err) + } + if firewall == nil { + t.Error("Healthcheck Firewall should still exist, got nil") + } + + // The healthcheck itself should be deleted. 
+ _, err = composite.GetHealthCheck(l4NetLB.cloud, meta.RegionalKey(hcName, l4NetLB.cloud.Region()), meta.VersionGA) + if err == nil || !strings.Contains(err.Error(), "not found") { + t.Errorf("Healthcheck %s should be deleted", hcName) + } + +} + func ensureLoadBalancer(port int, vals gce.TestClusterValues, fakeGCE *gce.Cloud, t *testing.T) (*v1.Service, *L4NetLB) { svc := test.NewL4NetLBRBSService(port) namer := namer_util.NewL4Namer(kubeSystemUID, namer_util.NewNamer(vals.ClusterName, "cluster-fw")) emptyNodes := []string{} - l4NetLB := NewL4NetLB(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + + l4NetLB := NewL4NetLB(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l4NetLB.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) + result := l4NetLB.EnsureFrontend(emptyNodes, svc) if result.Error != nil { t.Errorf("Failed to ensure loadBalancer, err %v", result.Error) @@ -169,7 +229,7 @@ func ensureNetLBResourceDeleted(t *testing.T, apiService *v1.Service, l4NetLb *L for _, fwName := range []string{resourceName, hcFwName} { _, err := l4NetLb.cloud.GetFirewall(fwName) if err == nil || !utils.IsNotFoundError(err) { - t.Fatalf("Firewall rule %q should be deleted", fwName) + t.Errorf("Firewall rule %q should be deleted", fwName) } } @@ -288,7 +348,8 @@ func TestMetricsForStandardNetworkTier(t *testing.T) { svc.ObjectMeta.Annotations[annotations.NetworkTierAnnotationKey] = string(cloud.NetworkTierStandard) namer := namer_util.NewL4Namer(kubeSystemUID, namer_util.NewNamer(vals.ClusterName, "cluster-fw")) - l4netlb := NewL4NetLB(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100), &sync.Mutex{}) + l4netlb := NewL4NetLB(svc, fakeGCE, meta.Regional, namer, record.NewFakeRecorder(100)) + l4netlb.l4HealthChecks = healthchecks.FakeL4(fakeGCE, &test.FakeRecorderSource{}) if _, err := test.CreateAndInsertNodes(l4netlb.cloud, nodeNames, vals.ZoneName); err != nil { t.Errorf("Unexpected error when adding nodes %v", err) diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 0c921cca2a..a01c6a4660 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -722,6 +722,13 @@ func (d *L4LBResourceDescription) Unmarshal(desc string) error { return json.Unmarshal([]byte(desc), d) } +func MakeL4LBFirewallDescription(svcName, ip string, version meta.Version, shared bool) (string, error) { + if shared { + return (&L4LBResourceDescription{APIVersion: version, ResourceDescription: fmt.Sprintf(L4LBSharedResourcesDesc, "")}).Marshal() + } + return (&L4LBResourceDescription{ServiceName: svcName, ServiceIP: ip, APIVersion: version}).Marshal() +} + func MakeL4LBServiceDescription(svcName, ip string, version meta.Version, shared bool, lbType L4LBType) (string, error) { if shared { return (&L4LBResourceDescription{APIVersion: version, ResourceDescription: fmt.Sprintf(L4LBSharedResourcesDesc, lbType.ToString())}).Marshal() From f8353dcd43eceee6b65586b0d3dbb8adef252995 Mon Sep 17 00:00:00 2001 From: Cezary Zawadka Date: Tue, 17 May 2022 16:32:41 +0200 Subject: [PATCH 2/2] Rewrite L4 healthchecks: Apply review comments EnsureL4HealthCheck: replace lengthy return value list in with named struct Improve firewall rule comparison Added debug logs Improved go fmt Renamed function names And many more small ones --- pkg/firewalls/firewalls_l4.go | 34 +++-- pkg/healthchecks/healthchecks_l4.go | 154 ++++++++++++++--------- pkg/healthchecks/healthchecks_l4_test.go | 8 +- pkg/healthchecks/interfaces.go | 17 +++ pkg/loadbalancers/interfaces.go | 11 
-- pkg/loadbalancers/l4.go | 33 ++--- pkg/loadbalancers/l4_test.go | 8 +- pkg/loadbalancers/l4netlb.go | 19 +-- 8 files changed, 163 insertions(+), 121 deletions(-) diff --git a/pkg/firewalls/firewalls_l4.go b/pkg/firewalls/firewalls_l4.go index 238095a250..6d493d03ce 100644 --- a/pkg/firewalls/firewalls_l4.go +++ b/pkg/firewalls/firewalls_l4.go @@ -104,18 +104,32 @@ func EnsureL4FirewallRuleDeleted(cloud *gce.Cloud, fwName string) error { } func firewallRuleEqual(a, b *compute.Firewall, skipDescription bool) bool { - fwrEqual := len(a.Allowed) == 1 && - len(a.Allowed) == len(b.Allowed) && - a.Allowed[0].IPProtocol == b.Allowed[0].IPProtocol && - utils.EqualStringSets(a.Allowed[0].Ports, b.Allowed[0].Ports) && - utils.EqualStringSets(a.SourceRanges, b.SourceRanges) && - utils.EqualStringSets(a.TargetTags, b.TargetTags) + if len(a.Allowed) != len(b.Allowed) { + return false + } + for i := range a.Allowed { + if !allowRulesEqual(a.Allowed[i], b.Allowed[i]) { + return false + } + } - // Don't compare the "description" field for shared firewall rules - if skipDescription { - return fwrEqual + if !utils.EqualStringSets(a.SourceRanges, b.SourceRanges) { + return false + } + + if !utils.EqualStringSets(a.TargetTags, b.TargetTags) { + return false + } + + if !skipDescription && a.Description != b.Description { + return false } - return fwrEqual && a.Description == b.Description + return true +} + +func allowRulesEqual(a *compute.FirewallAllowed, b *compute.FirewallAllowed) bool { + return a.IPProtocol == b.IPProtocol && + utils.EqualStringSets(a.Ports, b.Ports) } func ensureFirewall(svc *v1.Service, shared bool, params *FirewallParams, cloud *gce.Cloud, recorder record.EventRecorder) error { diff --git a/pkg/healthchecks/healthchecks_l4.go b/pkg/healthchecks/healthchecks_l4.go index fa5d6c2b82..b1ba09e3c9 100644 --- a/pkg/healthchecks/healthchecks_l4.go +++ b/pkg/healthchecks/healthchecks_l4.go @@ -47,31 +47,35 @@ const ( ) var ( - // instance is a sinngleton instance, created by InitializeL4 + // instanceLock to prevent duplicate initialization. + instanceLock = &sync.Mutex{} + // instance is a singleton instance, created by InitializeL4 instance *l4HealthChecks - // mutex for preventing multiple initialization - initLock = &sync.Mutex{} ) type l4HealthChecks struct { - mutex sync.Mutex - cloud *gce.Cloud - recorderFactory events.RecorderProducer + // sharedResourceLock serializes operations on the healthcheck and firewall + // resources shared across multiple Services. + sharedResourcesLock sync.Mutex + cloud *gce.Cloud + recorderFactory events.RecorderProducer } -// InitializeL4 creates singleton instance, must be run before GetL4() func +// InitializeL4 creates singleton instance, must be run before L4() func func InitializeL4(cloud *gce.Cloud, recorderFactory events.RecorderProducer) { - if instance == nil { - initLock.Lock() - defer initLock.Unlock() - - if instance == nil { - instance = &l4HealthChecks{ - cloud: cloud, - recorderFactory: recorderFactory, - } - } + instanceLock.Lock() + defer instanceLock.Unlock() + + if instance != nil { + klog.Error("Multiple L4 Healthchecks initialization attempts") + return + } + + instance = &l4HealthChecks{ + cloud: cloud, + recorderFactory: recorderFactory, } + klog.V(3).Infof("Initialized L4 Healthchecks") } // FakeL4 creates instance of l4HealthChecks> USe for test only. 
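The healthchecks_l4.go hunk above replaces the double-checked locking in InitializeL4 with a single lock-then-check guard and logs duplicate initialization attempts instead of silently ignoring them. Stripped of the GCE wiring, the pattern is just the following self-contained sketch; the package, type and field names here are invented for illustration and are not code from the repository.

package singleton

import (
	"log"
	"sync"
)

type healthChecks struct{ project string }

var (
	instanceLock sync.Mutex    // serializes initialization attempts
	instance     *healthChecks // nil until Initialize is called
)

// Initialize must run once at startup, before any call to Get.
// A second call is logged and ignored rather than replacing the instance.
func Initialize(project string) {
	instanceLock.Lock()
	defer instanceLock.Unlock()

	if instance != nil {
		log.Print("duplicate initialization attempt ignored")
		return
	}
	instance = &healthChecks{project: project}
}

// Get returns the singleton; it is nil if Initialize was never called.
func Get() *healthChecks { return instance }

The lock-free read in the getter is safe only because initialization happens once at startup, before the ILB and NetLB controllers run; this mirrors why L4() in the patch can simply return instance without taking the lock.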
@@ -83,68 +87,91 @@ func FakeL4(cloud *gce.Cloud, recorderFactory events.RecorderProducer) *l4Health
 	return instance
 }
 
-// GetL4 returns singleton instance, must be run after InitializeL4
-func GetL4() *l4HealthChecks {
+// L4 returns singleton instance, must be run after InitializeL4
+func L4() *l4HealthChecks {
 	return instance
 }
 
-// EnsureL4HealthCheck creates a new HTTP health check for an L4 LoadBalancer service, and the firewall rule required
-// for the healthcheck. If healthcheck is shared (external traffic policy 'cluster') then firewall rules will be shared
-// regardless of scope param.
-// If the healthcheck already exists, it is updated as needed.
-func (l4hc *l4HealthChecks) EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) (string, string, string, string, error) {
+// EnsureL4HealthCheck ensures that the health check and its firewall rule
+// exist for the L4 LoadBalancer Service.
+//
+// The healthcheck and firewall will be shared between different K8s
+// Services for ExternalTrafficPolicy = Cluster, as the same
+// configuration is used across all Services of this type.
+//
+// Firewall rules are always created in the Global scope (vs
+// Regional). This means that a single firewall rule is shared by
+// Services whose health checks have different scopes (Global vs Regional).
+
+func (l4hc *l4HealthChecks) EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) *EnsureL4HealthCheckResult {
+	namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
+
 	hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
 	hcPath, hcPort := helpers.GetServiceHealthCheckPathPort(svc)
+	klog.V(3).Infof("Ensuring L4 healthcheck: %s and firewall rule %s for service %s, shared: %v.", hcName, hcFwName, namespacedName.String(), sharedHC)
+
 	if sharedHC {
 		hcPath, hcPort = gce.GetNodesHealthCheckPath(), gce.GetNodesHealthCheckPort()
-		// lock out entire EnsureL4HealthCheck process
-		l4hc.mutex.Lock()
-		defer l4hc.mutex.Unlock()
+		// We need to acquire a controller-wide mutex to ensure that in the case of a healthcheck shared between loadbalancers that the sync of the GCE resources is not performed in parallel.
+		l4hc.sharedResourcesLock.Lock()
+		defer l4hc.sharedResourcesLock.Unlock()
 	}
+	klog.V(3).Infof("L4 Healthcheck %s, path: %q, port %d", hcName, hcPath, hcPort)
 
-	namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
 	_, hcLink, err := l4hc.ensureL4HealthCheckInternal(hcName, namespacedName, sharedHC, hcPath, hcPort, scope, l4Type)
 	if err != nil {
-		return "", "", "", annotations.HealthcheckResource, err
+		return &EnsureL4HealthCheckResult{
+			GceResourceInError: annotations.HealthcheckResource,
+			Err:                err,
+		}
 	}
+
+	klog.V(3).Infof("Healthcheck created, ensuring firewall rule %s", hcFwName)
 	err = l4hc.ensureFirewall(svc, hcFwName, hcPort, sharedHC, nodeNames)
 	if err != nil {
-		return "", "", "", annotations.FirewallForHealthcheckResource, err
+		return &EnsureL4HealthCheckResult{
+			GceResourceInError: annotations.FirewallForHealthcheckResource,
+			Err:                err,
+		}
+	}
+	return &EnsureL4HealthCheckResult{
+		HCName:             hcName,
+		HCLink:             hcLink,
+		HCFirewallRuleName: hcFwName,
 	}
-
-	return hcLink, hcFwName, hcName, "", err
 }
 
 // DeleteHealthCheck deletes health check (and firewall rule) for l4 service. Checks if shared resources are safe to delete.
func (l4hc *l4HealthChecks) DeleteHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error) { - if sharedHC { - // lock out entire DeleteHealthCheck process - l4hc.mutex.Lock() - defer l4hc.mutex.Unlock() - } hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC) namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace} + klog.V(3).Infof("Trying to delete L4 healthcheck: %s and firewall rule %s from service %s, shared: %v", hcName, hcFwName, namespacedName.String(), sharedHC) + if sharedHC { + // We need to acquire a controller-wide mutex to ensure that in the case of a healthcheck shared between loadbalancers that the sync of the GCE resources is not performed in parallel. + l4hc.sharedResourcesLock.Lock() + defer l4hc.sharedResourcesLock.Unlock() + } + err := utils.IgnoreHTTPNotFound(l4hc.deleteHealthCheck(hcName, scope)) if err != nil { + // Ignore deletion error due to health check in use by another resource. if !utils.IsInUsedByError(err) { klog.Errorf("Failed to delete healthcheck for service %s - %v", namespacedName.String(), err) return annotations.HealthcheckResource, err } - // Ignore deletion error due to health check in use by another resource. - // This will be hit if this is a shared healthcheck. - klog.V(2).Infof("Failed to delete healthcheck %s: health check in use.", hcName) + klog.V(2).Infof("Failed to delete healthcheck %s: shared health check in use.", hcName) return "", nil } // Health check deleted, now delete the firewall rule return l4hc.deleteHealthCheckFirewall(svc, hcName, hcFwName, sharedHC, l4Type) } -func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) { +func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(hcName string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) { selfLink := "" - key, err := composite.CreateKey(l4hc.cloud, name, scope) + key, err := composite.CreateKey(l4hc.cloud, hcName, scope) if err != nil { - return nil, selfLink, fmt.Errorf("Failed to create key for healthcheck with name %s for service %s", name, svcName.String()) + return nil, selfLink, fmt.Errorf("Failed to create key for healthcheck with name %s for service %s", hcName, svcName.String()) } hc, err := composite.GetHealthCheck(l4hc.cloud, key, meta.VersionGA) if err != nil { @@ -156,10 +183,11 @@ func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName typ if scope == meta.Regional { region = l4hc.cloud.Region() } - expectedHC := NewL4HealthCheck(name, svcName, shared, path, port, l4Type, scope, region) + expectedHC := newL4HealthCheck(hcName, svcName, shared, path, port, l4Type, scope, region) + if hc == nil { // Create the healthcheck - klog.V(2).Infof("Creating healthcheck %s for service %s, shared = %v", name, svcName, shared) + klog.V(2).Infof("Creating healthcheck %s for service %s, shared = %v. 
Expected healthcheck: %v", hcName, svcName, shared, expectedHC) err = composite.CreateHealthCheck(l4hc.cloud, key, expectedHC) if err != nil { return nil, selfLink, err @@ -170,10 +198,11 @@ func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName typ selfLink = hc.SelfLink if !needToUpdateHealthChecks(hc, expectedHC) { // nothing to do + klog.V(3).Infof("Healthcheck %v already exists", hcName) return hc, selfLink, nil } mergeHealthChecks(hc, expectedHC) - klog.V(2).Infof("Updating healthcheck %s for service %s", name, svcName) + klog.V(2).Infof("Updating healthcheck %s for service %s, updated healthcheck: %v", hcName, svcName, expectedHC) err = composite.UpdateHealthCheck(l4hc.cloud, key, expectedHC) if err != nil { return nil, selfLink, err @@ -181,9 +210,10 @@ func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName typ return expectedHC, selfLink, err } -// ensureFirewall rule for L4 service. -// The firewall rules are the same for ILB and NetLB that use external traffic policy 'local' (sharedHC == true) -// despite healthchecks have different scopes (global vs regional) +// ensureFirewall rule for `svc`. +// +// L4 ILB and L4 NetLB Services with ExternalTrafficPolicy=Cluster use the same firewall +// rule at global scope. func (l4hc *l4HealthChecks) ensureFirewall(svc *corev1.Service, hcFwName string, hcPort int32, sharedHC bool, nodeNames []string) error { // Add firewall rule for healthchecks to nodes hcFWRParams := firewalls.FirewallParams{ @@ -213,13 +243,14 @@ func (l4hc *l4HealthChecks) deleteHealthCheckFirewall(svc *corev1.Service, hcNam return annotations.HealthcheckResource, err } if !safeToDelete { - klog.V(2).Infof("Failed to delete health check firewall rule %s: health check in use.", hcName) + klog.V(3).Infof("Failed to delete health check firewall rule %s: health check in use.", hcName) return "", nil } + klog.V(3).Infof("Deleting healthcheck firewall rule named: %s", hcFwName) // Delete healthcheck firewall rule if no healthcheck uses the firewall rule. 
err = l4hc.deleteFirewall(hcFwName, svc) if err != nil { - klog.Errorf("Failed to delete firewall rule %s for internal loadbalancer service %s, err %v", hcFwName, namespacedName.String(), err) + klog.Errorf("Failed to delete firewall rule %s for loadbalancer service %s, err %v", hcFwName, namespacedName.String(), err) return annotations.FirewallForHealthcheckResource, err } return "", nil @@ -244,18 +275,19 @@ func (l4hc *l4HealthChecks) healthCheckFirewallSafeToDelete(hcName string, share func (l4hc *l4HealthChecks) deleteFirewall(name string, svc *corev1.Service) error { err := firewalls.EnsureL4FirewallRuleDeleted(l4hc.cloud, name) - if err != nil { - if fwErr, ok := err.(*firewalls.FirewallXPNError); ok { - recorder := l4hc.recorderFactory.Recorder(svc.Namespace) - recorder.Eventf(svc, corev1.EventTypeNormal, "XPN", fwErr.Message) - return nil - } - return err + if err == nil { + return nil + } + // Suppress Firewall XPN error, as this is no retryable and requires action by security admin + if fwErr, ok := err.(*firewalls.FirewallXPNError); ok { + recorder := l4hc.recorderFactory.Recorder(svc.Namespace) + recorder.Eventf(svc, corev1.EventTypeNormal, "XPN", fwErr.Message) + return nil } - return nil + return err } -func NewL4HealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32, l4Type utils.L4LBType, scope meta.KeyType, region string) *composite.HealthCheck { +func newL4HealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32, l4Type utils.L4LBType, scope meta.KeyType, region string) *composite.HealthCheck { httpSettings := composite.HTTPHealthCheck{ Port: int64(port), RequestPath: path, @@ -282,7 +314,7 @@ func NewL4HealthCheck(name string, svcName types.NamespacedName, shared bool, pa // mergeHealthChecks reconciles HealthCheck config to be no smaller than // the default values. newHC is assumed to have defaults, -// since it is created by the NewL4HealthCheck call. +// since it is created by the newL4HealthCheck call. // E.g. old health check interval is 2s, new has the default of 8. // The HC interval will be reconciled to 8 seconds. // If the existing health check values are larger than the default interval, diff --git a/pkg/healthchecks/healthchecks_l4_test.go b/pkg/healthchecks/healthchecks_l4_test.go index edd6957417..f0fccb53e1 100644 --- a/pkg/healthchecks/healthchecks_l4_test.go +++ b/pkg/healthchecks/healthchecks_l4_test.go @@ -50,7 +50,7 @@ func TestMergeHealthChecks(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { // healthcheck intervals and thresholds are common for Global and Regional healthchecks. Hence testing only Global case. - wantHC := NewL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "") + wantHC := newL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "") hc := &composite.HealthCheck{ CheckIntervalSec: tc.checkIntervalSec, TimeoutSec: tc.timeoutSec, @@ -97,8 +97,8 @@ func TestCompareHealthChecks(t *testing.T) { } { t.Run(tc.desc, func(t *testing.T) { // healthcheck intervals and thresholds are common for Global and Regional healthchecks. Hence testing only Global case. 
- hc := NewL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "") - wantHC := NewL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "") + hc := newL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "") + wantHC := newL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "") if tc.modifier != nil { tc.modifier(hc) } @@ -120,7 +120,7 @@ func TestCreateHealthCheck(t *testing.T) { {meta.Global, ""}, {meta.Regional, "us-central1"}, } { - hc := NewL4HealthCheck("hc", namespaceName, false, "/", 12345, utils.ILB, v.scope, v.region) + hc := newL4HealthCheck("hc", namespaceName, false, "/", 12345, utils.ILB, v.scope, v.region) if hc.Region != v.region { t.Errorf("HealthCheck Region mismatch! %v != %v", hc.Region, v.region) } diff --git a/pkg/healthchecks/interfaces.go b/pkg/healthchecks/interfaces.go index 18f00ee3be..0b9956be13 100644 --- a/pkg/healthchecks/interfaces.go +++ b/pkg/healthchecks/interfaces.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/ingress-gce/pkg/translator" "k8s.io/ingress-gce/pkg/utils" + "k8s.io/ingress-gce/pkg/utils/namer" "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" ) @@ -57,3 +58,19 @@ type HealthChecker interface { Delete(name string, scope meta.KeyType) error Get(name string, version meta.Version, scope meta.KeyType) (*translator.HealthCheck, error) } + +// L4HealthChecks defines methods for creating and deleting health checks (and their firewall rules) for l4 services +type L4HealthChecks interface { + // EnsureL4HealthCheck creates health check (and firewall rule) for l4 service + EnsureL4HealthCheck(svc *v1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) *EnsureL4HealthCheckResult + // DeleteHealthCheck deletes health check (and firewall rule) for l4 service + DeleteHealthCheck(svc *v1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error) +} + +type EnsureL4HealthCheckResult struct { + HCName string + HCLink string + HCFirewallRuleName string + GceResourceInError string + Err error +} diff --git a/pkg/loadbalancers/interfaces.go b/pkg/loadbalancers/interfaces.go index c6106cba3e..c0312fb4bc 100644 --- a/pkg/loadbalancers/interfaces.go +++ b/pkg/loadbalancers/interfaces.go @@ -18,10 +18,7 @@ package loadbalancers import ( "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" - corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/networking/v1" - "k8s.io/ingress-gce/pkg/utils" - "k8s.io/ingress-gce/pkg/utils/namer" ) // LoadBalancerPool is an interface to manage the cloud resources associated @@ -40,11 +37,3 @@ type LoadBalancerPool interface { // HasUrlMap returns true if an URL map exists in GCE for given ingress. 
HasUrlMap(ing *v1.Ingress) (bool, error) } - -// L4HealthChecks defines methods for creating and deleting health checks (and their firewall rules) for l4 services -type L4HealthChecks interface { - // EnsureL4HealthCheck creates health check (and firewall rule) for l4 service - EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) (string, string, string, string, error) - // DeleteHealthCheck deletes health check (and firewall rule) for l4 service - DeleteHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error) -} diff --git a/pkg/loadbalancers/l4.go b/pkg/loadbalancers/l4.go index a0aba53651..8b346bcef4 100644 --- a/pkg/loadbalancers/l4.go +++ b/pkg/loadbalancers/l4.go @@ -51,7 +51,7 @@ type L4 struct { Service *corev1.Service ServicePort utils.ServicePort NamespacedName types.NamespacedName - l4HealthChecks L4HealthChecks + l4HealthChecks healthchecks.L4HealthChecks } // L4ILBSyncResult contains information about the outcome of an L4 ILB sync. It stores the list of resource name annotations, @@ -74,7 +74,7 @@ func NewL4Handler(service *corev1.Service, cloud *gce.Cloud, scope meta.KeyType, namer: namer, recorder: recorder, Service: service, - l4HealthChecks: healthchecks.GetL4(), + l4HealthChecks: healthchecks.L4(), } l.NamespacedName = types.NamespacedName{Name: service.Name, Namespace: service.Namespace} l.backendPool = backends.NewPool(l.cloud, l.namer) @@ -122,20 +122,9 @@ func (l *L4) EnsureInternalLoadBalancerDeleted(svc *corev1.Service) *L4ILBSyncRe result.Error = err result.GCEResourceInError = annotations.AddressResource } - // delete fw rules - deleteFwFunc := func(name string) error { - err := firewalls.EnsureL4FirewallRuleDeleted(l.cloud, name) - if err != nil { - if fwErr, ok := err.(*firewalls.FirewallXPNError); ok { - l.recorder.Eventf(l.Service, corev1.EventTypeNormal, "XPN", fwErr.Message) - return nil - } - return err - } - return nil - } + // delete firewall rule allowing load balancer source ranges - err = deleteFwFunc(name) + err = l.deleteFirewall(name) if err != nil { klog.Errorf("Failed to delete firewall rule %s for internal loadbalancer service %s, err %v", name, l.NamespacedName.String(), err) result.GCEResourceInError = annotations.FirewallRuleResource @@ -215,14 +204,14 @@ func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service) // create healthcheck sharedHC := !helpers.RequestsOnlyLocalTraffic(l.Service) - hcLink, hcFwName, hcName, resourceInErr, err := l.l4HealthChecks.EnsureL4HealthCheck(l.Service, l.namer, sharedHC, meta.Global, utils.ILB, nodeNames) + hcResult := l.l4HealthChecks.EnsureL4HealthCheck(l.Service, l.namer, sharedHC, meta.Global, utils.ILB, nodeNames) - if err != nil { - result.GCEResourceInError = resourceInErr - result.Error = err + if hcResult.Err != nil { + result.GCEResourceInError = hcResult.GceResourceInError + result.Error = hcResult.Err return result } - result.Annotations[annotations.HealthcheckKey] = hcName + result.Annotations[annotations.HealthcheckKey] = hcResult.HCName _, portRanges, _, protocol := utils.GetPortsAndProtocol(l.Service.Spec.Ports) @@ -247,7 +236,7 @@ func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service) return result } result.Annotations[annotations.FirewallRuleKey] = name - result.Annotations[annotations.FirewallRuleForHealthcheckKey] = hcFwName + 
result.Annotations[annotations.FirewallRuleForHealthcheckKey] = hcResult.HCFirewallRuleName // Check if protocol has changed for this service. In this case, forwarding rule should be deleted before // the backend service can be updated. @@ -265,7 +254,7 @@ func (l *L4) EnsureInternalLoadBalancer(nodeNames []string, svc *corev1.Service) } // ensure backend service - bs, err := l.backendPool.EnsureL4BackendService(name, hcLink, string(protocol), string(l.Service.Spec.SessionAffinity), + bs, err := l.backendPool.EnsureL4BackendService(name, hcResult.HCLink, string(protocol), string(l.Service.Spec.SessionAffinity), string(cloud.SchemeInternal), l.NamespacedName, meta.VersionGA) if err != nil { result.GCEResourceInError = annotations.BackendServiceResource diff --git a/pkg/loadbalancers/l4_test.go b/pkg/loadbalancers/l4_test.go index 6c02d0ac9f..efb19b6c1e 100644 --- a/pkg/loadbalancers/l4_test.go +++ b/pkg/loadbalancers/l4_test.go @@ -220,12 +220,12 @@ func TestEnsureInternalLoadBalancerWithExistingResources(t *testing.T) { // Create the expected resources necessary for an Internal Load Balancer sharedHC := !servicehelper.RequestsOnlyLocalTraffic(svc) - hcLink, _, _, _, err := l.l4HealthChecks.EnsureL4HealthCheck(l.Service, l.namer, sharedHC, meta.Global, utils.ILB, []string{}) + hcResult := l.l4HealthChecks.EnsureL4HealthCheck(l.Service, l.namer, sharedHC, meta.Global, utils.ILB, []string{}) - if err != nil { - t.Errorf("Failed to create healthcheck, err %v", err) + if hcResult.Err != nil { + t.Errorf("Failed to create healthcheck, err %v", hcResult.Err) } - _, err = l.backendPool.EnsureL4BackendService(lbName, hcLink, "TCP", string(l.Service.Spec.SessionAffinity), + _, err := l.backendPool.EnsureL4BackendService(lbName, hcResult.HCLink, "TCP", string(l.Service.Spec.SessionAffinity), string(cloud.SchemeInternal), l.NamespacedName, meta.VersionGA) if err != nil { t.Errorf("Failed to create backendservice, err %v", err) diff --git a/pkg/loadbalancers/l4netlb.go b/pkg/loadbalancers/l4netlb.go index 8a0fd753ab..ad4b68a431 100644 --- a/pkg/loadbalancers/l4netlb.go +++ b/pkg/loadbalancers/l4netlb.go @@ -49,7 +49,7 @@ type L4NetLB struct { Service *corev1.Service ServicePort utils.ServicePort NamespacedName types.NamespacedName - l4HealthChecks L4HealthChecks + l4HealthChecks healthchecks.L4HealthChecks } // L4NetLBSyncResult contains information about the outcome of an L4 NetLB sync. 
It stores the list of resource name annotations, @@ -73,7 +73,7 @@ func NewL4NetLB(service *corev1.Service, cloud *gce.Cloud, scope meta.KeyType, n Service: service, NamespacedName: types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, backendPool: backends.NewPool(cloud, namer), - l4HealthChecks: healthchecks.GetL4(), + l4HealthChecks: healthchecks.L4(), } portId := utils.ServicePortID{Service: l4netlb.NamespacedName} l4netlb.ServicePort = utils.ServicePort{ @@ -108,14 +108,14 @@ func (l4netlb *L4NetLB) EnsureFrontend(nodeNames []string, svc *corev1.Service) l4netlb.Service = svc sharedHC := !helpers.RequestsOnlyLocalTraffic(svc) - hcLink, hcFwName, hcName, resourceInErr, err := l4netlb.l4HealthChecks.EnsureL4HealthCheck(l4netlb.Service, l4netlb.namer, sharedHC, l4netlb.scope, utils.XLB, nodeNames) + hcResult := l4netlb.l4HealthChecks.EnsureL4HealthCheck(l4netlb.Service, l4netlb.namer, sharedHC, l4netlb.scope, utils.XLB, nodeNames) - if err != nil { - result.GCEResourceInError = resourceInErr - result.Error = fmt.Errorf("Failed to ensure health check %s - %w", hcName, err) + if hcResult.Err != nil { + result.GCEResourceInError = hcResult.GceResourceInError + result.Error = fmt.Errorf("Failed to ensure health check %s - %w", hcResult.HCName, hcResult.Err) return result } - result.Annotations[annotations.HealthcheckKey] = hcName + result.Annotations[annotations.HealthcheckKey] = hcResult.HCName name := l4netlb.ServicePort.BackendName() protocol, res := l4netlb.createFirewalls(name, nodeNames) @@ -123,9 +123,9 @@ func (l4netlb *L4NetLB) EnsureFrontend(nodeNames []string, svc *corev1.Service) return res } result.Annotations[annotations.FirewallRuleKey] = name - result.Annotations[annotations.FirewallRuleForHealthcheckKey] = hcFwName + result.Annotations[annotations.FirewallRuleForHealthcheckKey] = hcResult.HCFirewallRuleName - bs, err := l4netlb.backendPool.EnsureL4BackendService(name, hcLink, protocol, string(l4netlb.Service.Spec.SessionAffinity), string(cloud.SchemeExternal), l4netlb.NamespacedName, meta.VersionGA) + bs, err := l4netlb.backendPool.EnsureL4BackendService(name, hcResult.HCLink, protocol, string(l4netlb.Service.Spec.SessionAffinity), string(cloud.SchemeExternal), l4netlb.NamespacedName, meta.VersionGA) if err != nil { result.GCEResourceInError = annotations.BackendServiceResource result.Error = fmt.Errorf("Failed to ensure backend service %s - %w", name, err) @@ -201,6 +201,7 @@ func (l4netlb *L4NetLB) EnsureLoadBalancerDeleted(svc *corev1.Service) *L4NetLBS if err != nil { result.GCEResourceInError = resourceInError result.Error = err + // continue with deletion of the non-shared Healthcheck regardless of the error, both healthchecks may need to be deleted, } } return result
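For completeness, a condensed sketch of how a caller consumes the result struct introduced in pkg/healthchecks/interfaces.go. The method and field names are the ones defined by this patch; the wrapper function, its signature, and the trimming of the surrounding sync logic are assumptions made purely for illustration.

// ensureHealthCheckSketch shows the consuming side of EnsureL4HealthCheck;
// the parameters stand in for the usual EnsureFrontend locals.
func ensureHealthCheckSketch(l4netlb *L4NetLB, result *L4NetLBSyncResult, svc *corev1.Service, sharedHC bool, nodeNames []string) *L4NetLBSyncResult {
	hcResult := l4netlb.l4HealthChecks.EnsureL4HealthCheck(svc, l4netlb.namer, sharedHC, l4netlb.scope, utils.XLB, nodeNames)
	if hcResult.Err != nil {
		result.GCEResourceInError = hcResult.GceResourceInError
		result.Error = fmt.Errorf("Failed to ensure health check %s - %w", hcResult.HCName, hcResult.Err)
		return result
	}
	// On success the struct carries everything later steps need: the name and
	// self-link of the healthcheck for the backend service, and the firewall
	// rule name for the Service annotations.
	result.Annotations[annotations.HealthcheckKey] = hcResult.HCName
	result.Annotations[annotations.FirewallRuleForHealthcheckKey] = hcResult.HCFirewallRuleName
	return nil
}

One practical benefit of the named struct over the previous five positional return values is that additional fields can be added later without updating every call site.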