diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go
index 49c1a5933a..6e1eb288c4 100644
--- a/pkg/neg/controller.go
+++ b/pkg/neg/controller.go
@@ -184,6 +184,7 @@ func NewController(
 			podInformer.GetIndexer(),
 			cloud,
 			manager,
+			enableDualStackNEG,
 			logger,
 		)
 	} else {
diff --git a/pkg/neg/readiness/poller.go b/pkg/neg/readiness/poller.go
index f3c1a0491c..36f71aff81 100644
--- a/pkg/neg/readiness/poller.go
+++ b/pkg/neg/readiness/poller.go
@@ -86,20 +86,24 @@ type poller struct {
 	patcher   podStatusPatcher
 	negCloud  negtypes.NetworkEndpointGroupCloud

+	// Enables support for Dual-Stack NEGs within the NEG Controller.
+	enableDualStackNEG bool
+
 	clock clock.Clock

 	logger klog.Logger
 }

-func NewPoller(podLister cache.Indexer, lookup NegLookup, patcher podStatusPatcher, negCloud negtypes.NetworkEndpointGroupCloud, logger klog.Logger) *poller {
+func NewPoller(podLister cache.Indexer, lookup NegLookup, patcher podStatusPatcher, negCloud negtypes.NetworkEndpointGroupCloud, enableDualStackNEG bool, logger klog.Logger) *poller {
 	return &poller{
-		pollMap:   make(map[negMeta]*pollTarget),
-		podLister: podLister,
-		lookup:    lookup,
-		patcher:   patcher,
-		negCloud:  negCloud,
-		clock:     clock.RealClock{},
-		logger:    logger.WithName("Poller"),
+		pollMap:            make(map[negMeta]*pollTarget),
+		podLister:          podLister,
+		lookup:             lookup,
+		patcher:            patcher,
+		negCloud:           negCloud,
+		enableDualStackNEG: enableDualStackNEG,
+		clock:              clock.RealClock{},
+		logger:             logger.WithName("Poller"),
 	}
 }

@@ -168,15 +172,24 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) {
 		return true, err
 	}

-	return p.processHealthStatus(key, res)
+	retry, err = p.processHealthStatus(key, res)
+	if retry {
+		<-p.clock.After(hcRetryDelay)
+	}
+	return
 }

-// processHealthStatus updates Pod readiness gates based on the input health status response.
+// processHealthStatus processes the healthStatuses of the NEG endpoints and
+// updates the [readiness gates] of the pods.
+//
+// We update the pod (using the patcher) in ANY of the following cases:
+//  1. If the endpoint is considered healthy by ANY GCE Backend Service.
+//  2. If the endpoint belongs to a NEG which is not associated with any GCE
+//     Backend Service.
+//
+// True is returned if retry is needed.
 //
-// We update the pod (using the patcher) when:
-// 1. if the endpoint considered healthy with one of the backend service health check
-// 2. if the NEG is not associated with any health checks
-// It returns true if retry is needed.
+// [readiness gates]: https://cloud.google.com/kubernetes-engine/docs/concepts/container-native-load-balancing#pod_readiness
 func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.NetworkEndpointWithHealthStatus) (bool, error) {
 	p.lock.Lock()
 	defer p.lock.Unlock()
@@ -213,6 +226,9 @@ func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.Ne
 			Port: strconv.FormatInt(healthStatus.NetworkEndpoint.Port, 10),
 			Node: healthStatus.NetworkEndpoint.Instance,
 		}
+		if p.enableDualStackNEG {
+			ne.IPv6 = healthStatus.NetworkEndpoint.Ipv6Address
+		}

 		podName, ok := p.getPod(key, ne)
 		if !ok {
@@ -220,7 +236,7 @@ func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.Ne
 			continue
 		}

-		bsKey := getHealthyBackendService(healthStatus, p.logger)
+		bsKey := getHealthyBackendService(healthStatus, p.enableDualStackNEG, p.logger)
 		if bsKey == nil {
 			unhealthyPods = append(unhealthyPods, podName)
 			continue
@@ -255,16 +271,14 @@ func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.Ne
 		}
 	}

-	if retry {
-		<-p.clock.After(hcRetryDelay)
-	}
-
 	// If we didn't patch all of the endpoints, we must keep polling for health status
 	return retry, utilerrors.NewAggregate(errList)
 }

