diff --git a/controller/api/destination/server_test.go b/controller/api/destination/server_test.go index ead7caab96b5a..94277164cad0c 100644 --- a/controller/api/destination/server_test.go +++ b/controller/api/destination/server_test.go @@ -26,6 +26,7 @@ const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local" const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local" const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local" const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local" +const fullyQualifiedNamePolicy = "policy-test.ns.svc.mycluster.local" const clusterIP = "172.17.12.0" const clusterIPOpaque = "172.17.12.1" const podIP1 = "172.17.0.12" @@ -133,6 +134,98 @@ func TestGet(t *testing.T) { } }) + t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) { + server, client := getServerWithClient(t) + defer server.clusterStore.UnregisterGauges() + + stream := &bufferingGetStream{ + updates: make(chan *pb.Update, 50), + MockServerStream: util.NewMockServerStream(), + } + defer stream.Cancel() + errs := make(chan error) + + path := fmt.Sprintf("%s:%d", fullyQualifiedNamePolicy, 80) + + // server.Get blocks until the grpc stream is complete so we call it + // in a goroutine and watch stream.updates for updates. + go func() { + err := server.Get(&pb.GetDestination{ + Scheme: "k8s", + Path: path, + }, stream) + if err != nil { + errs <- err + } + }() + + select { + case err := <-errs: + t.Fatalf("Got error: %s", err) + case update := <-stream.updates: + addrs := update.GetAdd().Addrs + if len(addrs) == 0 { + t.Fatalf("Expected len(addrs) to be > 0") + } + + if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil { + t.Fatalf("Expected opaque transport for %s but was nil", path) + } + } + + // Update the Server's pod selector so that it no longer selects the + // pod. This should result in the proxy protocol no longer being marked + // as opaque. + srv, err := client.ServerV1beta1().Servers("ns").Get(context.Background(), "srv", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + // PodSelector is updated to NOT select the pod + srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"} + _, err = client.ServerV1beta1().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } + + select { + case update := <-stream.updates: + addrs := update.GetAdd().Addrs + if len(addrs) == 0 { + t.Fatalf("Expected len(addrs) to be > 0") + } + + if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil { + t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport()) + } + case err := <-errs: + t.Fatalf("Got error: %s", err) + } + + // Update the Server's pod selector so that it once again selects the + // pod. This should result in the proxy protocol once again being marked + // as opaque. + srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "policy-test"} + + _, err = client.ServerV1beta1().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } + + select { + case update := <-stream.updates: + addrs := update.GetAdd().Addrs + if len(addrs) == 0 { + t.Fatalf("Expected len(addrs) to be > 0") + } + + if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil { + t.Fatalf("Expected opaque transport for %s but was nil", path) + } + case err := <-errs: + t.Fatalf("Got error: %s", err) + } + }) + t.Run("Remote discovery", func(t *testing.T) { server := makeServer(t) defer server.clusterStore.UnregisterGauges() @@ -638,7 +731,7 @@ func TestGetProfiles(t *testing.T) { } // Server is created, setting the port to opaque - (*l5dClient).ServerV1beta1().Servers("ns").Create(context.Background(), &v1beta1.Server{ + l5dClient.ServerV1beta1().Servers("ns").Create(context.Background(), &v1beta1.Server{ ObjectMeta: metav1.ObjectMeta{ Name: "srv-hostport-mapping-2", Namespace: "ns", diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index 73961b001fdc0..2853188de8758 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -17,7 +17,7 @@ func makeServer(t *testing.T) *server { return srv } -func getServerWithClient(t *testing.T) (*server, *l5dcrdclient.Interface) { +func getServerWithClient(t *testing.T) (*server, l5dcrdclient.Interface) { meshedPodResources := []string{` apiVersion: v1 kind: Namespace @@ -299,6 +299,32 @@ status: policyResources := []string{ ` apiVersion: v1 +kind: Service +metadata: + name: policy-test + namespace: ns +spec: + type: LoadBalancer + clusterIP: 172.17.12.2 + ports: + - port: 80`, + ` +apiVersion: v1 +kind: Endpoints +metadata: + name: policy-test + namespace: ns +subsets: +- addresses: + - ip: 172.17.0.16 + targetRef: + kind: Pod + name: pod-policyResources + namespace: ns + ports: + - port: 80`, + ` +apiVersion: v1 kind: Pod metadata: labels: diff --git a/controller/api/destination/watcher/endpoints_watcher.go b/controller/api/destination/watcher/endpoints_watcher.go index 19ec804febd93..adf35ac2a2b1a 100644 --- a/controller/api/destination/watcher/endpoints_watcher.go +++ b/controller/api/destination/watcher/endpoints_watcher.go @@ -528,11 +528,14 @@ func (ew *EndpointsWatcher) addServer(obj interface{}) { defer ew.Unlock() server := obj.(*v1beta1.Server) for _, sp := range ew.publishers { - sp.updateServer(server, true) + sp.updateServer(nil, server) } } func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{}) { + ew.Lock() + defer ew.Unlock() + oldServer := oldObj.(*v1beta1.Server) newServer := newObj.(*v1beta1.Server) oldUpdated := latestUpdated(oldServer.ManagedFields) @@ -542,7 +545,9 @@ func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{}) serverInformerLag.Observe(delta.Seconds()) } - ew.addServer(newObj) + for _, sp := range ew.publishers { + sp.updateServer(oldServer, newServer) + } } func (ew *EndpointsWatcher) deleteServer(obj interface{}) { @@ -550,7 +555,7 @@ func (ew *EndpointsWatcher) deleteServer(obj interface{}) { defer ew.Unlock() server := obj.(*v1beta1.Server) for _, sp := range ew.publishers { - sp.updateServer(server, false) + sp.updateServer(server, nil) } } @@ -720,17 +725,12 @@ func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus return endpointsLabels(sp.cluster, sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)), hostname) } -func (sp *servicePublisher) updateServer(server *v1beta1.Server, isAdd bool) { +func (sp *servicePublisher) updateServer(oldServer, newServer *v1beta1.Server) { sp.Lock() defer sp.Unlock() - selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector) - if err != nil { - sp.log.Errorf("failed to create Selector: %s", err) - return - } for _, pp := range sp.ports { - pp.updateServer(server, selector, isAdd) + pp.updateServer(oldServer, newServer) } } @@ -1199,38 +1199,19 @@ func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener) { pp.metrics.setSubscribers(len(pp.listeners)) } - -func (pp *portPublisher) updateServer(server *v1beta1.Server, selector labels.Selector, isAdd bool) { +func (pp *portPublisher) updateServer(oldServer, newServer *v1beta1.Server) { updated := false for id, address := range pp.addresses.Addresses { - if address.Pod != nil && selector.Matches(labels.Set(address.Pod.Labels)) { - var portMatch bool - switch server.Spec.Port.Type { - case intstr.Int: - if server.Spec.Port.IntVal == int32(address.Port) { - portMatch = true - } - case intstr.String: - for _, c := range address.Pod.Spec.Containers { - for _, p := range c.Ports { - if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal { - portMatch = true - } - } - } - default: - continue + + if pp.isAddressSelected(address, oldServer) || pp.isAddressSelected(address, newServer) { + if newServer != nil && pp.isAddressSelected(address, newServer) && newServer.Spec.ProxyProtocol == opaqueProtocol { + address.OpaqueProtocol = true + } else { + address.OpaqueProtocol = false } - if portMatch { - if isAdd && server.Spec.ProxyProtocol == opaqueProtocol { - address.OpaqueProtocol = true - } else { - address.OpaqueProtocol = false - } - if pp.addresses.Addresses[id].OpaqueProtocol != address.OpaqueProtocol { - pp.addresses.Addresses[id] = address - updated = true - } + if pp.addresses.Addresses[id].OpaqueProtocol != address.OpaqueProtocol { + pp.addresses.Addresses[id] = address + updated = true } } } @@ -1242,6 +1223,41 @@ func (pp *portPublisher) updateServer(server *v1beta1.Server, selector labels.Se } } +func (pp *portPublisher) isAddressSelected(address Address, server *v1beta1.Server) bool { + if server == nil { + return false + } + + if address.Pod != nil { + selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector) + if err != nil { + pp.log.Errorf("failed to create Selector: %s", err) + return false + } + + if !selector.Matches(labels.Set(address.Pod.Labels)) { + return false + } + + switch server.Spec.Port.Type { + case intstr.Int: + if server.Spec.Port.IntVal == int32(address.Port) { + return true + } + case intstr.String: + for _, c := range address.Pod.Spec.Containers { + for _, p := range c.Ports { + if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal { + return true + } + } + } + } + + } + return false +} + //////////// /// util /// //////////// diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go index 725cb65da2456..bd9f6e4204bbf 100644 --- a/controller/k8s/test_helper.go +++ b/controller/k8s/test_helper.go @@ -23,13 +23,13 @@ func NewFakeAPI(configs ...string) (*API, error) { // NewFakeAPIWithL5dClient provides a mock Kubernetes API for testing like // NewFakeAPI, but it also returns the mock client for linkerd CRDs -func NewFakeAPIWithL5dClient(configs ...string) (*API, *l5dcrdclient.Interface, error) { +func NewFakeAPIWithL5dClient(configs ...string) (*API, l5dcrdclient.Interface, error) { clientSet, _, _, l5dClientSet, err := k8s.NewFakeClientSets(configs...) if err != nil { return nil, nil, err } - return NewFakeClusterScopedAPI(clientSet, l5dClientSet), &l5dClientSet, nil + return NewFakeClusterScopedAPI(clientSet, l5dClientSet), l5dClientSet, nil } // NewFakeClusterScopedAPI provides a mock Kubernetes API for testing.