Skip to content

Commit

Permalink
Reflect changes to server selector in opaqueness (#12031)
Browse files Browse the repository at this point in the history
Fixes #11995

If a Server is marking a Pod's port as opaque and then the Server's podSelector is updated to no longer select that Pod, then the Pod's port should no longer be marked as opaque. However, this update does not result in any updates from the destination API's Get stream and the port remains marked as opaque.

We fix this by updating the endpoint watcher's handling of Server updates to consider both the old and the new Server.

Signed-off-by: Alex Leong <[email protected]>
  • Loading branch information
adleong authored Feb 5, 2024
1 parent f6bf419 commit 53287d4
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 70 deletions.
95 changes: 94 additions & 1 deletion controller/api/destination/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local"
const fullyQualifiedNamePolicy = "policy-test.ns.svc.mycluster.local"
const clusterIP = "172.17.12.0"
const clusterIPv6 = "2001:db8::88"
const clusterIPOpaque = "172.17.12.1"
Expand Down Expand Up @@ -158,6 +159,98 @@ func TestGet(t *testing.T) {
}
})

t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) {
server, client := getServerWithClient(t)
defer server.clusterStore.UnregisterGauges()

stream := &bufferingGetStream{
updates: make(chan *pb.Update, 50),
MockServerStream: util.NewMockServerStream(),
}
defer stream.Cancel()
errs := make(chan error)

path := fmt.Sprintf("%s:%d", fullyQualifiedNamePolicy, 80)

// server.Get blocks until the grpc stream is complete so we call it
// in a goroutine and watch stream.updates for updates.
go func() {
err := server.Get(&pb.GetDestination{
Scheme: "k8s",
Path: path,
}, stream)
if err != nil {
errs <- err
}
}()

select {
case err := <-errs:
t.Fatalf("Got error: %s", err)
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
}

// Update the Server's pod selector so that it no longer selects the
// pod. This should result in the proxy protocol no longer being marked
// as opaque.
srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), "srv", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
// PodSelector is updated to NOT select the pod
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}

// Update the Server's pod selector so that it once again selects the
// pod. This should result in the proxy protocol once again being marked
// as opaque.
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "policy-test"}

_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}
})

