Skip to content

Commit

Permalink
Reflect changes to server selector in opaqueness (#12031)
Browse files Browse the repository at this point in the history
Fixes #11995

If a Server is marking a Pod's port as opaque and then the Server's podSelector is updated to no longer select that Pod, then the Pod's port should no longer be marked as opaque. However, this update does not result in any updates from the destination API's Get stream and the port remains marked as opaque.

We fix this by updating the endpoint watcher's handling of Server updates to consider both the old and the new Server.

Signed-off-by: Alex Leong <[email protected]>
  • Loading branch information
adleong committed Feb 17, 2024
1 parent 53b5702 commit 5fbca8a
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 43 deletions.
95 changes: 94 additions & 1 deletion controller/api/destination/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local"
const fullyQualifiedNamePolicy = "policy-test.ns.svc.mycluster.local"
const clusterIP = "172.17.12.0"
const clusterIPOpaque = "172.17.12.1"
const podIP1 = "172.17.0.12"
Expand Down Expand Up @@ -133,6 +134,98 @@ func TestGet(t *testing.T) {
}
})

t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) {
server, client := getServerWithClient(t)
defer server.clusterStore.UnregisterGauges()

stream := &bufferingGetStream{
updates: make(chan *pb.Update, 50),
MockServerStream: util.NewMockServerStream(),
}
defer stream.Cancel()
errs := make(chan error)

path := fmt.Sprintf("%s:%d", fullyQualifiedNamePolicy, 80)

// server.Get blocks until the grpc stream is complete so we call it
// in a goroutine and watch stream.updates for updates.
go func() {
err := server.Get(&pb.GetDestination{
Scheme: "k8s",
Path: path,
}, stream)
if err != nil {
errs <- err
}
}()

select {
case err := <-errs:
t.Fatalf("Got error: %s", err)
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
}

// Update the Server's pod selector so that it no longer selects the
// pod. This should result in the proxy protocol no longer being marked
// as opaque.
srv, err := client.ServerV1beta1().Servers("ns").Get(context.Background(), "srv", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
// PodSelector is updated to NOT select the pod
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
_, err = client.ServerV1beta1().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}

// Update the Server's pod selector so that it once again selects the
// pod. This should result in the proxy protocol once again being marked
// as opaque.
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "policy-test"}

_, err = client.ServerV1beta1().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}
})

