From 5fbca8adf8b088befe2b72f1d3ddfcd54016e718 Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Mon, 5 Feb 2024 12:25:16 -0800 Subject: [PATCH] Reflect changes to server selector in opaqueness (#12031) Fixes #11995 If a Server is marking a Pod's port as opaque and then the Server's podSelector is updated to no longer select that Pod, then the Pod's port should no longer be marked as opaque. However, this update does not result in any updates from the destination API's Get stream and the port remains marked as opaque. We fix this by updating the endpoint watcher's handling of Server updates to consider both the old and the new Server. Signed-off-by: Alex Leong --- controller/api/destination/server_test.go | 95 ++++++++++++++++++- controller/api/destination/test_util.go | 28 +++++- .../destination/watcher/endpoints_watcher.go | 94 ++++++++++-------- controller/k8s/test_helper.go | 4 +- 4 files changed, 178 insertions(+), 43 deletions(-) diff --git a/controller/api/destination/server_test.go b/controller/api/destination/server_test.go index ead7caab96b5a..94277164cad0c 100644 --- a/controller/api/destination/server_test.go +++ b/controller/api/destination/server_test.go @@ -26,6 +26,7 @@ const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local" const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local" const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local" const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local" +const fullyQualifiedNamePolicy = "policy-test.ns.svc.mycluster.local" const clusterIP = "172.17.12.0" const clusterIPOpaque = "172.17.12.1" const podIP1 = "172.17.0.12" @@ -133,6 +134,98 @@ func TestGet(t *testing.T) { } }) + t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) { + server, client := getServerWithClient(t) + defer server.clusterStore.UnregisterGauges() + + stream := &bufferingGetStream{ + updates: make(chan *pb.Update, 50), + MockServerStream: util.NewMockServerStream(), + } + defer stream.Cancel() + errs := make(chan error) + + path := fmt.Sprintf("%s:%d", fullyQualifiedNamePolicy, 80) + + // server.Get blocks until the grpc stream is complete so we call it + // in a goroutine and watch stream.updates for updates. + go func() { + err := server.Get(&pb.GetDestination{ + Scheme: "k8s", + Path: path, + }, stream) + if err != nil { + errs <- err + } + }() + + select { + case err := <-errs: + t.Fatalf("Got error: %s", err) + case update := <-stream.updates: + addrs := update.GetAdd().Addrs + if len(addrs) == 0 { + t.Fatalf("Expected len(addrs) to be > 0") + } + + if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil { + t.Fatalf("Expected opaque transport for %s but was nil", path) + } + } + + // Update the Server's pod selector so that it no longer selects the + // pod. This should result in the proxy protocol no longer being marked + // as opaque. + srv, err := client.ServerV1beta1().Servers("ns").Get(context.Background(), "srv", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + // PodSelector is updated to NOT select the pod + srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"} + _, err = client.ServerV1beta1().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } + + select { + case update := <-stream.updates: + addrs := update.GetAdd().Addrs + if len(addrs) == 0 { + t.Fatalf("Expected len(addrs) to be > 0") + } + + if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil { + t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport()) + } + case err := <-errs: + t.Fatalf("Got error: %s", err) + } + + // Update the Server's pod selector so that it once again selects the + // pod. This should result in the proxy protocol once again being marked + // as opaque. + srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "policy-test"} + + _, err = client.ServerV1beta1().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } + + select { + case update := <-stream.updates: + addrs := update.GetAdd().Addrs + if len(addrs) == 0 { + t.Fatalf("Expected len(addrs) to be > 0") + } + + if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil { + t.Fatalf("Expected opaque transport for %s but was nil", path) + } + case err := <-errs: + t.Fatalf("Got error: %s", err) + } + }) + t.Run("Remote discovery", func(t *testing.T) { server := makeServer(t) defer server.clusterStore.UnregisterGauges() @@ -638,7 +731,7 @@ func TestGetProfiles(t *testing.T) { } // Server is created, setting the port to opaque - (*l5dClient).ServerV1beta1().Servers("ns").Create(context.Background(), &v1beta1.Server{ + l5dClient.ServerV1beta1().Servers("ns").Create(context.Background(), &v1beta1.Server{ ObjectMeta: metav1.ObjectMeta{ Name: "srv-hostport-mapping-2", Namespace: "ns", diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index 73961b001fdc0..2853188de8758 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -17,7 +17,7 @@ func makeServer(t *testing.T) *server { return srv } -func getServerWithClient(t *testing.T) (*server, *l5dcrdclient.Interface) { +func getServerWithClient(t *testing.T) (*server, l5dcrdclient.Interface) { meshedPodResources := []string{` apiVersion: v1 kind: Namespace @@ -299,6 +299,32 @@ status: policyResources := []string{ ` apiVersion: v1 +kind: Service +metadata: + name: policy-test + namespace: ns +spec: + type: LoadBalancer + clusterIP: 172.17.12.2 + ports: + - port: 80`, + ` +apiVersion: v1 +kind: Endpoints +metadata: + name: policy-test + namespace: ns +subsets: +- addresses: + - ip: 172.17.0.16 + targetRef: + kind: Pod + name: pod-policyResources + namespace: ns + ports: + - port: 80`, + ` +apiVersion: v1 kind: Pod metadata: labels: diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index 19ec804febd93..adf35ac2a2b1a 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -528,11 +528,14 @@ func (ew *EndpointsWatcher) addServer(obj interface{}) { defer ew.Unlock() server := obj.(*v1beta1.Server) for _, sp := range ew.publishers { - sp.updateServer(server, true) + sp.updateServer(nil, server) } } func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{}) { + ew.Lock() + defer ew.Unlock() + oldServer := oldObj.(*v1beta1.Server) newServer := newObj.(*v1beta1.Server) oldUpdated := latestUpdated(oldServer.ManagedFields) @@ -542,7 +545,9 @@ func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{}) serverInformerLag.Observe(delta.Seconds()) } - ew.addServer(newObj) + for _, sp := range ew.publishers { + sp.updateServer(oldServer, newServer) + } } func (ew *EndpointsWatcher) deleteServer(obj interface{}) { @@ -550,7 +555,7 @@ func (ew *EndpointsWatcher) deleteServer(obj interface{}) { defer ew.Unlock() server := obj.(*v1beta1.Server) for _, sp := range ew.publishers { - sp.updateServer(server, false) + sp.updateServer(server, nil) } } @@ -720,17 +725,12 @@ func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus return endpointsLabels(sp.cluster, sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)), hostname) } -func (sp *servicePublisher) updateServer(server *v1beta1.Server, isAdd bool) { +func (sp *servicePublisher) updateServer(oldServer, newServer *v1beta1.Server) { sp.Lock() defer sp.Unlock() - selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector) - if err != nil { - sp.log.Errorf("failed to create Selector: %s", err) - return - } for _, pp := range sp.ports { - pp.updateServer(server, selector, isAdd) + pp.updateServer(oldServer, newServer) } } @@ -1199,38 +1199,19 @@ func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener) { pp.metrics.setSubscribers(len(pp.listeners)) } - -func (pp *portPublisher) updateServer(server *v1beta1.Server, selector labels.Selector, isAdd bool) { +func (pp *portPublisher) updateServer(oldServer, newServer *v1beta1.Server) { updated := false for id, address := range pp.addresses.Addresses { - if address.Pod != nil && selector.Matches(labels.Set(address.Pod.Labels)) { - var portMatch bool - switch server.Spec.Port.Type { - case intstr.Int: - if server.Spec.Port.IntVal == int32(address.Port) { - portMatch = true - } - case intstr.String: - for _, c := range address.Pod.Spec.Containers { - for _, p := range c.Ports { - if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal { - portMatch = true - } - } - } - default: - continue + + if pp.isAddressSelected(address, oldServer) || pp.isAddressSelected(address, newServer) { + if newServer != nil && pp.isAddressSelected(address, newServer) && newServer.Spec.ProxyProtocol == opaqueProtocol { + address.OpaqueProtocol = true + } else { + address.OpaqueProtocol = false } - if portMatch { - if isAdd && server.Spec.ProxyProtocol == opaqueProtocol { - address.OpaqueProtocol = true - } else { - address.OpaqueProtocol = false - } - if pp.addresses.Addresses[id].OpaqueProtocol != address.OpaqueProtocol { - pp.addresses.Addresses[id] = address - updated = true - } + if pp.addresses.Addresses[id].OpaqueProtocol != address.OpaqueProtocol { + pp.addresses.Addresses[id] = address + updated = true } } } @@ -1242,6 +1223,41 @@ func (pp *portPublisher) updateServer(server *v1beta1.Server, selector labels.Se } } +func (pp *portPublisher) isAddressSelected(address Address, server *v1beta1.Server) bool { + if server == nil { + return false + } + + if address.Pod != nil { + selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector) + if err != nil { + pp.log.Errorf("failed to create Selector: %s", err) + return false + } + + if !selector.Matches(labels.Set(address.Pod.Labels)) { + return false + } + + switch server.Spec.Port.Type { + case intstr.Int: + if server.Spec.Port.IntVal == int32(address.Port) { + return true + } + case intstr.String: + for _, c := range address.Pod.Spec.Containers { + for _, p := range c.Ports { + if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal { + return true + } + } + } + } + + } + return false +} + //////////// /// util /// //////////// diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go index 725cb65da2456..bd9f6e4204bbf 100644 --- a/controller/k8s/test_helper.go +++ b/controller/k8s/test_helper.go @@ -23,13 +23,13 @@ func NewFakeAPI(configs ...string) (*API, error) { // NewFakeAPIWithL5dClient provides a mock Kubernetes API for testing like // NewFakeAPI, but it also returns the mock client for linkerd CRDs -func NewFakeAPIWithL5dClient(configs ...string) (*API, *l5dcrdclient.Interface, error) { +func NewFakeAPIWithL5dClient(configs ...string) (*API, l5dcrdclient.Interface, error) { clientSet, _, _, l5dClientSet, err := k8s.NewFakeClientSets(configs...) if err != nil { return nil, nil, err } - return NewFakeClusterScopedAPI(clientSet, l5dClientSet), &l5dClientSet, nil + return NewFakeClusterScopedAPI(clientSet, l5dClientSet), l5dClientSet, nil } // NewFakeClusterScopedAPI provides a mock Kubernetes API for testing.