t.Run("Remote discovery", func(t *testing.T) {
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()
Expand Down Expand Up @@ -863,7 +956,7 @@ func TestGetProfiles(t *testing.T) {
}

// Server is created, setting the port to opaque
(*l5dClient).ServerV1beta2().Servers("ns").Create(context.Background(), &v1beta2.Server{
l5dClient.ServerV1beta2().Servers("ns").Create(context.Background(), &v1beta2.Server{
ObjectMeta: metav1.ObjectMeta{
Name: "srv-hostport-mapping-2",
Namespace: "ns",
Expand Down
63 changes: 62 additions & 1 deletion controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func makeServer(t *testing.T) *server {
return srv
}

func getServerWithClient(t *testing.T) (*server, *l5dcrdclient.Interface) {
func getServerWithClient(t *testing.T) (*server, l5dcrdclient.Interface) {
meshedPodResources := []string{`
apiVersion: v1
kind: Namespace
Expand Down Expand Up @@ -303,6 +303,32 @@ status:
policyResources := []string{
`
apiVersion: v1
kind: Service
metadata:
name: policy-test
namespace: ns
spec:
type: LoadBalancer
clusterIP: 172.17.12.2
ports:
- port: 80`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: policy-test
namespace: ns
subsets:
- addresses:
- ip: 172.17.0.16
targetRef:
kind: Pod
name: pod-policyResources
namespace: ns
ports:
- port: 80`,
`
apiVersion: v1
kind: Pod
metadata:
labels:
Expand Down Expand Up @@ -337,6 +363,15 @@ spec:
podSelector:
matchLabels:
app: policy-test
port: 80
proxyProtocol: opaque`,
`
apiVersion: policy.linkerd.io/v1beta2
kind: Server
metadata:
name: srv-external-workload
namespace: ns
spec:
externalWorkloadSelector:
matchLabels:
app: external-workload-policy-test
Expand Down Expand Up @@ -491,6 +526,32 @@ spec:
status:
conditions:
ready: true`,
`
apiVersion: v1
kind: Service
metadata:
name: policy-test-external-workload
namespace: ns
spec:
type: LoadBalancer
clusterIP: 172.17.12.3
ports:
- port: 80`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: policy-test-external-workload
namespace: ns
subsets:
- addresses:
- ip: 200.1.1.2
targetRef:
kind: ExternalWorkload
name: policy-test-workload
namespace: ns
ports:
- port: 80`,
}
extenalNameResources := []string{
`
Expand Down
137 changes: 71 additions & 66 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,14 @@ func (ew *EndpointsWatcher) addServer(obj interface{}) {
defer ew.Unlock()
server := obj.(*v1beta2.Server)
for _, sp := range ew.publishers {
sp.updateServer(server, true)
sp.updateServer(nil, server)
}
}

func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{}) {
ew.Lock()
defer ew.Unlock()

oldServer := oldObj.(*v1beta2.Server)
newServer := newObj.(*v1beta2.Server)
oldUpdated := latestUpdated(oldServer.ManagedFields)
Expand All @@ -525,15 +528,17 @@ func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{})
serverInformerLag.Observe(delta.Seconds())
}

ew.addServer(newObj)
for _, sp := range ew.publishers {
sp.updateServer(oldServer, newServer)
}
}

func (ew *EndpointsWatcher) deleteServer(obj interface{}) {
ew.Lock()
defer ew.Unlock()
server := obj.(*v1beta2.Server)
for _, sp := range ew.publishers {
sp.updateServer(server, false)
sp.updateServer(server, nil)
}
}

Expand Down Expand Up @@ -703,12 +708,12 @@ func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus
return endpointsLabels(sp.cluster, sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)), hostname)
}

func (sp *servicePublisher) updateServer(server *v1beta2.Server, isAdd bool) {
func (sp *servicePublisher) updateServer(oldServer, newServer *v1beta2.Server) {
sp.Lock()
defer sp.Unlock()

for _, pp := range sp.ports {
pp.updateServer(server, isAdd)
pp.updateServer(oldServer, newServer)
}
}

Expand Down Expand Up @@ -1237,70 +1242,12 @@ func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener) {

pp.metrics.setSubscribers(len(pp.listeners))
}
func (pp *portPublisher) updateServer(server *v1beta2.Server, isAdd bool) {
func (pp *portPublisher) updateServer(oldServer, newServer *v1beta2.Server) {
updated := false
for id, address := range pp.addresses.Addresses {
portMatch := false
if address.Pod != nil {
selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
if err != nil {
pp.log.Errorf("failed to create Selector: %s", err)
return
}

if !selector.Matches(labels.Set(address.Pod.Labels)) {
continue
}

switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(address.Port) {
portMatch = true
}
case intstr.String:
for _, c := range address.Pod.Spec.Containers {
for _, p := range c.Ports {
if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
portMatch = true
}
}
}
default:
continue
}

} else if address.ExternalWorkload != nil {
selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
if err != nil {
pp.log.Errorf("failed to create Selector: %s", err)
return
}

if !selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
continue
}

switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(address.Port) {
portMatch = true
}
case intstr.String:
for _, p := range address.ExternalWorkload.Spec.Ports {
if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
portMatch = true
}
}
default:
continue
}

} else {
continue
}

if portMatch {
if isAdd && server.Spec.ProxyProtocol == opaqueProtocol {
if pp.isAddressSelected(address, oldServer) || pp.isAddressSelected(address, newServer) {
if newServer != nil && pp.isAddressSelected(address, newServer) && newServer.Spec.ProxyProtocol == opaqueProtocol {
address.OpaqueProtocol = true
} else {
address.OpaqueProtocol = false
Expand All @@ -1319,6 +1266,64 @@ func (pp *portPublisher) updateServer(server *v1beta2.Server, isAdd bool) {
}
}

func (pp *portPublisher) isAddressSelected(address Address, server *v1beta2.Server) bool {
if server == nil {
return false
}

if address.Pod != nil {
selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
if err != nil {
pp.log.Errorf("failed to create Selector: %s", err)
return false
}

if !selector.Matches(labels.Set(address.Pod.Labels)) {
return false
}

switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(address.Port) {
return true
}
case intstr.String:
for _, c := range address.Pod.Spec.Containers {
for _, p := range c.Ports {
if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
return true
}
}
}
}

} else if address.ExternalWorkload != nil {
selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
if err != nil {
pp.log.Errorf("failed to create Selector: %s", err)
return false
}

if !selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
return false
}

switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(address.Port) {
return true
}
case intstr.String:
for _, p := range address.ExternalWorkload.Spec.Ports {
if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
return true
}
}
}
}
return false
}

////////////
/// util ///
////////////
Expand Down
Loading

0 comments on commit 53287d4

Please sign in to comment.