t.Run("Remote discovery", func(t *testing.T) {
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()
Expand Down Expand Up @@ -638,7 +731,7 @@ func TestGetProfiles(t *testing.T) {
}

// Server is created, setting the port to opaque
(*l5dClient).ServerV1beta1().Servers("ns").Create(context.Background(), &v1beta1.Server{
l5dClient.ServerV1beta1().Servers("ns").Create(context.Background(), &v1beta1.Server{
ObjectMeta: metav1.ObjectMeta{
Name: "srv-hostport-mapping-2",
Namespace: "ns",
Expand Down
28 changes: 27 additions & 1 deletion controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func makeServer(t *testing.T) *server {
return srv
}

func getServerWithClient(t *testing.T) (*server, *l5dcrdclient.Interface) {
func getServerWithClient(t *testing.T) (*server, l5dcrdclient.Interface) {
meshedPodResources := []string{`
apiVersion: v1
kind: Namespace
Expand Down Expand Up @@ -299,6 +299,32 @@ status:
policyResources := []string{
`
apiVersion: v1
kind: Service
metadata:
name: policy-test
namespace: ns
spec:
type: LoadBalancer
clusterIP: 172.17.12.2
ports:
- port: 80`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: policy-test
namespace: ns
subsets:
- addresses:
- ip: 172.17.0.16
targetRef:
kind: Pod
name: pod-policyResources
namespace: ns
ports:
- port: 80`,
`
apiVersion: v1
kind: Pod
metadata:
labels:
Expand Down
94 changes: 55 additions & 39 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,14 @@ func (ew *EndpointsWatcher) addServer(obj interface{}) {
defer ew.Unlock()
server := obj.(*v1beta1.Server)
for _, sp := range ew.publishers {
sp.updateServer(server, true)
sp.updateServer(nil, server)
}
}

func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{}) {
ew.Lock()
defer ew.Unlock()

oldServer := oldObj.(*v1beta1.Server)
newServer := newObj.(*v1beta1.Server)
oldUpdated := latestUpdated(oldServer.ManagedFields)
Expand All @@ -542,15 +545,17 @@ func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{})
serverInformerLag.Observe(delta.Seconds())
}

ew.addServer(newObj)
for _, sp := range ew.publishers {
sp.updateServer(oldServer, newServer)
}
}

func (ew *EndpointsWatcher) deleteServer(obj interface{}) {
ew.Lock()
defer ew.Unlock()
server := obj.(*v1beta1.Server)
for _, sp := range ew.publishers {
sp.updateServer(server, false)
sp.updateServer(server, nil)
}
}

Expand Down Expand Up @@ -720,17 +725,12 @@ func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus
return endpointsLabels(sp.cluster, sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)), hostname)
}

func (sp *servicePublisher) updateServer(server *v1beta1.Server, isAdd bool) {
func (sp *servicePublisher) updateServer(oldServer, newServer *v1beta1.Server) {
sp.Lock()
defer sp.Unlock()

selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
if err != nil {
sp.log.Errorf("failed to create Selector: %s", err)
return
}
for _, pp := range sp.ports {
pp.updateServer(server, selector, isAdd)
pp.updateServer(oldServer, newServer)
}
}

Expand Down Expand Up @@ -1199,38 +1199,19 @@ func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener) {

pp.metrics.setSubscribers(len(pp.listeners))
}

func (pp *portPublisher) updateServer(server *v1beta1.Server, selector labels.Selector, isAdd bool) {
func (pp *portPublisher) updateServer(oldServer, newServer *v1beta1.Server) {
updated := false
for id, address := range pp.addresses.Addresses {
if address.Pod != nil && selector.Matches(labels.Set(address.Pod.Labels)) {
var portMatch bool
switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(address.Port) {
portMatch = true
}
case intstr.String:
for _, c := range address.Pod.Spec.Containers {
for _, p := range c.Ports {
if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
portMatch = true
}
}
}
default:
continue

if pp.isAddressSelected(address, oldServer) || pp.isAddressSelected(address, newServer) {
if newServer != nil && pp.isAddressSelected(address, newServer) && newServer.Spec.ProxyProtocol == opaqueProtocol {
address.OpaqueProtocol = true
} else {
address.OpaqueProtocol = false
}
if portMatch {
if isAdd && server.Spec.ProxyProtocol == opaqueProtocol {
address.OpaqueProtocol = true
} else {
address.OpaqueProtocol = false
}
if pp.addresses.Addresses[id].OpaqueProtocol != address.OpaqueProtocol {
pp.addresses.Addresses[id] = address
updated = true
}
if pp.addresses.Addresses[id].OpaqueProtocol != address.OpaqueProtocol {
pp.addresses.Addresses[id] = address
updated = true
}
}
}
Expand All @@ -1242,6 +1223,41 @@ func (pp *portPublisher) updateServer(server *v1beta1.Server, selector labels.Se
}
}

func (pp *portPublisher) isAddressSelected(address Address, server *v1beta1.Server) bool {
if server == nil {
return false
}

if address.Pod != nil {
selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
if err != nil {
pp.log.Errorf("failed to create Selector: %s", err)
return false
}

if !selector.Matches(labels.Set(address.Pod.Labels)) {
return false
}

switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(address.Port) {
return true
}
case intstr.String:
for _, c := range address.Pod.Spec.Containers {
for _, p := range c.Ports {
if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
return true
}
}
}
}

}
return false
}

////////////
/// util ///
////////////
Expand Down
4 changes: 2 additions & 2 deletions controller/k8s/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ func NewFakeAPI(configs ...string) (*API, error) {

// NewFakeAPIWithL5dClient provides a mock Kubernetes API for testing like
// NewFakeAPI, but it also returns the mock client for linkerd CRDs
func NewFakeAPIWithL5dClient(configs ...string) (*API, *l5dcrdclient.Interface, error) {
func NewFakeAPIWithL5dClient(configs ...string) (*API, l5dcrdclient.Interface, error) {
clientSet, _, _, l5dClientSet, err := k8s.NewFakeClientSets(configs...)
if err != nil {
return nil, nil, err
}

return NewFakeClusterScopedAPI(clientSet, l5dClientSet), &l5dClientSet, nil
return NewFakeClusterScopedAPI(clientSet, l5dClientSet), l5dClientSet, nil
}

// NewFakeClusterScopedAPI provides a mock Kubernetes API for testing.
Expand Down

0 comments on commit 5fbca8a

Please sign in to comment.