Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reflect changes to server selector in opaqueness #12031

Merged
merged 5 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 94 additions & 1 deletion controller/api/destination/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const fullyQualifiedNameOpaque = "name3.ns.svc.mycluster.local"
const fullyQualifiedNameOpaqueService = "name4.ns.svc.mycluster.local"
const fullyQualifiedNameSkipped = "name5.ns.svc.mycluster.local"
const fullyQualifiedPodDNS = "pod-0.statefulset-svc.ns.svc.mycluster.local"
const fullyQualifiedNamePolicy = "policy-test.ns.svc.mycluster.local"
const clusterIP = "172.17.12.0"
const clusterIPv6 = "2001:db8::88"
const clusterIPOpaque = "172.17.12.1"
Expand Down Expand Up @@ -158,6 +159,98 @@ func TestGet(t *testing.T) {
}
})

t.Run("Return endpoint opaque protocol controlled by a server", func(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add the same test for external workloads ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great question. I tried this and then spent forever racking my brain trying to figure out why it wasn't working. it turns out the server_tests all run with the endpoint_watcher in Endpoints mode, not EndpointSlices mode, and external workloads are not compatible with Endpoints.

Given that EndpointSlices mode is the default, we'll want to rewrite these tests to use it, but not in this PR.

server, client := getServerWithClient(t)
defer server.clusterStore.UnregisterGauges()

stream := &bufferingGetStream{
updates: make(chan *pb.Update, 50),
MockServerStream: util.NewMockServerStream(),
}
defer stream.Cancel()
errs := make(chan error)

path := fmt.Sprintf("%s:%d", fullyQualifiedNamePolicy, 80)

// server.Get blocks until the grpc stream is complete so we call it
// in a goroutine and watch stream.updates for updates.
go func() {
err := server.Get(&pb.GetDestination{
Scheme: "k8s",
Path: path,
}, stream)
if err != nil {
errs <- err
}
}()

select {
case err := <-errs:
t.Fatalf("Got error: %s", err)
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
}

// Update the Server's pod selector so that it no longer selects the
// pod. This should result in the proxy protocol no longer being marked
// as opaque.
srv, err := client.ServerV1beta2().Servers("ns").Get(context.Background(), "srv", metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
// PodSelector is updated to NOT select the pod
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "FOOBAR"}
_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

if addrs[0].GetProtocolHint().GetOpaqueTransport() != nil {
t.Fatalf("Expected opaque transport to be nil for %s but was %+v", path, *addrs[0].GetProtocolHint().GetOpaqueTransport())
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}

// Update the Server's pod selector so that it once again selects the
// pod. This should result in the proxy protocol once again being marked
// as opaque.
srv.Spec.PodSelector.MatchLabels = map[string]string{"app": "policy-test"}

_, err = client.ServerV1beta2().Servers("ns").Update(context.Background(), srv, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

select {
case update := <-stream.updates:
addrs := update.GetAdd().Addrs
if len(addrs) == 0 {
t.Fatalf("Expected len(addrs) to be > 0")
}

if addrs[0].GetProtocolHint().GetOpaqueTransport() == nil {
t.Fatalf("Expected opaque transport for %s but was nil", path)
}
case err := <-errs:
t.Fatalf("Got error: %s", err)
}
})

t.Run("Remote discovery", func(t *testing.T) {
server := makeServer(t)
defer server.clusterStore.UnregisterGauges()
Expand Down Expand Up @@ -863,7 +956,7 @@ func TestGetProfiles(t *testing.T) {
}

// Server is created, setting the port to opaque
(*l5dClient).ServerV1beta2().Servers("ns").Create(context.Background(), &v1beta2.Server{
l5dClient.ServerV1beta2().Servers("ns").Create(context.Background(), &v1beta2.Server{
ObjectMeta: metav1.ObjectMeta{
Name: "srv-hostport-mapping-2",
Namespace: "ns",
Expand Down
63 changes: 62 additions & 1 deletion controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func makeServer(t *testing.T) *server {
return srv
}

func getServerWithClient(t *testing.T) (*server, *l5dcrdclient.Interface) {
func getServerWithClient(t *testing.T) (*server, l5dcrdclient.Interface) {
meshedPodResources := []string{`
apiVersion: v1
kind: Namespace
Expand Down Expand Up @@ -303,6 +303,32 @@ status:
policyResources := []string{
`
apiVersion: v1
kind: Service
metadata:
name: policy-test
namespace: ns
spec:
type: LoadBalancer
clusterIP: 172.17.12.2
ports:
- port: 80`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: policy-test
namespace: ns
subsets:
- addresses:
- ip: 172.17.0.16
targetRef:
kind: Pod
name: pod-policyResources
namespace: ns
ports:
- port: 80`,
`
apiVersion: v1
kind: Pod
metadata:
labels:
Expand Down Expand Up @@ -337,6 +363,15 @@ spec:
podSelector:
matchLabels:
app: policy-test
port: 80
proxyProtocol: opaque`,
`
apiVersion: policy.linkerd.io/v1beta2
kind: Server
metadata:
name: srv-external-workload
namespace: ns
spec:
externalWorkloadSelector:
matchLabels:
app: external-workload-policy-test
Expand Down Expand Up @@ -491,6 +526,32 @@ spec:
status:
conditions:
ready: true`,
`
apiVersion: v1
kind: Service
metadata:
name: policy-test-external-workload
namespace: ns
spec:
type: LoadBalancer
clusterIP: 172.17.12.3
ports:
- port: 80`,
`
apiVersion: v1
kind: Endpoints
metadata:
name: policy-test-external-workload
namespace: ns
subsets:
- addresses:
- ip: 200.1.1.2
targetRef:
kind: ExternalWorkload
name: policy-test-workload
namespace: ns
ports:
- port: 80`,
}
extenalNameResources := []string{
`
Expand Down
137 changes: 71 additions & 66 deletions controller/api/destination/watcher/endpoints_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,14 @@ func (ew *EndpointsWatcher) addServer(obj interface{}) {
defer ew.Unlock()
server := obj.(*v1beta2.Server)
for _, sp := range ew.publishers {
sp.updateServer(server, true)
sp.updateServer(nil, server)
}
}

func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{}) {
ew.Lock()
defer ew.Unlock()

oldServer := oldObj.(*v1beta2.Server)
newServer := newObj.(*v1beta2.Server)
oldUpdated := latestUpdated(oldServer.ManagedFields)
Expand All @@ -525,15 +528,17 @@ func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{})
serverInformerLag.Observe(delta.Seconds())
}

ew.addServer(newObj)
for _, sp := range ew.publishers {
sp.updateServer(oldServer, newServer)
}
}

func (ew *EndpointsWatcher) deleteServer(obj interface{}) {
ew.Lock()
defer ew.Unlock()
server := obj.(*v1beta2.Server)
for _, sp := range ew.publishers {
sp.updateServer(server, false)
sp.updateServer(server, nil)
}
}

Expand Down Expand Up @@ -703,12 +708,12 @@ func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus
return endpointsLabels(sp.cluster, sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)), hostname)
}

func (sp *servicePublisher) updateServer(server *v1beta2.Server, isAdd bool) {
func (sp *servicePublisher) updateServer(oldServer, newServer *v1beta2.Server) {
sp.Lock()
defer sp.Unlock()

for _, pp := range sp.ports {
pp.updateServer(server, isAdd)
pp.updateServer(oldServer, newServer)
}
}

Expand Down Expand Up @@ -1237,70 +1242,12 @@ func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener) {

pp.metrics.setSubscribers(len(pp.listeners))
}
func (pp *portPublisher) updateServer(server *v1beta2.Server, isAdd bool) {
func (pp *portPublisher) updateServer(oldServer, newServer *v1beta2.Server) {
updated := false
for id, address := range pp.addresses.Addresses {
portMatch := false
if address.Pod != nil {
selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
if err != nil {
pp.log.Errorf("failed to create Selector: %s", err)
return
}

if !selector.Matches(labels.Set(address.Pod.Labels)) {
continue
}

switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(address.Port) {
portMatch = true
}
case intstr.String:
for _, c := range address.Pod.Spec.Containers {
for _, p := range c.Ports {
if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
portMatch = true
}
}
}
default:
continue
}

} else if address.ExternalWorkload != nil {
selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
if err != nil {
pp.log.Errorf("failed to create Selector: %s", err)
return
}

if !selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
continue
}

switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(address.Port) {
portMatch = true
}
case intstr.String:
for _, p := range address.ExternalWorkload.Spec.Ports {
if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
portMatch = true
}
}
default:
continue
}

} else {
continue
}

if portMatch {
if isAdd && server.Spec.ProxyProtocol == opaqueProtocol {
if pp.isAddressSelected(address, oldServer) || pp.isAddressSelected(address, newServer) {
if newServer != nil && pp.isAddressSelected(address, newServer) && newServer.Spec.ProxyProtocol == opaqueProtocol {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we maybe simplify this by saying:

if pp.isAddressSelected(address, oldServer) || pp.isAddressSelected(address, newServer) {
   if pp.isAddressSelected(address, newServer) && newServer.Spec.ProxyProtocol == opaqueProtocol {
   }
}

iiuc isAddressSelected will return false if the server it is applied to is nil so it seems redundant to check it in this if condition too.

address.OpaqueProtocol = true
} else {
address.OpaqueProtocol = false
Expand All @@ -1319,6 +1266,64 @@ func (pp *portPublisher) updateServer(server *v1beta2.Server, isAdd bool) {
}
}

func (pp *portPublisher) isAddressSelected(address Address, server *v1beta2.Server) bool {
if server == nil {
return false
}

if address.Pod != nil {
selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
if err != nil {
pp.log.Errorf("failed to create Selector: %s", err)
return false
}

if !selector.Matches(labels.Set(address.Pod.Labels)) {
return false
}

switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(address.Port) {
return true
}
case intstr.String:
for _, c := range address.Pod.Spec.Containers {
for _, p := range c.Ports {
if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
return true
}
}
}
}

} else if address.ExternalWorkload != nil {
selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
if err != nil {
pp.log.Errorf("failed to create Selector: %s", err)
return false
}

if !selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
return false
}

switch server.Spec.Port.Type {
case intstr.Int:
if server.Spec.Port.IntVal == int32(address.Port) {
return true
}
case intstr.String:
for _, p := range address.ExternalWorkload.Spec.Ports {
if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
return true
}
}
}
}
return false
}

////////////
/// util ///
////////////
Expand Down
Loading
Loading