Skip to content

Commit

Permalink
dst: Stop overriding Host IP with Pod IP on HostPort lookup (#11328)
Browse files Browse the repository at this point in the history
* stopgap fix for hostport staleness

Problem:

When there's a pod with a `hostPort` entry, `GetProfile` requests
targetting the host's IP and that `hostPort` return an endpoint profile
with that pod's IP and `containerPort`. If that pod vanishes and another
one in that same host with that same `hostPort` comes up, the existing
`GetProfile` streams won't get updated with the new pod information
(metadata, identity, protocol).

That breaks the connectivity of the client proxy relying on that stream.

Partial Solution:

It should be less surprising for those `GetProfile` requests to return
an endpoint profile with the same host IP and port requested, and leave
to the cluster's CNI to peform the translation to the corresponding pod
IP and `containerPort`.

This PR performs that change, but continuing returning the corresponding
pod's information alongside.

If the pod associated to that host IP and port changes, the client proxy
won't loose connectivity, but the pod's information won't get updated
(that'll be fixed in a separate PR).

A new unit test validating this has been added, which will be expanded
to validate the changed pod information when that gets implemented.

Details of Change:

- We no longer do the HostPort->ContainerPort conversion, so the
  `getPortForPod` function was dropped.
- The `getPodByIp` function will now be split in two: `getPodByPodIP`
  and `getPodByHostIP`, the latter being called only if the former
  doesn't return anything.
- The `createAddress` function is now simplified in that it just uses
  the passed IP to build the address. The passed IP will depend on which
  of the two functions just mentioned returned the pod (host IP or pod
  IP)
  • Loading branch information
alpeb authored and mateiidavid committed Sep 20, 2023
1 parent 7538f7a commit 21dfd08
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 99 deletions.
98 changes: 28 additions & 70 deletions controller/api/destination/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,22 @@ func (s *server) getProfileByIP(

if svcID == nil {
// If the IP does not map to a service, check if it maps to a pod
pod, err := getPodByIP(s.k8sAPI, ip.String(), port, s.log)
var pod *corev1.Pod
targetIP := ip.String()
pod, err = getPodByPodIP(s.k8sAPI, targetIP, port, s.log)
if err != nil {
return err
}
address, err := s.createAddress(pod, ip.String(), port)
if pod != nil {
targetIP = pod.Status.PodIP
} else {
pod, err = getPodByHostIP(s.k8sAPI, targetIP, port, s.log)
if err != nil {
return err
}
}

address, err := s.createAddress(pod, targetIP, port)
if err != nil {
return fmt.Errorf("failed to create address: %w", err)
}
Expand Down Expand Up @@ -396,7 +407,7 @@ func (s *server) subscribeToServiceWithoutContext(
return nil
}

// Resolves a profile for a single endpoitn, sending updates to the provided
// Resolves a profile for a single endpoint, sending updates to the provided
// stream.
//
// This function does not return until the stream is closed.
Expand Down Expand Up @@ -441,60 +452,18 @@ func (s *server) subscribeToEndpointProfile(
return nil
}

// getPortForPod returns the port that a `pod` is listening on.
//
// Proxies usually receive traffic targeting `podIp:containerPort`.
// However, they may be receiving traffic on `nodeIp:nodePort`. In this
// case, we convert the port to the containerPort for discovery. In k8s parlance,
// this is the 'HostPort' mapping.
func (s *server) getPortForPod(pod *corev1.Pod, targetIP string, port uint32) (uint32, error) {
if pod == nil {
return port, fmt.Errorf("getPortForPod passed a nil pod")
}

if net.ParseIP(targetIP) == nil {
return port, fmt.Errorf("failed to parse hostIP into net.IP: %s", targetIP)
}

if containsIP(pod.Status.PodIPs, targetIP) {
return port, nil
}

if targetIP == pod.Status.HostIP {
for _, container := range pod.Spec.Containers {
for _, containerPort := range container.Ports {
if uint32(containerPort.HostPort) == port {
return uint32(containerPort.ContainerPort), nil
}
}
}
}

s.log.Warnf("unable to find container port as host (%s) matches neither PodIP nor HostIP (%s)", targetIP, pod)
return port, nil
}

func (s *server) createAddress(pod *corev1.Pod, targetIP string, port uint32) (watcher.Address, error) {
var ip, ownerKind, ownerName string
var ownerKind, ownerName string
var err error
if pod != nil {
ownerKind, ownerName, err = s.metadataAPI.GetOwnerKindAndName(context.Background(), pod, true)
if err != nil {
return watcher.Address{}, err
}

port, err = s.getPortForPod(pod, targetIP, port)
if err != nil {
return watcher.Address{}, fmt.Errorf("failed to find Port for Pod: %w", err)
}

ip = pod.Status.PodIP
} else {
ip = targetIP
}

address := watcher.Address{
IP: ip,
IP: targetIP,
Port: port,
Pod: pod,
OwnerName: ownerName,
Expand Down Expand Up @@ -594,32 +563,32 @@ func (s *server) getEndpointByHostname(k8sAPI *k8s.API, hostname string, svcID w
return nil, fmt.Errorf("no pod found in Endpoints %s/%s for hostname %s", svcID.Namespace, svcID.Name, hostname)
}

// getPodByIP returns a pod that maps to the given IP address. The pod can either
// be in the host network or the pod network. If the pod is in the host
// network, then it must have a container port that exposes `port` as a host
// port.
func getPodByIP(k8sAPI *k8s.API, podIP string, port uint32, log *logging.Entry) (*corev1.Pod, error) {
// First we check if the address maps to a pod in the host network.
addr := net.JoinHostPort(podIP, fmt.Sprintf("%d", port))
// getPodByHostIP returns a pod that maps to the given IP address in the host
// network. It must have a container port that exposes `port` as a host port.
func getPodByHostIP(k8sAPI *k8s.API, hostIP string, port uint32, log *logging.Entry) (*corev1.Pod, error) {
addr := net.JoinHostPort(hostIP, fmt.Sprintf("%d", port))
hostIPPods, err := getIndexedPods(k8sAPI, watcher.HostIPIndex, addr)
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
if len(hostIPPods) == 1 {
log.Debugf("found %s:%d on the host network", podIP, port)
log.Debugf("found %s:%d on the host network", hostIP, port)
return hostIPPods[0], nil
}
if len(hostIPPods) > 1 {
conflictingPods := []string{}
for _, pod := range hostIPPods {
conflictingPods = append(conflictingPods, fmt.Sprintf("%s:%s", pod.Namespace, pod.Name))
}
log.Warnf("found conflicting %s:%d endpoint on the host network: %s", podIP, port, strings.Join(conflictingPods, ","))
return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting host network endpoint %s:%d", len(hostIPPods), podIP, port)
log.Warnf("found conflicting %s:%d endpoint on the host network: %s", hostIP, port, strings.Join(conflictingPods, ","))
return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting host network endpoint %s:%d", len(hostIPPods), hostIP, port)
}

// The address did not map to a pod in the host network, so now we check
// if the IP maps to a pod IP in the pod network.
return nil, nil
}

// getPodByPodIP returns a pod that maps to the given IP address in the pod network
func getPodByPodIP(k8sAPI *k8s.API, podIP string, port uint32, log *logging.Entry) (*corev1.Pod, error) {
podIPPods, err := getIndexedPods(k8sAPI, watcher.PodIPIndex, podIP)
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
Expand Down Expand Up @@ -808,14 +777,3 @@ func getPodSkippedInboundPortsAnnotations(pod *corev1.Pod) (map[uint32]struct{},

return util.ParsePorts(annotation)
}

// Given a list of PodIP, determine is `targetIP` is a member
func containsIP(podIPs []corev1.PodIP, targetIP string) bool {
for _, ip := range podIPs {
if ip.IP == targetIP {
return true
}
}

return false
}
54 changes: 25 additions & 29 deletions controller/api/destination/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ const clusterIP = "172.17.12.0"
const clusterIPOpaque = "172.17.12.1"
const podIP1 = "172.17.0.12"
const podIP2 = "172.17.0.13"
const podIP3 = "172.17.0.17"
const podIPOpaque = "172.17.0.14"
const podIPSkipped = "172.17.0.15"
const podIPPolicy = "172.17.0.16"
Expand Down Expand Up @@ -375,6 +374,27 @@ func TestGetProfiles(t *testing.T) {
t.Fatalf("Expected port %d to be a non-opaque protocol, but it was opaque", 80)
}
})

t.Run("Return profile for host port pods", func(t *testing.T) {
hostPort := uint32(7777)
stream := profileStream(t, externalIP, hostPort, "")

// HostPort maps to pod.
profile := assertSingleProfile(t, stream.updates)
dstPod := profile.Endpoint.MetricLabels["pod"]
if dstPod != "hostport-mapping" {
t.Fatalf("Expected dst_pod to be %s got %s", "hostport-mapping", dstPod)
}

ip, err := addr.ParseProxyIPV4(externalIP)
if err != nil {
t.Fatalf("Error parsing IP: %s", err)
}
addr := profile.Endpoint.Addr
if addr.Ip.String() != ip.String() && addr.Port != hostPort {
t.Fatalf("Expected endpoint addr to be %s port:%d got %s", ip, hostPort, addr)
}
})
}

func TestTokenStructure(t *testing.T) {
Expand Down Expand Up @@ -434,30 +454,6 @@ func toAddress(path string, port uint32) (*net.TcpAddress, error) {
}, nil
}

func TestHostPortMapping(t *testing.T) {
hostPort := uint32(7777)
containerPort := uint32(80)
server := makeServer(t)

pod, err := getPodByIP(server.k8sAPI, externalIP, hostPort, server.log)
if err != nil {
t.Fatalf("error retrieving pod by external IP %s", err)
}

address, err := server.createAddress(pod, externalIP, hostPort)
if err != nil {
t.Fatalf("error calling createAddress() %s", err)
}

if address.IP != podIP3 {
t.Fatalf("expected podIP (%s), received other IP (%s)", podIP3, address.IP)
}

if address.Port != containerPort {
t.Fatalf("expected containerPort (%d) but received port (%d) instead", containerPort, address.Port)
}
}

func TestIpWatcherGetSvcID(t *testing.T) {
name := "service"
namespace := "test"
Expand Down Expand Up @@ -566,7 +562,7 @@ status:

k8sAPI.Sync(nil)
// Get host IP pod that is mapped to the port `hostPort1`
pod, err := getPodByIP(k8sAPI, hostIP, hostPort1, logging.WithFields(nil))
pod, err := getPodByHostIP(k8sAPI, hostIP, hostPort1, logging.WithFields(nil))
if err != nil {
t.Fatalf("failed to get pod: %s", err)
}
Expand All @@ -579,7 +575,7 @@ status:
// Get host IP pod that is mapped to the port `hostPort2`; this tests
// that the indexer properly adds multiple containers from a single
// pod.
pod, err = getPodByIP(k8sAPI, hostIP, hostPort2, logging.WithFields(nil))
pod, err = getPodByHostIP(k8sAPI, hostIP, hostPort2, logging.WithFields(nil))
if err != nil {
t.Fatalf("failed to get pod: %s", err)
}
Expand All @@ -590,15 +586,15 @@ status:
t.Fatalf("expected pod name to be %s, but got %s", expectedPodName, pod.Name)
}
// Get host IP pod with unmapped host port
pod, err = getPodByIP(k8sAPI, hostIP, 12347, logging.WithFields(nil))
pod, err = getPodByHostIP(k8sAPI, hostIP, 12347, logging.WithFields(nil))
if err != nil {
t.Fatalf("expected no error when getting host IP pod with unmapped host port, but got: %s", err)
}
if pod != nil {
t.Fatal("expected no pod to be found with unmapped host port")
}
// Get pod IP pod and expect an error
_, err = getPodByIP(k8sAPI, podIP, 12346, logging.WithFields(nil))
_, err = getPodByPodIP(k8sAPI, podIP, 12346, logging.WithFields(nil))
if err == nil {
t.Fatal("expected error when getting by pod IP and unmapped host port, but got none")
}
Expand Down
1 change: 1 addition & 0 deletions controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ kind: Pod
apiVersion: v1
metadata:
name: hostport-mapping
namespace: ns
status:
phase: Running
hostIP: 192.168.1.20
Expand Down

0 comments on commit 21dfd08

Please sign in to comment.