Skip to content

Commit

Permalink
fix: nodePort #2704 - publish ready endpoints
Browse files Browse the repository at this point in the history
fix: lowercase local functions
chore: reuse nodes, cut logs
  • Loading branch information
nefelim4ag committed Aug 29, 2023
1 parent 79196a1 commit f696c51
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 1 deletion.
49 changes: 48 additions & 1 deletion source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,30 @@ func extractLoadBalancerTargets(svc *v1.Service, resolveLoadBalancerHostname boo
return targets
}

func isPodStatusReady(status v1.PodStatus) bool {
_, condition := getPodCondition(&status, v1.PodReady)
return condition != nil && condition.Status == v1.ConditionTrue
}

func getPodCondition(status *v1.PodStatus, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
if status == nil {
return -1, nil
}
return getPodConditionFromList(status.Conditions, conditionType)
}

func getPodConditionFromList(conditions []v1.PodCondition, conditionType v1.PodConditionType) (int, *v1.PodCondition) {
if conditions == nil {
return -1, nil
}
for i := range conditions {
if conditions[i].Type == conditionType {
return i, &conditions[i]
}
}
return -1, nil
}

func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targets, error) {
var (
internalIPs endpoint.Targets
Expand All @@ -615,19 +639,42 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe
return nil, err
}

var nodesReady []*v1.Node
var nodesRunning []*v1.Node
for _, v := range pods {
if v.Status.Phase == v1.PodRunning {
node, err := sc.nodeInformer.Lister().Get(v.Spec.NodeName)
if err != nil {
log.Debugf("Unable to find node where Pod %s is running", v.Spec.Hostname)
continue
}

if _, ok := nodesMap[node]; !ok {
nodesMap[node] = *new(struct{})
nodes = append(nodes, node)
nodesRunning = append(nodesRunning, node)

if isPodStatusReady(v.Status) {
nodesReady = append(nodesReady, node)
// Check pod not terminating
if v.GetDeletionTimestamp() == nil {
nodes = append(nodes, node)
}
}
}
}
}

if len(nodes) > 0 {
// Works same as service endpoints
} else if len(nodesReady) > 0 {
// 2 level of panic modes as safe guard, because old wrong behavior can be used by someone
// Publish all endpoints not always a bad thing
log.Debugf("All pods in terminating state, use ready")
nodes = nodesReady
} else {
log.Debugf("All pods not ready, use all running")
nodes = nodesRunning
}
default:
nodes, err = sc.nodeInformer.Lister().List(labels.Everything())
if err != nil {
Expand Down
50 changes: 50 additions & 0 deletions source/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1630,6 +1630,7 @@ func TestServiceSourceNodePortServices(t *testing.T) {
podNames []string
nodeIndex []int
phases []v1.PodPhase
conditions []v1.PodCondition
labelSelector labels.Selector
}{
{
Expand Down Expand Up @@ -1817,6 +1818,7 @@ func TestServiceSourceNodePortServices(t *testing.T) {
podNames: []string{"pod-0"},
nodeIndex: []int{1},
phases: []v1.PodPhase{v1.PodRunning},
conditions: []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}},
},
{
title: "annotated NodePort services with ExternalTrafficPolicy=Local and multiple pods on a single node return an endpoint with unique IP addresses of the cluster's nodes where pods is running only",
Expand Down Expand Up @@ -1859,6 +1861,53 @@ func TestServiceSourceNodePortServices(t *testing.T) {
podNames: []string{"pod-0", "pod-1"},
nodeIndex: []int{1, 1},
phases: []v1.PodPhase{v1.PodRunning, v1.PodRunning},
conditions: []v1.PodCondition{
{Type: v1.PodReady, Status: v1.ConditionTrue},
{Type: v1.PodReady, Status: v1.ConditionTrue},
},
},
{
title: "annotated NodePort services with ExternalTrafficPolicy=Local return pods in Ready & Running state",
svcNamespace: "testing",
svcName: "foo",
svcType: v1.ServiceTypeNodePort,
svcTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
labels: map[string]string{},
annotations: map[string]string{
hostnameAnnotationKey: "foo.example.org.",
},
expected: []*endpoint.Endpoint{
{DNSName: "_foo._tcp.foo.example.org", Targets: endpoint.Targets{"0 50 30192 foo.example.org"}, RecordType: endpoint.RecordTypeSRV},
{DNSName: "foo.example.org", Targets: endpoint.Targets{"54.10.11.1"}, RecordType: endpoint.RecordTypeA},
},
nodes: []*v1.Node{{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.1"},
{Type: v1.NodeInternalIP, Address: "10.0.1.1"},
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{
{Type: v1.NodeExternalIP, Address: "54.10.11.2"},
{Type: v1.NodeInternalIP, Address: "10.0.1.2"},
},
},
}},
podNames: []string{"pod-0", "pod-1"},
nodeIndex: []int{0, 1},
phases: []v1.PodPhase{v1.PodRunning, v1.PodRunning},
conditions: []v1.PodCondition{
{Type: v1.PodReady, Status: v1.ConditionTrue},
{Type: v1.PodReady, Status: v1.ConditionFalse},
},
},
{
title: "access=private annotation NodePort services return an endpoint with private IP addresses of the cluster's nodes",
Expand Down Expand Up @@ -2153,6 +2202,7 @@ func TestServiceSourceNodePortServices(t *testing.T) {
},
Status: v1.PodStatus{
Phase: tc.phases[i],
Conditions: []v1.PodCondition{tc.conditions[i]},
},
}

Expand Down

0 comments on commit f696c51

Please sign in to comment.