Skip to content

Commit

Permalink
Rewrite L4 healthchecks: Apply review comments
Browse files Browse the repository at this point in the history
EnsureL4HealthCheck: replace lengthy return value list with named struct
Improve firewall rule comparison
Added debug logs
Improved go fmt
Renamed function names
And many more small ones
  • Loading branch information
cezarygerard committed May 27, 2022
1 parent 3bebea9 commit 217b108
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 119 deletions.
34 changes: 24 additions & 10 deletions pkg/firewalls/firewalls_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,32 @@ func EnsureL4FirewallRuleDeleted(cloud *gce.Cloud, fwName string) error {
}

func firewallRuleEqual(a, b *compute.Firewall, skipDescription bool) bool {
fwrEqual := len(a.Allowed) == 1 &&
len(a.Allowed) == len(b.Allowed) &&
a.Allowed[0].IPProtocol == b.Allowed[0].IPProtocol &&
utils.EqualStringSets(a.Allowed[0].Ports, b.Allowed[0].Ports) &&
utils.EqualStringSets(a.SourceRanges, b.SourceRanges) &&
utils.EqualStringSets(a.TargetTags, b.TargetTags)
if len(a.Allowed) != len(b.Allowed) {
return false
}
for i := range a.Allowed {
if !allowRulesEqual(a.Allowed[i], b.Allowed[i]) {
return false
}
}

// Don't compare the "description" field for shared firewall rules
if skipDescription {
return fwrEqual
if !utils.EqualStringSets(a.SourceRanges, b.SourceRanges) {
return false
}

if !utils.EqualStringSets(a.TargetTags, b.TargetTags) {
return false
}

if !skipDescription && a.Description != b.Description {
return false
}
return fwrEqual && a.Description == b.Description
return true
}

// allowRulesEqual reports whether two allowed-traffic entries name the same
// IP protocol and have equal Ports according to utils.EqualStringSets.
func allowRulesEqual(a *compute.FirewallAllowed, b *compute.FirewallAllowed) bool {
	if a.IPProtocol != b.IPProtocol {
		return false
	}
	return utils.EqualStringSets(a.Ports, b.Ports)
}

func ensureFirewall(svc *v1.Service, shared bool, params *FirewallParams, cloud *gce.Cloud, recorder record.EventRecorder) error {
Expand Down
150 changes: 91 additions & 59 deletions pkg/healthchecks/healthchecks_l4.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,35 @@ const (
)

var (
// instance is a sinngleton instance, created by InitializeL4
// instanceLock to prevent duplicate initialization.
instanceLock = &sync.Mutex{}
// instance is a singleton instance, created by InitializeL4
instance *l4HealthChecks
// mutex for preventing multiple initialization
initLock = &sync.Mutex{}
)

type l4HealthChecks struct {
mutex sync.Mutex
cloud *gce.Cloud
recorderFactory events.RecorderProducer
// sharedResourceLock serializes operations on the healthcheck and firewall
// resources shared across multiple Services.
sharedResourcesLock sync.Mutex
cloud *gce.Cloud
recorderFactory events.RecorderProducer
}

// InitializeL4 creates singleton instance, must be run before GetL4() func
// InitializeL4 creates singleton instance, must be run before L4() func
func InitializeL4(cloud *gce.Cloud, recorderFactory events.RecorderProducer) {
if instance == nil {
initLock.Lock()
defer initLock.Unlock()

if instance == nil {
instance = &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
}
}
instanceLock.Lock()
defer instanceLock.Unlock()

if instance != nil {
klog.Error("Multiple L4 Healthchecks initialization attempts")
return
}

instance = &l4HealthChecks{
cloud: cloud,
recorderFactory: recorderFactory,
}
klog.V(3).Infof("Initialized L4 Healthchecks")
}

// FakeL4 creates instance of l4HealthChecks. Use for test only.
Expand All @@ -83,68 +87,91 @@ func FakeL4(cloud *gce.Cloud, recorderFactory events.RecorderProducer) *l4Health
return instance
}

// GetL4 returns singleton instance, must be run after InitializeL4
func GetL4() *l4HealthChecks {
// L4 returns singleton instance, must be run after InitializeL4
func L4() *l4HealthChecks {
return instance
}

// EnsureL4HealthCheck creates a new HTTP health check for an L4 LoadBalancer service, and the firewall rule required
// for the healthcheck. If healthcheck is shared (external traffic policy 'cluster') then firewall rules will be shared
// regardless of scope param.
// If the healthcheck already exists, it is updated as needed.
func (l4hc *l4HealthChecks) EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) (string, string, string, string, error) {
// EnsureL4HealthCheck and firewall rules exist for the L4
// LoadBalancer Service.
//
// The healthcheck and firewall will be shared between different K8s
// Services for ExternalTrafficPolicy = Cluster, as the same
// configuration is used across all Services of this type.
//
// Firewall rules are always created at in the Global scope (vs
// Regional). This means that one Firewall rule is created for
// Services of different scope (Global vs Regional).

func (l4hc *l4HealthChecks) EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) *EnsureL4HealthCheckResult {
hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
hcPath, hcPort := helpers.GetServiceHealthCheckPathPort(svc)
klog.V(3).Infof("Ensuring L4 healthcheck: %s and firewall rule %s from service %s/%s, shared: %v.", hcName, hcFwName, svc.Name, svc.Namespace, sharedHC)

if sharedHC {
hcPath, hcPort = gce.GetNodesHealthCheckPath(), gce.GetNodesHealthCheckPort()
// lock out entire EnsureL4HealthCheck process
l4hc.mutex.Lock()
defer l4hc.mutex.Unlock()
// We need to acquire a controller-wide mutex to ensure that in the case of a healthcheck shared between loadbalancers that the sync of the GCE resources is not performed in parallel.
l4hc.sharedResourcesLock.Lock()
defer l4hc.sharedResourcesLock.Unlock()
}
klog.V(3).Infof("L4 Healthcheck %s, path: %q, port %d", hcName, hcPath, hcPort)

namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}
_, hcLink, err := l4hc.ensureL4HealthCheckInternal(hcName, namespacedName, sharedHC, hcPath, hcPort, scope, l4Type)
if err != nil {
return "", "", "", annotations.HealthcheckResource, err
return &EnsureL4HealthCheckResult{
GceResourceInError: annotations.HealthcheckResource,
Err: err,
}
}

klog.V(3).Infof("Healthcheck created, ensuring firewall rule %s", hcFwName)
err = l4hc.ensureFirewall(svc, hcFwName, hcPort, sharedHC, nodeNames)
if err != nil {
return "", "", "", annotations.FirewallForHealthcheckResource, err
return &EnsureL4HealthCheckResult{
GceResourceInError: annotations.HealthcheckResource,
Err: err,
}
}
return &EnsureL4HealthCheckResult{
HCName: hcName,
HCLink: hcLink,
HCFirewallRuleName: hcFwName,
}

return hcLink, hcFwName, hcName, "", err
}

