From 21dfd0898601f0aa98e804cd29af8248f51fa53f Mon Sep 17 00:00:00 2001 From: Alejandro Pedraza Date: Wed, 6 Sep 2023 10:35:14 -0500 Subject: [PATCH] dst: Stop overriding Host IP with Pod IP on HostPort lookup (#11328) * stopgap fix for hostport staleness Problem: When there's a pod with a `hostPort` entry, `GetProfile` requests targetting the host's IP and that `hostPort` return an endpoint profile with that pod's IP and `containerPort`. If that pod vanishes and another one in that same host with that same `hostPort` comes up, the existing `GetProfile` streams won't get updated with the new pod information (metadata, identity, protocol). That breaks the connectivity of the client proxy relying on that stream. Partial Solution: It should be less surprising for those `GetProfile` requests to return an endpoint profile with the same host IP and port requested, and leave to the cluster's CNI to peform the translation to the corresponding pod IP and `containerPort`. This PR performs that change, but continuing returning the corresponding pod's information alongside. If the pod associated to that host IP and port changes, the client proxy won't loose connectivity, but the pod's information won't get updated (that'll be fixed in a separate PR). A new unit test validating this has been added, which will be expanded to validate the changed pod information when that gets implemented. Details of Change: - We no longer do the HostPort->ContainerPort conversion, so the `getPortForPod` function was dropped. - The `getPodByIp` function will now be split in two: `getPodByPodIP` and `getPodByHostIP`, the latter being called only if the former doesn't return anything. - The `createAddress` function is now simplified in that it just uses the passed IP to build the address. The passed IP will depend on which of the two functions just mentioned returned the pod (host IP or pod IP) --- controller/api/destination/server.go | 98 +++++++---------------- controller/api/destination/server_test.go | 54 ++++++------- controller/api/destination/test_util.go | 1 + 3 files changed, 54 insertions(+), 99 deletions(-) diff --git a/controller/api/destination/server.go b/controller/api/destination/server.go index a220270e0de63..e7bf70cdd6c26 100644 --- a/controller/api/destination/server.go +++ b/controller/api/destination/server.go @@ -222,11 +222,22 @@ func (s *server) getProfileByIP( if svcID == nil { // If the IP does not map to a service, check if it maps to a pod - pod, err := getPodByIP(s.k8sAPI, ip.String(), port, s.log) + var pod *corev1.Pod + targetIP := ip.String() + pod, err = getPodByPodIP(s.k8sAPI, targetIP, port, s.log) if err != nil { return err } - address, err := s.createAddress(pod, ip.String(), port) + if pod != nil { + targetIP = pod.Status.PodIP + } else { + pod, err = getPodByHostIP(s.k8sAPI, targetIP, port, s.log) + if err != nil { + return err + } + } + + address, err := s.createAddress(pod, targetIP, port) if err != nil { return fmt.Errorf("failed to create address: %w", err) } @@ -396,7 +407,7 @@ func (s *server) subscribeToServiceWithoutContext( return nil } -// Resolves a profile for a single endpoitn, sending updates to the provided +// Resolves a profile for a single endpoint, sending updates to the provided // stream. // // This function does not return until the stream is closed. @@ -441,60 +452,18 @@ func (s *server) subscribeToEndpointProfile( return nil } -// getPortForPod returns the port that a `pod` is listening on. -// -// Proxies usually receive traffic targeting `podIp:containerPort`. -// However, they may be receiving traffic on `nodeIp:nodePort`. In this -// case, we convert the port to the containerPort for discovery. In k8s parlance, -// this is the 'HostPort' mapping. -func (s *server) getPortForPod(pod *corev1.Pod, targetIP string, port uint32) (uint32, error) { - if pod == nil { - return port, fmt.Errorf("getPortForPod passed a nil pod") - } - - if net.ParseIP(targetIP) == nil { - return port, fmt.Errorf("failed to parse hostIP into net.IP: %s", targetIP) - } - - if containsIP(pod.Status.PodIPs, targetIP) { - return port, nil - } - - if targetIP == pod.Status.HostIP { - for _, container := range pod.Spec.Containers { - for _, containerPort := range container.Ports { - if uint32(containerPort.HostPort) == port { - return uint32(containerPort.ContainerPort), nil - } - } - } - } - - s.log.Warnf("unable to find container port as host (%s) matches neither PodIP nor HostIP (%s)", targetIP, pod) - return port, nil -} - func (s *server) createAddress(pod *corev1.Pod, targetIP string, port uint32) (watcher.Address, error) { - var ip, ownerKind, ownerName string + var ownerKind, ownerName string var err error if pod != nil { ownerKind, ownerName, err = s.metadataAPI.GetOwnerKindAndName(context.Background(), pod, true) if err != nil { return watcher.Address{}, err } - - port, err = s.getPortForPod(pod, targetIP, port) - if err != nil { - return watcher.Address{}, fmt.Errorf("failed to find Port for Pod: %w", err) - } - - ip = pod.Status.PodIP - } else { - ip = targetIP } address := watcher.Address{ - IP: ip, + IP: targetIP, Port: port, Pod: pod, OwnerName: ownerName, @@ -594,19 +563,16 @@ func (s *server) getEndpointByHostname(k8sAPI *k8s.API, hostname string, svcID w return nil, fmt.Errorf("no pod found in Endpoints %s/%s for hostname %s", svcID.Namespace, svcID.Name, hostname) } -// getPodByIP returns a pod that maps to the given IP address. The pod can either -// be in the host network or the pod network. If the pod is in the host -// network, then it must have a container port that exposes `port` as a host -// port. -func getPodByIP(k8sAPI *k8s.API, podIP string, port uint32, log *logging.Entry) (*corev1.Pod, error) { - // First we check if the address maps to a pod in the host network. - addr := net.JoinHostPort(podIP, fmt.Sprintf("%d", port)) +// getPodByHostIP returns a pod that maps to the given IP address in the host +// network. It must have a container port that exposes `port` as a host port. +func getPodByHostIP(k8sAPI *k8s.API, hostIP string, port uint32, log *logging.Entry) (*corev1.Pod, error) { + addr := net.JoinHostPort(hostIP, fmt.Sprintf("%d", port)) hostIPPods, err := getIndexedPods(k8sAPI, watcher.HostIPIndex, addr) if err != nil { return nil, status.Error(codes.Unknown, err.Error()) } if len(hostIPPods) == 1 { - log.Debugf("found %s:%d on the host network", podIP, port) + log.Debugf("found %s:%d on the host network", hostIP, port) return hostIPPods[0], nil } if len(hostIPPods) > 1 { @@ -614,12 +580,15 @@ func getPodByIP(k8sAPI *k8s.API, podIP string, port uint32, log *logging.Entry) for _, pod := range hostIPPods { conflictingPods = append(conflictingPods, fmt.Sprintf("%s:%s", pod.Namespace, pod.Name)) } - log.Warnf("found conflicting %s:%d endpoint on the host network: %s", podIP, port, strings.Join(conflictingPods, ",")) - return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting host network endpoint %s:%d", len(hostIPPods), podIP, port) + log.Warnf("found conflicting %s:%d endpoint on the host network: %s", hostIP, port, strings.Join(conflictingPods, ",")) + return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting host network endpoint %s:%d", len(hostIPPods), hostIP, port) } - // The address did not map to a pod in the host network, so now we check - // if the IP maps to a pod IP in the pod network. + return nil, nil +} + +// getPodByPodIP returns a pod that maps to the given IP address in the pod network +func getPodByPodIP(k8sAPI *k8s.API, podIP string, port uint32, log *logging.Entry) (*corev1.Pod, error) { podIPPods, err := getIndexedPods(k8sAPI, watcher.PodIPIndex, podIP) if err != nil { return nil, status.Error(codes.Unknown, err.Error()) @@ -808,14 +777,3 @@ func getPodSkippedInboundPortsAnnotations(pod *corev1.Pod) (map[uint32]struct{}, return util.ParsePorts(annotation) } - -// Given a list of PodIP, determine is `targetIP` is a member -func containsIP(podIPs []corev1.PodIP, targetIP string) bool { - for _, ip := range podIPs { - if ip.IP == targetIP { - return true - } - } - - return false -} diff --git a/controller/api/destination/server_test.go b/controller/api/destination/server_test.go index b9e2edb8e7f06..0b25a2af40bdc 100644 --- a/controller/api/destination/server_test.go +++ b/controller/api/destination/server_test.go @@ -23,7 +23,6 @@ const clusterIP = "172.17.12.0" const clusterIPOpaque = "172.17.12.1" const podIP1 = "172.17.0.12" const podIP2 = "172.17.0.13" -const podIP3 = "172.17.0.17" const podIPOpaque = "172.17.0.14" const podIPSkipped = "172.17.0.15" const podIPPolicy = "172.17.0.16" @@ -375,6 +374,27 @@ func TestGetProfiles(t *testing.T) { t.Fatalf("Expected port %d to be a non-opaque protocol, but it was opaque", 80) } }) + + t.Run("Return profile for host port pods", func(t *testing.T) { + hostPort := uint32(7777) + stream := profileStream(t, externalIP, hostPort, "") + + // HostPort maps to pod. + profile := assertSingleProfile(t, stream.updates) + dstPod := profile.Endpoint.MetricLabels["pod"] + if dstPod != "hostport-mapping" { + t.Fatalf("Expected dst_pod to be %s got %s", "hostport-mapping", dstPod) + } + + ip, err := addr.ParseProxyIPV4(externalIP) + if err != nil { + t.Fatalf("Error parsing IP: %s", err) + } + addr := profile.Endpoint.Addr + if addr.Ip.String() != ip.String() && addr.Port != hostPort { + t.Fatalf("Expected endpoint addr to be %s port:%d got %s", ip, hostPort, addr) + } + }) } func TestTokenStructure(t *testing.T) { @@ -434,30 +454,6 @@ func toAddress(path string, port uint32) (*net.TcpAddress, error) { }, nil } -func TestHostPortMapping(t *testing.T) { - hostPort := uint32(7777) - containerPort := uint32(80) - server := makeServer(t) - - pod, err := getPodByIP(server.k8sAPI, externalIP, hostPort, server.log) - if err != nil { - t.Fatalf("error retrieving pod by external IP %s", err) - } - - address, err := server.createAddress(pod, externalIP, hostPort) - if err != nil { - t.Fatalf("error calling createAddress() %s", err) - } - - if address.IP != podIP3 { - t.Fatalf("expected podIP (%s), received other IP (%s)", podIP3, address.IP) - } - - if address.Port != containerPort { - t.Fatalf("expected containerPort (%d) but received port (%d) instead", containerPort, address.Port) - } -} - func TestIpWatcherGetSvcID(t *testing.T) { name := "service" namespace := "test" @@ -566,7 +562,7 @@ status: k8sAPI.Sync(nil) // Get host IP pod that is mapped to the port `hostPort1` - pod, err := getPodByIP(k8sAPI, hostIP, hostPort1, logging.WithFields(nil)) + pod, err := getPodByHostIP(k8sAPI, hostIP, hostPort1, logging.WithFields(nil)) if err != nil { t.Fatalf("failed to get pod: %s", err) } @@ -579,7 +575,7 @@ status: // Get host IP pod that is mapped to the port `hostPort2`; this tests // that the indexer properly adds multiple containers from a single // pod. - pod, err = getPodByIP(k8sAPI, hostIP, hostPort2, logging.WithFields(nil)) + pod, err = getPodByHostIP(k8sAPI, hostIP, hostPort2, logging.WithFields(nil)) if err != nil { t.Fatalf("failed to get pod: %s", err) } @@ -590,7 +586,7 @@ status: t.Fatalf("expected pod name to be %s, but got %s", expectedPodName, pod.Name) } // Get host IP pod with unmapped host port - pod, err = getPodByIP(k8sAPI, hostIP, 12347, logging.WithFields(nil)) + pod, err = getPodByHostIP(k8sAPI, hostIP, 12347, logging.WithFields(nil)) if err != nil { t.Fatalf("expected no error when getting host IP pod with unmapped host port, but got: %s", err) } @@ -598,7 +594,7 @@ status: t.Fatal("expected no pod to be found with unmapped host port") } // Get pod IP pod and expect an error - _, err = getPodByIP(k8sAPI, podIP, 12346, logging.WithFields(nil)) + _, err = getPodByPodIP(k8sAPI, podIP, 12346, logging.WithFields(nil)) if err == nil { t.Fatal("expected error when getting by pod IP and unmapped host port, but got none") } diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index d96975497aa6e..8e8cf3e412ccf 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -336,6 +336,7 @@ kind: Pod apiVersion: v1 metadata: name: hostport-mapping + namespace: ns status: phase: Running hostIP: 192.168.1.20