Skip to content

Commit

Permalink
Rewrite L4 healthchecks creation and deletion
Browse files Browse the repository at this point in the history
- create a common singleton-like struct fot l4 health checks
- new struct holds mutex for common resources (healthchecks and their firewall rules used for
- [bugfix] delete shared healtcheck firewall rules safely -  cross-check between ILB and NLB healthchecks is firewall rules are in use

Logging
New log line idicating firewall rule not deleted due to cross-check: "Failed to delete health check firewall rule %s: health check in use."

Testing
- healthcheck management is mostly covered (by existing tests), they required little update.
- added test cases for sahred firewall rule deletion(lack of), named TestHealthCheckFirewallDeletionWithILB and TestHealthCheckFirewallDeletionWithNetLB
- run test manual tests
  • Loading branch information
cezarygerard committed May 9, 2022
1 parent 2781c36 commit a10c879
Show file tree
Hide file tree
Showing 13 changed files with 480 additions and 229 deletions.
3 changes: 3 additions & 0 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
flag "github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/ingress-gce/pkg/frontendconfig"
"k8s.io/ingress-gce/pkg/healthchecks"
"k8s.io/ingress-gce/pkg/ingparams"
"k8s.io/ingress-gce/pkg/l4lb"
"k8s.io/ingress-gce/pkg/psc"
Expand Down Expand Up @@ -274,6 +275,8 @@ func runControllers(ctx *ingctx.ControllerContext) {

fwc := firewalls.NewFirewallController(ctx, flags.F.NodePortRanges.Values())

healthchecks.InitializeL4(ctx.Cloud, ctx)

if flags.F.RunL4Controller {
l4Controller := l4lb.NewILBController(ctx, stopCh)
go l4Controller.Run()
Expand Down
28 changes: 15 additions & 13 deletions pkg/firewalls/firewalls_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@ limitations under the License.
package firewalls

import (
"strings"
"sync"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/compute/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/gce"
"strings"
)

// FirewallParams holds all data needed to create firewall for L4 LB
Expand All @@ -50,7 +48,7 @@ func EnsureL4FirewallRule(cloud *gce.Cloud, nsName string, params *FirewallParam
if err != nil {
return err
}
fwDesc, err := utils.MakeL4LBServiceDescription(nsName, params.IP, meta.VersionGA, sharedRule, params.L4Type)
fwDesc, err := utils.MakeL4LBFirewallDescription(nsName, params.IP, meta.VersionGA, sharedRule)
if err != nil {
klog.Warningf("EnsureL4FirewallRule(%v): failed to generate description for L4 %s rule, err: %v", params.Name, params.L4Type.ToString(), err)
}
Expand Down Expand Up @@ -78,7 +76,9 @@ func EnsureL4FirewallRule(cloud *gce.Cloud, nsName string, params *FirewallParam
}
return err
}
if firewallRuleEqual(expectedFw, existingFw) {

// Don't compare the "description" field for shared firewall rules
if firewallRuleEqual(expectedFw, existingFw, sharedRule) {
return nil
}
klog.V(2).Infof("EnsureL4FirewallRule(%v): updating L4 %s firewall", params.Name, params.L4Type.ToString())
Expand All @@ -103,13 +103,19 @@ func EnsureL4FirewallRuleDeleted(cloud *gce.Cloud, fwName string) error {
return nil
}

func firewallRuleEqual(a, b *compute.Firewall) bool {
return a.Description == b.Description &&
len(a.Allowed) == 1 && len(a.Allowed) == len(b.Allowed) &&
func firewallRuleEqual(a, b *compute.Firewall, skipDescription bool) bool {
fwrEqual := len(a.Allowed) == 1 &&
len(a.Allowed) == len(b.Allowed) &&
a.Allowed[0].IPProtocol == b.Allowed[0].IPProtocol &&
utils.EqualStringSets(a.Allowed[0].Ports, b.Allowed[0].Ports) &&
utils.EqualStringSets(a.SourceRanges, b.SourceRanges) &&
utils.EqualStringSets(a.TargetTags, b.TargetTags)

// Don't compare the "description" field for shared firewall rules
if skipDescription {
return fwrEqual
}
return fwrEqual && a.Description == b.Description
}

func ensureFirewall(svc *v1.Service, shared bool, params *FirewallParams, cloud *gce.Cloud, recorder record.EventRecorder) error {
Expand All @@ -126,12 +132,8 @@ func ensureFirewall(svc *v1.Service, shared bool, params *FirewallParams, cloud
}

// EnsureL4LBFirewallForHc creates or updates firewall rule for shared or non-shared health check to nodes
func EnsureL4LBFirewallForHc(svc *v1.Service, shared bool, params *FirewallParams, cloud *gce.Cloud, sharedResourcesLock *sync.Mutex, recorder record.EventRecorder) error {
func EnsureL4LBFirewallForHc(svc *v1.Service, shared bool, params *FirewallParams, cloud *gce.Cloud, recorder record.EventRecorder) error {
params.SourceRanges = gce.L4LoadBalancerSrcRanges()
if shared {
sharedResourcesLock.Lock()
defer sharedResourcesLock.Unlock()
}
return ensureFirewall(svc, shared, params, cloud, recorder)
}

Expand Down
191 changes: 171 additions & 20 deletions pkg/healthchecks/healthchecks_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,25 @@ package healthchecks

import (
"fmt"
"strconv"
"sync"

cloudprovider "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cloud-provider/service/helpers"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/events"
"k8s.io/ingress-gce/pkg/firewalls"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/gce"
)

const (

// L4 Load Balancer parameters
gceHcCheckIntervalSeconds = int64(8)
gceHcTimeoutSeconds = int64(1)
Expand All @@ -42,44 +46,125 @@ const (
gceHcUnhealthyThreshold = int64(3)
)

// EnsureL4HealthCheck creates a new HTTP health check for an L4 LoadBalancer service, based on the parameters provided.
var (
// instance is a sinngleton instance, created by InitializeL4
instance *l4HealthChecks
// mutex for preventing multiple initialization
initLock = &sync.Mutex{}
)

type l4HealthChecks struct {
mutex sync.Mutex
cloud *gce.Cloud
recorderFactory events.RecorderProducer
}

// InitializeL4 creates singleton instance, must be run before GetL4() func
func InitializeL4(cloud *gce.Cloud, recorderFactory events.RecorderProducer) {
if instance == nil {
initLock.Lock()
defer initLock.Unlock()

if instance == nil {
instance = &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
}
}
}
}

// FakeL4 creates instance of l4HealthChecks> USe for test only.
func FakeL4(cloud *gce.Cloud, recorderFactory events.RecorderProducer) *l4HealthChecks {
instance = &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
}
return instance
}

// GetL4 returns singleton instance, must be run after InitializeL4
func GetL4() *l4HealthChecks {
return instance
}

// EnsureL4HealthCheck creates a new HTTP health check for an L4 LoadBalancer service, and the firewall rule required
// for the healthcheck. If healthcheck is shared (external traffic policy 'cluster') then firewall rules will be shared
// regardless of scope param.
// If the healthcheck already exists, it is updated as needed.
func EnsureL4HealthCheck(cloud *gce.Cloud, svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, string, int32, string, error) {
func (l4hc *l4HealthChecks) EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) (string, string, string, string, error) {
hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
hcPath, hcPort := gce.GetNodesHealthCheckPath(), gce.GetNodesHealthCheckPort()
if !sharedHC {
hcPath, hcPort = helpers.GetServiceHealthCheckPathPort(svc)
hcPath, hcPort := helpers.GetServiceHealthCheckPathPort(svc)
if sharedHC {
hcPath, hcPort = gce.GetNodesHealthCheckPath(), gce.GetNodesHealthCheckPort()
// lock out entire EnsureL4HealthCheck process
l4hc.mutex.Lock()
defer l4hc.mutex.Unlock()
}

namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
_, hcLink, err := l4hc.ensureL4HealthCheckInternal(hcName, namespacedName, sharedHC, hcPath, hcPort, scope, l4Type)
if err != nil {
return "", "", "", annotations.HealthcheckResource, err
}
err = l4hc.ensureFirewall(svc, hcFwName, hcPort, sharedHC, nodeNames)
if err != nil {
return "", "", "", annotations.FirewallForHealthcheckResource, err
}
nn := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
_, hcLink, err := ensureL4HealthCheckInternal(cloud, hcName, nn, sharedHC, hcPath, hcPort, scope, l4Type)
return hcLink, hcFwName, hcPort, hcName, err

return hcLink, hcFwName, hcName, "", err
}

func ensureL4HealthCheckInternal(cloud *gce.Cloud, name string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) {
// DeleteHealthCheck deletes health check (and firewall rule) for l4 service. Checks if shared resources are safe to delete.
func (l4hc *l4HealthChecks) DeleteHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error) {
if sharedHC {
// lock out entire DeleteHealthCheck process
l4hc.mutex.Lock()
defer l4hc.mutex.Unlock()
}

hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
err := utils.IgnoreHTTPNotFound(l4hc.deleteHealthCheck(hcName, scope))
if err != nil {
if !utils.IsInUsedByError(err) {
klog.Errorf("Failed to delete healthcheck for service %s - %v", namespacedName.String(), err)
return annotations.HealthcheckResource, err
}
// Ignore deletion error due to health check in use by another resource.
// This will be hit if this is a shared healthcheck.
klog.V(2).Infof("Failed to delete healthcheck %s: health check in use.", hcName)
return "", nil
}
// Health check deleted, now delete the firewall rule
return l4hc.deleteHealthCheckFirewall(svc, hcName, hcFwName, sharedHC, l4Type)
}

func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) {
selfLink := ""
key, err := composite.CreateKey(cloud, name, scope)
key, err := composite.CreateKey(l4hc.cloud, name, scope)
if err != nil {
return nil, selfLink, fmt.Errorf("Failed to create composite key for healthcheck %s - %w", name, err)
return nil, selfLink, fmt.Errorf("Failed to create key for healthcheck with name %s for service %s", name, svcName.String())
}
hc, err := composite.GetHealthCheck(cloud, key, meta.VersionGA)
hc, err := composite.GetHealthCheck(l4hc.cloud, key, meta.VersionGA)
if err != nil {
if !utils.IsNotFoundError(err) {
return nil, selfLink, err
}
}
var region string
if scope == meta.Regional {
region = cloud.Region()
region = l4hc.cloud.Region()
}
expectedHC := NewL4HealthCheck(name, svcName, shared, path, port, l4Type, scope, region)
if hc == nil {
// Create the healthcheck
klog.V(2).Infof("Creating healthcheck %s for service %s, shared = %v", name, svcName, shared)
err = composite.CreateHealthCheck(cloud, key, expectedHC)
err = composite.CreateHealthCheck(l4hc.cloud, key, expectedHC)
if err != nil {
return nil, selfLink, err
}
selfLink = cloudprovider.SelfLink(meta.VersionGA, cloud.ProjectID(), "healthChecks", key)
selfLink = cloudprovider.SelfLink(meta.VersionGA, l4hc.cloud.ProjectID(), "healthChecks", key)
return expectedHC, selfLink, nil
}
selfLink = hc.SelfLink
Expand All @@ -89,19 +174,85 @@ func ensureL4HealthCheckInternal(cloud *gce.Cloud, name string, svcName types.Na
}
mergeHealthChecks(hc, expectedHC)
klog.V(2).Infof("Updating healthcheck %s for service %s", name, svcName)
err = composite.UpdateHealthCheck(cloud, key, expectedHC)
err = composite.UpdateHealthCheck(l4hc.cloud, key, expectedHC)
if err != nil {
return nil, selfLink, err
}
return expectedHC, selfLink, err
}

func DeleteHealthCheck(cloud *gce.Cloud, name string, scope meta.KeyType) error {
key, err := composite.CreateKey(cloud, name, scope)
// ensureFirewall rule for L4 service.
// The firewall rules are the same for ILB and NetLB that use external traffic policy 'local' (sharedHC == true)
// despite healthchecks have different scopes (global vs regional)
func (l4hc *l4HealthChecks) ensureFirewall(svc *corev1.Service, hcFwName string, hcPort int32, sharedHC bool, nodeNames []string) error {
// Add firewall rule for healthchecks to nodes
hcFWRParams := firewalls.FirewallParams{
PortRanges: []string{strconv.Itoa(int(hcPort))},
SourceRanges: gce.L4LoadBalancerSrcRanges(),
Protocol: string(corev1.ProtocolTCP),
Name: hcFwName,
NodeNames: nodeNames,
}
return firewalls.EnsureL4LBFirewallForHc(svc, sharedHC, &hcFWRParams, l4hc.cloud, l4hc.recorderFactory.Recorder(svc.Namespace))
}

func (l4hc *l4HealthChecks) deleteHealthCheck(name string, scope meta.KeyType) error {
key, err := composite.CreateKey(l4hc.cloud, name, scope)
if err != nil {
return fmt.Errorf("Failed to create composite key for healthcheck %s - %w", name, err)
}
return composite.DeleteHealthCheck(cloud, key, meta.VersionGA)
return composite.DeleteHealthCheck(l4hc.cloud, key, meta.VersionGA)
}

func (l4hc *l4HealthChecks) deleteHealthCheckFirewall(svc *corev1.Service, hcName, hcFwName string, sharedHC bool, l4Type utils.L4LBType) (string, error) {
namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}

safeToDelete, err := l4hc.healthCheckFirewallSafeToDelete(hcName, sharedHC, l4Type)
if err != nil {
klog.Errorf("Failed to delete health check firewall rule %s for service %s - %v", hcFwName, namespacedName.String(), err)
return annotations.HealthcheckResource, err
}
if !safeToDelete {
klog.V(2).Infof("Failed to delete health check firewall rule %s: health check in use.", hcName)
return "", nil
}
// Delete healthcheck firewall rule if no healthcheck uses the firewall rule.
err = l4hc.deleteFirewall(hcFwName, svc)
if err != nil {
klog.Errorf("Failed to delete firewall rule %s for internal loadbalancer service %s, err %v", hcFwName, namespacedName.String(), err)
return annotations.FirewallForHealthcheckResource, err
}
return "", nil
}

func (l4hc *l4HealthChecks) healthCheckFirewallSafeToDelete(hcName string, sharedHC bool, l4Type utils.L4LBType) (bool, error) {
if !sharedHC {
return true, nil
}
var scopeToCheck meta.KeyType
scopeToCheck = meta.Regional
if l4Type == utils.XLB {
scopeToCheck = meta.Global
}
key, err := composite.CreateKey(l4hc.cloud, hcName, scopeToCheck)
if err != nil {
return false, fmt.Errorf("Failed to create composite key for healthcheck %s - %w", hcName, err)
}
_, err = composite.GetHealthCheck(l4hc.cloud, key, meta.VersionGA)
return utils.IsNotFoundError(err), nil
}

func (l4hc *l4HealthChecks) deleteFirewall(name string, svc *corev1.Service) error {
err := firewalls.EnsureL4FirewallRuleDeleted(l4hc.cloud, name)
if err != nil {
if fwErr, ok := err.(*firewalls.FirewallXPNError); ok {
recorder := l4hc.recorderFactory.Recorder(svc.Namespace)
recorder.Eventf(svc, corev1.EventTypeNormal, "XPN", fwErr.Message)
return nil
}
return err
}
return nil
}

func NewL4HealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32, l4Type utils.L4LBType, scope meta.KeyType, region string) *composite.HealthCheck {
Expand Down
4 changes: 2 additions & 2 deletions pkg/l4lb/l4controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (l4c *L4Controller) shouldProcessService(service *v1.Service, l4 *loadbalan
// processServiceCreateOrUpdate ensures load balancer resources for the given service, as needed.
// Returns an error if processing the service update failed.
func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Service) *loadbalancers.L4ILBSyncResult {
l4 := loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(service.Namespace), &l4c.sharedResourcesLock)
l4 := loadbalancers.NewL4Handler(service, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(service.Namespace))
if !l4c.shouldProcessService(service, l4) {
return nil
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func (l4c *L4Controller) processServiceCreateOrUpdate(key string, service *v1.Se
}

func (l4c *L4Controller) processServiceDeletion(key string, svc *v1.Service) *loadbalancers.L4ILBSyncResult {
l4 := loadbalancers.NewL4Handler(svc, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(svc.Namespace), &l4c.sharedResourcesLock)
l4 := loadbalancers.NewL4Handler(svc, l4c.ctx.Cloud, meta.Regional, l4c.namer, l4c.ctx.Recorder(svc.Namespace))
l4c.ctx.Recorder(svc.Namespace).Eventf(svc, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer for %s", key)
result := l4.EnsureInternalLoadBalancerDeleted(svc)
if result.Error != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/l4lb/l4controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package l4lb
import (
context2 "context"
"fmt"
"k8s.io/ingress-gce/pkg/healthchecks"
"testing"
"time"

Expand Down Expand Up @@ -70,6 +71,7 @@ func newServiceController(t *testing.T, fakeGCE *gce.Cloud) *L4Controller {
for _, n := range nodes {
ctx.NodeInformer.GetIndexer().Add(n)
}
healthchecks.FakeL4(ctx.Cloud, ctx)
return NewILBController(ctx, stopCh)
}

Expand Down
Loading

0 comments on commit a10c879

Please sign in to comment.