Merge pull request #2093 from gauravkghildiyal/ipv6-in-readiness
Consume IPv6 Health status for NEG pod readiness
k8s-ci-robot authored May 5, 2023
2 parents fafd33b + 9973afc commit 7cad8a5
Showing 5 changed files with 191 additions and 33 deletions.
1 change: 1 addition & 0 deletions pkg/neg/controller.go
@@ -184,6 +184,7 @@ func NewController(
podInformer.GetIndexer(),
cloud,
manager,
enableDualStackNEG,
logger,
)
} else {
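The one-line change above threads the new enableDualStackNEG flag from the NEG controller into the readiness machinery. Everything below hinges on the endpoint identity that the readiness poller matches pods against, so here is a minimal sketch of its shape, inferred purely from the usages visible in this diff (the authoritative definition lives in pkg/neg/types and may carry more fields):

```go
// Minimal sketch of the endpoint identity used by the readiness poller.
// Field names are inferred from the usages in this diff; the authoritative
// definition lives in pkg/neg/types.
package negtypes

// NetworkEndpoint identifies one NEG endpoint. Once dual-stack NEGs are
// enabled, IPv6 becomes part of this identity, so a pollMap lookup succeeds
// only when the IPv4 address, IPv6 address, port, and node all match.
type NetworkEndpoint struct {
	IP   string // IPv4 address of the endpoint
	IPv6 string // IPv6 address; empty for single-stack NEGs
	Port string // serialized port, e.g. "8080"
	Node string // name of the GCE instance hosting the endpoint
}
```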
58 changes: 36 additions & 22 deletions pkg/neg/readiness/poller.go
@@ -86,20 +86,24 @@ type poller struct {
patcher podStatusPatcher
negCloud negtypes.NetworkEndpointGroupCloud

// Enables support for Dual-Stack NEGs within the NEG Controller.
enableDualStackNEG bool

clock clock.Clock

logger klog.Logger
}

func NewPoller(podLister cache.Indexer, lookup NegLookup, patcher podStatusPatcher, negCloud negtypes.NetworkEndpointGroupCloud, logger klog.Logger) *poller {
func NewPoller(podLister cache.Indexer, lookup NegLookup, patcher podStatusPatcher, negCloud negtypes.NetworkEndpointGroupCloud, enableDualStackNEG bool, logger klog.Logger) *poller {
return &poller{
pollMap: make(map[negMeta]*pollTarget),
podLister: podLister,
lookup: lookup,
patcher: patcher,
negCloud: negCloud,
clock: clock.RealClock{},
logger: logger.WithName("Poller"),
pollMap: make(map[negMeta]*pollTarget),
podLister: podLister,
lookup: lookup,
patcher: patcher,
negCloud: negCloud,
enableDualStackNEG: enableDualStackNEG,
clock: clock.RealClock{},
logger: logger.WithName("Poller"),
}
}

@@ -168,15 +172,24 @@ func (p *poller) Poll(key negMeta) (retry bool, err error) {
return true, err
}

return p.processHealthStatus(key, res)
retry, err = p.processHealthStatus(key, res)
if retry {
<-p.clock.After(hcRetryDelay)
}
return
}

// processHealthStatus updates Pod readiness gates based on the input health status response.
// processHealthStatus processes the healthStatuses of the NEG endpoints and
// updates the [readiness gates] of the pods.
//
// We update the pod (using the patcher) in ANY of the following cases:
// 1. If the endpoint is considered healthy by ANY GCE Backend Service.
// 2. If the endpoint belongs to a NEG which is not associated with any GCE
// Backend Service.
//
// True is returned if retry is needed.
//
// We update the pod (using the patcher) when:
// 1. if the endpoint considered healthy with one of the backend service health check
// 2. if the NEG is not associated with any health checks
// It returns true if retry is needed.
// [readiness gates]: https://cloud.google.com/kubernetes-engine/docs/concepts/container-native-load-balancing#pod_readiness
func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.NetworkEndpointWithHealthStatus) (bool, error) {
p.lock.Lock()
defer p.lock.Unlock()
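
Note how the two hunks above interact: processHealthStatus takes p.lock for its entire body, while the retry wait now happens in Poll, after processHealthStatus has returned and its deferred unlock has run. Before this change, the <-p.clock.After(hcRetryDelay) call sat inside processHealthStatus (see its removal in the hunk at @@ -255 below), meaning the poller slept while still holding the lock. A self-contained sketch of the corrected shape, with simplified stand-ins for the poller types:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Stand-in value; the real constant lives in poller.go.
const hcRetryDelay = 100 * time.Millisecond

// pollerSketch is a simplified stand-in for the poller in this diff: just a
// mutex plus a clock seam (the real code uses k8s.io/utils/clock so tests can
// inject a fake clock instead of sleeping for real).
type pollerSketch struct {
	lock  sync.Mutex
	after func(d time.Duration) <-chan time.Time
}

// processHealthStatus holds the lock for its whole body, like the function
// above. Sleeping in here would stall every other caller for the full delay.
func (p *pollerSketch) processHealthStatus() (retry bool, err error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	return true, nil // pretend some endpoints are not healthy yet
}

// poll mirrors the new shape of Poll: the retry wait runs only after
// processHealthStatus has returned and released the lock.
func (p *pollerSketch) poll() (retry bool, err error) {
	retry, err = p.processHealthStatus()
	if retry {
		<-p.after(hcRetryDelay) // wait with the lock released
	}
	return retry, err
}

func main() {
	p := &pollerSketch{after: time.After}
	retry, _ := p.poll()
	fmt.Println("retry:", retry)
}
```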
@@ -213,14 +226,17 @@ func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.NetworkEndpointWithHealthStatus) (bool, error) {
Port: strconv.FormatInt(healthStatus.NetworkEndpoint.Port, 10),
Node: healthStatus.NetworkEndpoint.Instance,
}
if p.enableDualStackNEG {
ne.IPv6 = healthStatus.NetworkEndpoint.Ipv6Address
}

podName, ok := p.getPod(key, ne)
if !ok {
// The pod is not in interest. Skip
continue
}

bsKey := getHealthyBackendService(healthStatus, p.logger)
bsKey := getHealthyBackendService(healthStatus, p.enableDualStackNEG, p.logger)
if bsKey == nil {
unhealthyPods = append(unhealthyPods, podName)
continue
@@ -255,16 +271,14 @@ func (p *poller) processHealthStatus(key negMeta, healthStatuses []*composite.NetworkEndpointWithHealthStatus) (bool, error) {
}
}

if retry {
<-p.clock.After(hcRetryDelay)
}

// If we didn't patch all of the endpoints, we must keep polling for health status
return retry, utilerrors.NewAggregate(errList)
}

// getHealthyBackendService returns one of the first backend service key where the endpoint is considered healthy.
func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthStatus, logger klog.Logger) *meta.Key {
// getHealthyBackendService returns one of the first backend service key where
// the endpoint is considered healthy. An endpoint is considered healthy if
// either the IPv4 OR IPv6 endpoint's healthstatus reports HEALTHY.
func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthStatus, enableDualStackNEG bool, logger klog.Logger) *meta.Key {
for _, hs := range healthStatus.Healths {
if hs == nil {
logger.Error(nil, "Health status is nil in health status of network endpoint", "healthStatus", healthStatus)
@@ -275,7 +289,7 @@ func getHealthyBackendService(healthStatus *composite.NetworkEndpointWithHealthStatus, enableDualStackNEG bool, logger klog.Logger) *meta.Key {
continue
}

if hs.HealthState == healthyState {
if hs.HealthState == healthyState || (enableDualStackNEG && hs.Ipv6HealthState == healthyState) {
id, err := cloud.ParseResourceURL(hs.BackendService.BackendService)
if err != nil {
logger.Error(err, "Failed to parse backend service reference from a Network Endpoint health status", "healthStatus", healthStatus)
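The final hunk above is the heart of this PR: with enableDualStackNEG set, an endpoint counts as healthy as soon as either address family reports HEALTHY. Together with the earlier hunk that copies Ipv6Address into ne.IPv6, it also tightens matching, since a dual-stack endpoint now maps to a pod only if the stored endpoint carries the same IPv6 address. A self-contained sketch of the OR decision, using simplified stand-ins for the composite types:

```go
package main

import "fmt"

const healthyState = "HEALTHY"

// healthSketch is a simplified stand-in for
// composite.HealthStatusForNetworkEndpoint as used in this diff.
type healthSketch struct {
	backendService  string
	healthState     string // health of the IPv4 address
	ipv6HealthState string // health of the IPv6 address
}

// healthyBackend mirrors the decision in getHealthyBackendService: return the
// first backend service for which the endpoint is healthy. With dual-stack
// enabled, HEALTHY on either the IPv4 or the IPv6 side is sufficient.
func healthyBackend(healths []healthSketch, enableDualStackNEG bool) (string, bool) {
	for _, hs := range healths {
		if hs.healthState == healthyState ||
			(enableDualStackNEG && hs.ipv6HealthState == healthyState) {
			return hs.backendService, true
		}
	}
	return "", false
}

func main() {
	healths := []healthSketch{{backendService: "bs1", ipv6HealthState: healthyState}}
	_, ok := healthyBackend(healths, false)
	fmt.Println("flag off:", ok) // false: IPv6-only health is ignored
	_, ok = healthyBackend(healths, true)
	fmt.Println("flag on:", ok) // true: an IPv6 HEALTHY report now counts
}
```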
159 changes: 151 additions & 8 deletions pkg/neg/readiness/poller_test.go
@@ -20,14 +20,14 @@ import (
"context"
"fmt"
"net"
"reflect"
"strconv"
"testing"
"time"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/filter"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"github.com/google/go-cmp/cmp"
"google.golang.org/api/compute/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/cloud-provider-gcp/providers/gce"
@@ -54,15 +54,15 @@ func (p *testPatcher) syncPod(pod string, negKey, bsKey *meta.Key) error {

func (p *testPatcher) Eval(t *testing.T, pod string, negKey, bsKey *meta.Key) {
if p.lastPod != pod {
t.Errorf("expect pod = %q, but got %q", pod, p.lastPod)
t.Errorf("got pod=%q; want=%q", p.lastPod, pod)
}

if !reflect.DeepEqual(p.lastNegKey, negKey) {
t.Errorf("expect neg key = %v, but got %v", negKey, p.lastNegKey)
if diff := cmp.Diff(negKey, p.lastNegKey); diff != "" {
t.Errorf("diff found in expected NEG; (-want +got):\n%s", diff)
}

if !reflect.DeepEqual(p.lastBsKey, bsKey) {
t.Errorf("expect backend service key = %v, but got %v", bsKey, p.lastBsKey)
if diff := cmp.Diff(bsKey, p.lastBsKey); diff != "" {
t.Errorf("diff found in expected BackendService; (-want +got):\n%s", diff)
}
}

@@ -550,7 +550,7 @@ func TestPoll(t *testing.T) {
pollAndValidate(step, true, true, 6, true, false)
}

func TestProcessHealthStatus(t *testing.T) {
func TestProcessHealthStatus_shouldNotCrashWhenMissingKeyFromPollMap(t *testing.T) {
t.Parallel()
poller := newFakePoller()

@@ -562,7 +562,8 @@ }
}
res := []*composite.NetworkEndpointWithHealthStatus{}

// processHealthStatus should not crash when pollMap does not have corresponding key.
// processHealthStatus should not crash when pollMap does not have
// corresponding key.
retry, err := poller.processHealthStatus(key, res)
if retry != false {
t.Errorf("expect retry == false, but got %v", retry)
@@ -571,3 +572,145 @@ }
t.Errorf("expect err == nil, but got %v", err)
}
}

func TestProcessHealthStatus_dualStackNEGs(t *testing.T) {
bsName := "bsName1"
backendServiceURL := fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/foo/global/backendServices/%v", bsName)
namespace := "ns1"
podName := "podName1"

networkEndpoint := func(ipv4Address, ipv6Address string) negtypes.NetworkEndpoint {
return negtypes.NetworkEndpoint{IP: ipv4Address, IPv6: ipv6Address, Port: "0"}
}

testCases := []struct {
desc string
healthStatus *composite.NetworkEndpointWithHealthStatus
endpointPodMap negtypes.EndpointPodMap
shouldUpdateReadinessGate bool
enableDualStackNEG bool
}{
{
desc: "IPv6 endpoint should be ignored when enableDualStackNEG is false",
healthStatus: &composite.NetworkEndpointWithHealthStatus{
NetworkEndpoint: &composite.NetworkEndpoint{IpAddress: "10.0.0.1", Ipv6Address: "a::b"},
Healths: []*composite.HealthStatusForNetworkEndpoint{{
BackendService: &composite.BackendServiceReference{BackendService: backendServiceURL},
HealthState: healthyState,
Ipv6HealthState: healthyState,
}},
},
endpointPodMap: negtypes.EndpointPodMap{
// This endpoint isn't an exact match of the endpoint in health status
// but should still get updated since IPv6 endpoint should get ignored.
networkEndpoint("10.0.0.1", ""): {Namespace: namespace, Name: podName},
},
shouldUpdateReadinessGate: true,
enableDualStackNEG: false,
},
{
desc: "only IPv6 endpoint healthy, but enableDualStackNEG is false so readiness gate should not update",
healthStatus: &composite.NetworkEndpointWithHealthStatus{
NetworkEndpoint: &composite.NetworkEndpoint{IpAddress: "10.0.0.1"},
Healths: []*composite.HealthStatusForNetworkEndpoint{{
Ipv6HealthState: healthyState, // Only IPv6 healthy.
}},
},
endpointPodMap: negtypes.EndpointPodMap{
networkEndpoint("10.0.0.1", ""): {Namespace: namespace, Name: podName},
},
shouldUpdateReadinessGate: false,
enableDualStackNEG: false,
},
{
desc: "no readiness gate updated since no pod found matching endpoint",
healthStatus: &composite.NetworkEndpointWithHealthStatus{
NetworkEndpoint: &composite.NetworkEndpoint{IpAddress: "10.0.0.1", Ipv6Address: "a::b"},
Healths: []*composite.HealthStatusForNetworkEndpoint{{
HealthState: healthyState,
Ipv6HealthState: healthyState,
}},
},
endpointPodMap: negtypes.EndpointPodMap{
// This endpoint isn't an exact match of the endpoint in health status
// and hence there should be no update.
networkEndpoint("10.0.0.1", ""): {Namespace: namespace, Name: podName},
},
shouldUpdateReadinessGate: false,
enableDualStackNEG: true,
},
{
desc: "both IPv4 and IPv6 healthy, readiness gate should get updated",
healthStatus: &composite.NetworkEndpointWithHealthStatus{
NetworkEndpoint: &composite.NetworkEndpoint{IpAddress: "10.0.0.1", Ipv6Address: "a::b"},
Healths: []*composite.HealthStatusForNetworkEndpoint{{
// Both IPv4 and IPv6 healthy.
HealthState: healthyState,
Ipv6HealthState: healthyState,
}},
},
endpointPodMap: negtypes.EndpointPodMap{
networkEndpoint("10.0.0.1", "a::b"): {Namespace: namespace, Name: podName},
},
shouldUpdateReadinessGate: true,
enableDualStackNEG: true,
},
{
desc: "only IPv4 healthy, readiness gate should get updated",
healthStatus: &composite.NetworkEndpointWithHealthStatus{
NetworkEndpoint: &composite.NetworkEndpoint{IpAddress: "10.0.0.1", Ipv6Address: "a::b"},
Healths: []*composite.HealthStatusForNetworkEndpoint{{
HealthState: healthyState, // Only IPv4 healthy.
}},
},
endpointPodMap: negtypes.EndpointPodMap{
networkEndpoint("10.0.0.1", "a::b"): {Namespace: namespace, Name: podName},
},
shouldUpdateReadinessGate: true,
enableDualStackNEG: true,
},
{
desc: "only IPv6 healthy, readiness gate should get updated",
healthStatus: &composite.NetworkEndpointWithHealthStatus{
NetworkEndpoint: &composite.NetworkEndpoint{IpAddress: "10.0.0.1", Ipv6Address: "a::b"},
Healths: []*composite.HealthStatusForNetworkEndpoint{{
Ipv6HealthState: healthyState, // Only IPv6 healthy.
}},
},
endpointPodMap: negtypes.EndpointPodMap{
networkEndpoint("10.0.0.1", "a::b"): {Namespace: namespace, Name: podName},
},
shouldUpdateReadinessGate: true,
enableDualStackNEG: true,
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
neg := negMeta{SyncerKey: negtypes.NegSyncerKey{}, Name: "negName", Zone: "zone1"}

// Set common fields shared by all input values in testCases.
tc.healthStatus.Healths[0].BackendService = &composite.BackendServiceReference{BackendService: backendServiceURL}

poller := newFakePoller()
poller.pollMap[neg] = &pollTarget{
endpointMap: tc.endpointPodMap,
polling: true,
}
poller.enableDualStackNEG = tc.enableDualStackNEG

poller.processHealthStatus(neg, []*composite.NetworkEndpointWithHealthStatus{tc.healthStatus})

patcher := poller.patcher.(*testPatcher)
if !tc.shouldUpdateReadinessGate && patcher.count > 0 {
t.Errorf("Readiness gates updated for %v; want no readiness gate updated", patcher.lastPod)
}
if tc.shouldUpdateReadinessGate && patcher.count == 0 {
t.Errorf("No readiness gate updated; want readiness gates for %v to get updated", keyFunc(namespace, podName))
}
if tc.shouldUpdateReadinessGate && patcher.count > 0 {
patcher.Eval(t, keyFunc(namespace, podName), meta.ZonalKey(neg.Name, neg.Zone), meta.GlobalKey(bsName))
}
})
}
}
4 changes: 2 additions & 2 deletions pkg/neg/readiness/reflector.go
@@ -76,7 +76,7 @@ type readinessReflector struct {
logger klog.Logger
}

func NewReadinessReflector(kubeClient kubernetes.Interface, podLister cache.Indexer, negCloud negtypes.NetworkEndpointGroupCloud, lookup NegLookup, logger klog.Logger) Reflector {
func NewReadinessReflector(kubeClient kubernetes.Interface, podLister cache.Indexer, negCloud negtypes.NetworkEndpointGroupCloud, lookup NegLookup, enableDualStackNEG bool, logger klog.Logger) Reflector {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{
@@ -94,7 +94,7 @@ func NewReadinessReflector(kubeClient kubernetes.Interface, podLister cache.Indexer, negCloud negtypes.NetworkEndpointGroupCloud, lookup NegLookup, enableDualStackNEG bool, logger klog.Logger) Reflector {
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
logger: logger,
}
poller := NewPoller(podLister, lookup, reflector, negCloud, logger)
poller := NewPoller(podLister, lookup, reflector, negCloud, enableDualStackNEG, logger)
reflector.poller = poller
return reflector
}
2 changes: 1 addition & 1 deletion pkg/neg/readiness/reflector_test.go
@@ -48,7 +48,7 @@ func (f *fakeLookUp) ReadinessGateEnabled(syncerKey negtypes.NegSyncerKey) bool
}

func newTestReadinessReflector(testContext *negtypes.TestContext) *readinessReflector {
reflector := NewReadinessReflector(testContext.KubeClient, testContext.PodInformer.GetIndexer(), negtypes.NewAdapter(testContext.Cloud), &fakeLookUp{}, klog.TODO())
reflector := NewReadinessReflector(testContext.KubeClient, testContext.PodInformer.GetIndexer(), negtypes.NewAdapter(testContext.Cloud), &fakeLookUp{}, false, klog.TODO())
ret := reflector.(*readinessReflector)
return ret
}