-// getHealthyBackendService returns one of the first backend service key where the endpoint is considered healthy.
-func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthStatus, logger klog.Logger) *meta.Key {
+// getHealthyBackendService returns the key of the first backend service for
+// which the endpoint is considered healthy. An endpoint is considered healthy
+// if the health status of either its IPv4 or IPv6 address reports HEALTHY.
+func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthStatus, enableDualStackNEG bool, logger klog.Logger) *meta.Key {
 	for _, hs := range healthStatus.Healths {
 		if hs == nil {
 			logger.Error(nil, "Health status is nil in health status of network endpoint", "healthStatus", healthStatus)
@@ -275,7 +289,7 @@
 			continue
 		}

-		if hs.HealthState == healthyState {
+		if hs.HealthState == healthyState || (enableDualStackNEG && hs.Ipv6HealthState == healthyState) {
 			id, err := cloud.ParseResourceURL(hs.BackendService.BackendService)
 			if err != nil {
 				logger.Error(err, "Failed to parse backend service reference from a Network Endpoint health status", "healthStatus", healthStatus)
diff --git a/pkg/neg/readiness/poller_test.go b/pkg/neg/readiness/poller_test.go
index b88bdfc0fa..fa756518d4 100644
--- a/pkg/neg/readiness/poller_test.go
+++ b/pkg/neg/readiness/poller_test.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"fmt"
 	"net"
-	"reflect"
 	"strconv"
 	"testing"
 	"time"
@@ -28,6 +27,7 @@ import (
 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/filter"
 	"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
+	"github.com/google/go-cmp/cmp"
 	"google.golang.org/api/compute/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/cloud-provider-gcp/providers/gce"
@@ -54,15 +54,15 @@ func (p *testPatcher) syncPod(pod string, negKey, bsKey *meta.Key) error {

 func (p *testPatcher) Eval(t *testing.T, pod string, negKey, bsKey *meta.Key) {
 	if p.lastPod != pod {
-		t.Errorf("expect pod = %q, but got %q", pod, p.lastPod)
+		t.Errorf("got pod=%q; want=%q", p.lastPod, pod)
 	}

-	if !reflect.DeepEqual(p.lastNegKey, negKey) {
-		t.Errorf("expect neg key = %v, but got %v", negKey, p.lastNegKey)
+	if diff := cmp.Diff(negKey, p.lastNegKey); diff != "" {
+		t.Errorf("Unexpected NEG key (-want +got):\n%s", diff)
 	}

-	if !reflect.DeepEqual(p.lastBsKey, bsKey) {
-		t.Errorf("expect backend service key = %v, but got %v", bsKey, p.lastBsKey)
+	if diff := cmp.Diff(bsKey, p.lastBsKey); diff != "" {
+		t.Errorf("Unexpected BackendService key (-want +got):\n%s", diff)
 	}
 }

@@ -550,7 +550,7 @@ func TestPoll(t *testing.T) {
 	pollAndValidate(step, true, true, 6, true, false)
 }

-func TestProcessHealthStatus(t *testing.T) {
+func TestProcessHealthStatus_shouldNotCrashWhenMissingKeyFromPollMap(t *testing.T) {
 	t.Parallel()

 	poller := newFakePoller()
@@ -562,7 +562,8 @@ func TestProcessHealthStatus(t *testing.T) {
 	}
 	res := []*composite.NetworkEndpointWithHealthStatus{}

-	// processHealthStatus should not crash when pollMap does not have corresponding key.
+	// processHealthStatus should not crash when pollMap does not have
+	// corresponding key.
 	retry, err := poller.processHealthStatus(key, res)
 	if retry != false {
 		t.Errorf("expect retry == false, but got %v", retry)
@@ -571,3 +572,145 @@ func TestProcessHealthStatus(t *testing.T) {
 		t.Errorf("expect err == nil, but got %v", err)
 	}
 }
+
+func TestProcessHealthStatus_dualStackNEGs(t *testing.T) {
+	bsName := "bsName1"
+	backendServiceURL := fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/foo/global/backendServices/%v", bsName)
+	namespace := "ns1"
+	podName := "podName1"
+
+	networkEndpoint := func(ipv4Address, ipv6Address string) negtypes.NetworkEndpoint {
+		return negtypes.NetworkEndpoint{IP: ipv4Address, IPv6: ipv6Address, Port: "0"}
+	}
+
+	testCases := []struct {
+		desc                      string
+		healthStatus              *composite.NetworkEndpointWithHealthStatus
+		endpointPodMap            negtypes.EndpointPodMap
+		shouldUpdateReadinessGate bool
+		enableDualStackNEG        bool
+	}{
+		{
+			desc: "IPv6 endpoint should be ignored when enableDualStackNEG is false",
+			healthStatus: &composite.NetworkEndpointWithHealthStatus{
+				NetworkEndpoint: &composite.NetworkEndpoint{IpAddress: "10.0.0.1", Ipv6Address: "a::b"},
+				Healths: []*composite.HealthStatusForNetworkEndpoint{{
+					BackendService:  &composite.BackendServiceReference{BackendService: backendServiceURL},
+					HealthState:     healthyState,
+					Ipv6HealthState: healthyState,
+				}},
+			},
+			endpointPodMap: negtypes.EndpointPodMap{
+				// This endpoint isn't an exact match of the endpoint in health status
+				// but should still get updated since IPv6 endpoint should get ignored.
+				networkEndpoint("10.0.0.1", ""): {Namespace: namespace, Name: podName},
+			},
+			shouldUpdateReadinessGate: true,
+			enableDualStackNEG:        false,
+		},
+		{
+			desc: "only IPv6 endpoint healthy, but enableDualStackNEG is false so readiness gate should not update",
+			healthStatus: &composite.NetworkEndpointWithHealthStatus{
+				NetworkEndpoint: &composite.NetworkEndpoint{IpAddress: "10.0.0.1"},
+				Healths: []*composite.HealthStatusForNetworkEndpoint{{
+					Ipv6HealthState: healthyState, // Only IPv6 healthy.
+				}},
+			},
+			endpointPodMap: negtypes.EndpointPodMap{
+				networkEndpoint("10.0.0.1", ""): {Namespace: namespace, Name: podName},
+			},
+			shouldUpdateReadinessGate: false,
+			enableDualStackNEG:        false,
+		},
+		{
+			desc: "no readiness gate updated since no pod found matching endpoint",
+			healthStatus: &composite.NetworkEndpointWithHealthStatus{
+				NetworkEndpoint: &composite.NetworkEndpoint{IpAddress: "10.0.0.1", Ipv6Address: "a::b"},
+				Healths: []*composite.HealthStatusForNetworkEndpoint{{
+					HealthState:     healthyState,
+					Ipv6HealthState: healthyState,
+				}},
+			},
+			endpointPodMap: negtypes.EndpointPodMap{
+				// This endpoint isn't an exact match of the endpoint in health status
+				// and hence there should be no update.
+				networkEndpoint("10.0.0.1", ""): {Namespace: namespace, Name: podName},
+			},
+			shouldUpdateReadinessGate: false,
+			enableDualStackNEG:        true,
+		},
+		{
+			desc: "both IPv4 and IPv6 healthy, readiness gate should get updated",
+			healthStatus: &composite.NetworkEndpointWithHealthStatus{
+				NetworkEndpoint: &composite.NetworkEndpoint{IpAddress: "10.0.0.1", Ipv6Address: "a::b"},
+				Healths: []*composite.HealthStatusForNetworkEndpoint{{
+					// Both IPv4 and IPv6 healthy.
+					HealthState:     healthyState,
+					Ipv6HealthState: healthyState,
+				}},
+			},
+			endpointPodMap: negtypes.EndpointPodMap{
+				networkEndpoint("10.0.0.1", "a::b"): {Namespace: namespace, Name: podName},
+			},
+			shouldUpdateReadinessGate: true,
+			enableDualStackNEG:        true,
+		},
+		{
+			desc: "only IPv4 healthy, readiness gate should get updated",
+			healthStatus: &composite.NetworkEndpointWithHealthStatus{
+				NetworkEndpoint: &composite.NetworkEndpoint{IpAddress: "10.0.0.1", Ipv6Address: "a::b"},
+				Healths: []*composite.HealthStatusForNetworkEndpoint{{
+					HealthState: healthyState, // Only IPv4 healthy.
+				}},
+			},
+			endpointPodMap: negtypes.EndpointPodMap{
+				networkEndpoint("10.0.0.1", "a::b"): {Namespace: namespace, Name: podName},
+			},
+			shouldUpdateReadinessGate: true,
+			enableDualStackNEG:        true,
+		},
+		{
+			desc: "only IPv6 healthy, readiness gate should get updated",
+			healthStatus: &composite.NetworkEndpointWithHealthStatus{
+				NetworkEndpoint: &composite.NetworkEndpoint{IpAddress: "10.0.0.1", Ipv6Address: "a::b"},
+				Healths: []*composite.HealthStatusForNetworkEndpoint{{
+					Ipv6HealthState: healthyState, // Only IPv6 healthy.
+				}},
+			},
+			endpointPodMap: negtypes.EndpointPodMap{
+				networkEndpoint("10.0.0.1", "a::b"): {Namespace: namespace, Name: podName},
+			},
+			shouldUpdateReadinessGate: true,
+			enableDualStackNEG:        true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			neg := negMeta{SyncerKey: negtypes.NegSyncerKey{}, Name: "negName", Zone: "zone1"}
+
+			// Set common fields shared by all input values in testCases.
+			tc.healthStatus.Healths[0].BackendService = &composite.BackendServiceReference{BackendService: backendServiceURL}
+
+			poller := newFakePoller()
+			poller.pollMap[neg] = &pollTarget{
+				endpointMap: tc.endpointPodMap,
+				polling:     true,
+			}
+			poller.enableDualStackNEG = tc.enableDualStackNEG
+
+			poller.processHealthStatus(neg, []*composite.NetworkEndpointWithHealthStatus{tc.healthStatus})
+
+			patcher := poller.patcher.(*testPatcher)
+			if !tc.shouldUpdateReadinessGate && patcher.count > 0 {
+				t.Errorf("Readiness gates updated for %v; want no readiness gate updated", patcher.lastPod)
+			}
+			if tc.shouldUpdateReadinessGate && patcher.count == 0 {
+				t.Errorf("No readiness gate updated; want readiness gates for %v to get updated", keyFunc(namespace, podName))
+			}
+			if tc.shouldUpdateReadinessGate && patcher.count > 0 {
+				patcher.Eval(t, keyFunc(namespace, podName), meta.ZonalKey(neg.Name, neg.Zone), meta.GlobalKey(bsName))
+			}
+		})
+	}
+}
diff --git a/pkg/neg/readiness/reflector.go b/pkg/neg/readiness/reflector.go
index 7ae48c9d32..7f13c8c9ca 100644
--- a/pkg/neg/readiness/reflector.go
+++ b/pkg/neg/readiness/reflector.go
@@ -76,7 +76,7 @@ type readinessReflector struct {
 	logger klog.Logger
 }

-func NewReadinessReflector(kubeClient kubernetes.Interface, podLister cache.Indexer, negCloud negtypes.NetworkEndpointGroupCloud, lookup NegLookup, logger klog.Logger) Reflector {
+func NewReadinessReflector(kubeClient kubernetes.Interface, podLister cache.Indexer, negCloud negtypes.NetworkEndpointGroupCloud, lookup NegLookup, enableDualStackNEG bool, logger klog.Logger) Reflector {
 	broadcaster := record.NewBroadcaster()
 	broadcaster.StartLogging(klog.Infof)
 	broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
@@ -94,7 +94,7 @@ func NewReadinessReflector(kubeClient kubernetes.Interface, podLister cache.Inde
 		queue:            workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		logger:           logger,
 	}
-	poller := NewPoller(podLister, lookup, reflector, negCloud, logger)
+	poller := NewPoller(podLister, lookup, reflector, negCloud, enableDualStackNEG, logger)
 	reflector.poller = poller
 	return reflector
 }
diff --git a/pkg/neg/readiness/reflector_test.go b/pkg/neg/readiness/reflector_test.go
index 3218f36e3a..9f5f82a996 100644
--- a/pkg/neg/readiness/reflector_test.go
+++ b/pkg/neg/readiness/reflector_test.go
@@ -48,7 +48,7 @@ func (f *fakeLookUp) ReadinessGateEnabled(syncerKey negtypes.NegSyncerKey) bool
 }

 func newTestReadinessReflector(testContext *negtypes.TestContext) *readinessReflector {
-	reflector := NewReadinessReflector(testContext.KubeClient, testContext.PodInformer.GetIndexer(), negtypes.NewAdapter(testContext.Cloud), &fakeLookUp{}, klog.TODO())
+	reflector := NewReadinessReflector(testContext.KubeClient, testContext.PodInformer.GetIndexer(), negtypes.NewAdapter(testContext.Cloud), &fakeLookUp{}, false, klog.TODO())
 	ret := reflector.(*readinessReflector)
 	return ret
 }