// DeleteHealthCheck deletes health check (and firewall rule) for l4 service.
// Checks if shared resources are safe to delete.
//
// A "not found" error from the healthcheck deletion is ignored. If the
// healthcheck is still in use by another resource, nothing more is deleted
// and ("", nil) is returned. Any other healthcheck deletion error is returned
// together with annotations.HealthcheckResource. Once the healthcheck is
// gone, the result is whatever deleteHealthCheckFirewall returns.
func (l4hc *l4HealthChecks) DeleteHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error) {
	hcName, hcFwName := namer.L4HealthCheck(svc.Namespace, svc.Name, sharedHC)
	namespacedName := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}

	// Log the service as namespace/name, matching namespacedName.String().
	klog.V(3).Infof("Trying to delete L4 healthcheck: %s and firewall rule %s from service %s/%s, shared: %v", hcName, hcFwName, svc.Namespace, svc.Name, sharedHC)
	if sharedHC {
		// We need to acquire a controller-wide mutex to ensure that in the case of a healthcheck shared between loadbalancers that the sync of the GCE resources is not performed in parallel.
		l4hc.sharedResourcesLock.Lock()
		defer l4hc.sharedResourcesLock.Unlock()
	}

	err := utils.IgnoreHTTPNotFound(l4hc.deleteHealthCheck(hcName, scope))
	if err != nil {
		if !utils.IsInUsedByError(err) {
			klog.Errorf("Failed to delete healthcheck for service %s - %v", namespacedName.String(), err)
			return annotations.HealthcheckResource, err
		}
		// Ignore deletion error due to health check in use by another resource.
		// This will be hit if this is a shared healthcheck.
		klog.V(2).Infof("Failed to delete healthcheck %s: shared health check in use.", hcName)
		return "", nil
	}
	// Health check deleted, now delete the firewall rule
	return l4hc.deleteHealthCheckFirewall(svc, hcName, hcFwName, sharedHC, l4Type)
}

func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) {
func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(hcName string, svcName types.NamespacedName, shared bool, path string, port int32, scope meta.KeyType, l4Type utils.L4LBType) (*composite.HealthCheck, string, error) {
selfLink := ""
key, err := composite.CreateKey(l4hc.cloud, name, scope)
key, err := composite.CreateKey(l4hc.cloud, hcName, scope)
if err != nil {
return nil, selfLink, fmt.Errorf("Failed to create key for healthcheck with name %s for service %s", name, svcName.String())
return nil, selfLink, fmt.Errorf("Failed to create key for healthcheck with name %s for service %s", hcName, svcName.String())
}
hc, err := composite.GetHealthCheck(l4hc.cloud, key, meta.VersionGA)
if err != nil {
Expand All @@ -156,10 +183,11 @@ func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName typ
if scope == meta.Regional {
region = l4hc.cloud.Region()
}
expectedHC := NewL4HealthCheck(name, svcName, shared, path, port, l4Type, scope, region)
expectedHC := newL4HealthCheck(hcName, svcName, shared, path, port, l4Type, scope, region)

if hc == nil {
// Create the healthcheck
klog.V(2).Infof("Creating healthcheck %s for service %s, shared = %v", name, svcName, shared)
klog.V(2).Infof("Creating healthcheck %s for service %s, shared = %v. Expected healthcheck: %v", hcName, svcName, shared, expectedHC)
err = composite.CreateHealthCheck(l4hc.cloud, key, expectedHC)
if err != nil {
return nil, selfLink, err
Expand All @@ -170,20 +198,22 @@ func (l4hc *l4HealthChecks) ensureL4HealthCheckInternal(name string, svcName typ
selfLink = hc.SelfLink
if !needToUpdateHealthChecks(hc, expectedHC) {
// nothing to do
klog.V(3).Infof("Healthcheck %v already exists", hcName)
return hc, selfLink, nil
}
mergeHealthChecks(hc, expectedHC)
klog.V(2).Infof("Updating healthcheck %s for service %s", name, svcName)
klog.V(2).Infof("Updating healthcheck %s for service %s, updated healthcheck: %v", hcName, svcName, expectedHC)
err = composite.UpdateHealthCheck(l4hc.cloud, key, expectedHC)
if err != nil {
return nil, selfLink, err
}
return expectedHC, selfLink, err
}

// ensureFirewall rule for L4 service.
// The firewall rules are the same for ILB and NetLB that use external traffic policy 'local' (sharedHC == true)
// despite healthchecks have different scopes (global vs regional)
// ensureFirewall rule for `svc`.
//
// L4 ILB and L4 NetLB Services with ExternalTrafficPolicy=Cluster use the same firewall
// rule at global scope.
func (l4hc *l4HealthChecks) ensureFirewall(svc *corev1.Service, hcFwName string, hcPort int32, sharedHC bool, nodeNames []string) error {
// Add firewall rule for healthchecks to nodes
hcFWRParams := firewalls.FirewallParams{
Expand Down Expand Up @@ -213,9 +243,10 @@ func (l4hc *l4HealthChecks) deleteHealthCheckFirewall(svc *corev1.Service, hcNam
return annotations.HealthcheckResource, err
}
if !safeToDelete {
klog.V(2).Infof("Failed to delete health check firewall rule %s: health check in use.", hcName)
klog.V(3).Infof("Failed to delete health check firewall rule %s: health check in use.", hcName)
return "", nil
}
klog.V(3).Infof("Deleting healthcheck firewall rule named: %s", hcFwName)
// Delete healthcheck firewall rule if no healthcheck uses the firewall rule.
err = l4hc.deleteFirewall(hcFwName, svc)
if err != nil {
Expand Down Expand Up @@ -244,18 +275,19 @@ func (l4hc *l4HealthChecks) healthCheckFirewallSafeToDelete(hcName string, share

func (l4hc *l4HealthChecks) deleteFirewall(name string, svc *corev1.Service) error {
err := firewalls.EnsureL4FirewallRuleDeleted(l4hc.cloud, name)
if err != nil {
if fwErr, ok := err.(*firewalls.FirewallXPNError); ok {
recorder := l4hc.recorderFactory.Recorder(svc.Namespace)
recorder.Eventf(svc, corev1.EventTypeNormal, "XPN", fwErr.Message)
return nil
}
return err
if err == nil {
return nil
}
// Suppress Firewall XPN error, as this is no retryable and requires action by security admin
if fwErr, ok := err.(*firewalls.FirewallXPNError); ok {
recorder := l4hc.recorderFactory.Recorder(svc.Namespace)
recorder.Eventf(svc, corev1.EventTypeNormal, "XPN", fwErr.Message)
return nil
}
return nil
return err
}

func NewL4HealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32, l4Type utils.L4LBType, scope meta.KeyType, region string) *composite.HealthCheck {
func newL4HealthCheck(name string, svcName types.NamespacedName, shared bool, path string, port int32, l4Type utils.L4LBType, scope meta.KeyType, region string) *composite.HealthCheck {
httpSettings := composite.HTTPHealthCheck{
Port: int64(port),
RequestPath: path,
Expand All @@ -282,7 +314,7 @@ func NewL4HealthCheck(name string, svcName types.NamespacedName, shared bool, pa

// mergeHealthChecks reconciles HealthCheck config to be no smaller than
// the default values. newHC is assumed to have defaults,
// since it is created by the NewL4HealthCheck call.
// since it is created by the newL4HealthCheck call.
// E.g. old health check interval is 2s, new has the default of 8.
// The HC interval will be reconciled to 8 seconds.
// If the existing health check values are larger than the default interval,
Expand Down
8 changes: 4 additions & 4 deletions pkg/healthchecks/healthchecks_l4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestMergeHealthChecks(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
// healthcheck intervals and thresholds are common for Global and Regional healthchecks. Hence testing only Global case.
wantHC := NewL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "")
wantHC := newL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "")
hc := &composite.HealthCheck{
CheckIntervalSec: tc.checkIntervalSec,
TimeoutSec: tc.timeoutSec,
Expand Down Expand Up @@ -97,8 +97,8 @@ func TestCompareHealthChecks(t *testing.T) {
} {
t.Run(tc.desc, func(t *testing.T) {
// healthcheck intervals and thresholds are common for Global and Regional healthchecks. Hence testing only Global case.
hc := NewL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "")
wantHC := NewL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "")
hc := newL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "")
wantHC := newL4HealthCheck("hc", types.NamespacedName{Name: "svc", Namespace: "default"}, false, "/", 12345, utils.ILB, meta.Global, "")
if tc.modifier != nil {
tc.modifier(hc)
}
Expand All @@ -120,7 +120,7 @@ func TestCreateHealthCheck(t *testing.T) {
{meta.Global, ""},
{meta.Regional, "us-central1"},
} {
hc := NewL4HealthCheck("hc", namespaceName, false, "/", 12345, utils.ILB, v.scope, v.region)
hc := newL4HealthCheck("hc", namespaceName, false, "/", 12345, utils.ILB, v.scope, v.region)
if hc.Region != v.region {
t.Errorf("HealthCheck Region mismatch! %v != %v", hc.Region, v.region)
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/healthchecks/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/ingress-gce/pkg/translator"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
)
Expand Down Expand Up @@ -57,3 +58,19 @@ type HealthChecker interface {
Delete(name string, scope meta.KeyType) error
Get(name string, version meta.Version, scope meta.KeyType) (*translator.HealthCheck, error)
}

// L4HealthChecks defines methods for creating and deleting health checks (and their firewall rules) for l4 services
type L4HealthChecks interface {
// EnsureL4HealthCheck creates health check (and firewall rule) for l4 service.
// The outcome, including any error, is reported through the returned
// EnsureL4HealthCheckResult rather than a separate error value.
EnsureL4HealthCheck(svc *v1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) *EnsureL4HealthCheckResult
// DeleteHealthCheck deletes health check (and firewall rule) for l4 service.
// NOTE(review): the string result appears to name the resource whose
// deletion failed (empty when nothing failed) — confirm against the implementation.
DeleteHealthCheck(svc *v1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error)
}

// EnsureL4HealthCheckResult is the value returned by EnsureL4HealthCheck. It
// bundles the names of the ensured resources with the error information that
// describes a failed sync.
type EnsureL4HealthCheckResult struct {
// HCName is the name of the healthcheck.
HCName string
// HCLink is the link of the healthcheck, as returned when the healthcheck was ensured.
HCLink string
// HCFirewallRuleName is the name of the firewall rule for the healthcheck.
HCFirewallRuleName string
// GceResourceInError names the resource whose sync failed; it is set
// together with Err.
GceResourceInError string
// Err is the error that occurred while ensuring the resources, if any.
Err error
}
11 changes: 0 additions & 11 deletions pkg/loadbalancers/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ package loadbalancers

import (
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/networking/v1"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/namer"
)

// LoadBalancerPool is an interface to manage the cloud resources associated
Expand All @@ -40,11 +37,3 @@ type LoadBalancerPool interface {
// HasUrlMap returns true if an URL map exists in GCE for given ingress.
HasUrlMap(ing *v1.Ingress) (bool, error)
}

// L4HealthChecks defines methods for creating and deleting health checks (and their firewall rules) for l4 services
type L4HealthChecks interface {
// EnsureL4HealthCheck creates health check (and firewall rule) for l4 service
EnsureL4HealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType, nodeNames []string) (string, string, string, string, error)
// DeleteHealthCheck deletes health check (and firewall rule) for l4 service
DeleteHealthCheck(svc *corev1.Service, namer namer.L4ResourcesNamer, sharedHC bool, scope meta.KeyType, l4Type utils.L4LBType) (string, error)
}
Loading

0 comments on commit 217b108

Please sign in to comment.