Skip to content

Commit

Permalink
Extend unit test for HostPort subscriptions (#11439)
Browse files Browse the repository at this point in the history
Followup to #11334 (comment)

This extends the test introduced in #11334 to excercise upgrading a
Server associated to a pod's HostPort, and observing how the stream
updates the OpaqueProtocol field.

Helper functions were refactored a bit to allow retrieving the
l5dCRDClientSet used when building the fake API.
  • Loading branch information
alpeb authored Oct 2, 2023
1 parent 30ecb57 commit c67985d
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 24 deletions.
108 changes: 89 additions & 19 deletions controller/api/destination/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import (
"github.com/linkerd/linkerd2-proxy-api/go/net"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/api/util"
"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta1"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/addr"
pkgk8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/testutil"
logging "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

const fullyQualifiedName = "name1.ns.svc.mycluster.local"
Expand Down Expand Up @@ -170,7 +173,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Returns server profile", func(t *testing.T) {
stream, server := profileStream(t, fullyQualifiedName, port, "ns:other")
server := makeServer(t)
stream := profileStream(t, server, fullyQualifiedName, port, "ns:other")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.FullyQualifiedName != fullyQualifiedName {
Expand All @@ -189,7 +193,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return service profile when using json token", func(t *testing.T) {
stream, server := profileStream(t, fullyQualifiedName, port, `{"ns":"other"}`)
server := makeServer(t)
stream := profileStream(t, server, fullyQualifiedName, port, `{"ns":"other"}`)
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.FullyQualifiedName != fullyQualifiedName {
Expand All @@ -204,7 +209,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Returns client profile", func(t *testing.T) {
stream, server := profileStream(t, fullyQualifiedName, port, `{"ns":"client-ns"}`)
server := makeServer(t)
stream := profileStream(t, server, fullyQualifiedName, port, `{"ns":"client-ns"}`)
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
routes := profile.GetRoutes()
Expand All @@ -219,7 +225,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile when using cluster IP", func(t *testing.T) {
stream, server := profileStream(t, clusterIP, port, "")
server := makeServer(t)
stream := profileStream(t, server, clusterIP, port, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.FullyQualifiedName != fullyQualifiedName {
Expand All @@ -237,7 +244,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with endpoint when using pod DNS", func(t *testing.T) {
stream, server := profileStream(t, fullyQualifiedPodDNS, port, "ns:ns")
server := makeServer(t)
stream := profileStream(t, server, fullyQualifiedPodDNS, port, "ns:ns")
defer stream.Cancel()

epAddr, err := toAddress(podIPStatefulSet, port)
Expand Down Expand Up @@ -277,7 +285,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with endpoint when using pod IP", func(t *testing.T) {
stream, server := profileStream(t, podIP1, port, "ns:ns")
server := makeServer(t)
stream := profileStream(t, server, podIP1, port, "ns:ns")
defer stream.Cancel()

epAddr, err := toAddress(podIP1, port)
Expand Down Expand Up @@ -317,7 +326,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return default profile when IP does not map to service or pod", func(t *testing.T) {
stream, server := profileStream(t, "172.0.0.0", 1234, "")
server := makeServer(t)
stream := profileStream(t, server, "172.0.0.0", 1234, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.RetryBudget == nil {
Expand All @@ -328,7 +338,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with no protocol hint when pod does not have label", func(t *testing.T) {
stream, server := profileStream(t, podIP2, port, "")
server := makeServer(t)
stream := profileStream(t, server, podIP2, port, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.Endpoint == nil {
Expand All @@ -342,7 +353,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return non-opaque protocol profile when using cluster IP and opaque protocol port", func(t *testing.T) {
stream, server := profileStream(t, clusterIPOpaque, opaquePort, "")
server := makeServer(t)
stream := profileStream(t, server, clusterIPOpaque, opaquePort, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.FullyQualifiedName != fullyQualifiedNameOpaque {
Expand All @@ -356,7 +368,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return opaque protocol profile with endpoint when using pod IP and opaque protocol port", func(t *testing.T) {
stream, server := profileStream(t, podIPOpaque, opaquePort, "")
server := makeServer(t)
stream := profileStream(t, server, podIPOpaque, opaquePort, "")
defer stream.Cancel()

epAddr, err := toAddress(podIPOpaque, opaquePort)
Expand Down Expand Up @@ -396,7 +409,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return opaque protocol profile when using service name with opaque port annotation", func(t *testing.T) {
stream, server := profileStream(t, fullyQualifiedNameOpaqueService, opaquePort, "")
server := makeServer(t)
stream := profileStream(t, server, fullyQualifiedNameOpaqueService, opaquePort, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.FullyQualifiedName != fullyQualifiedNameOpaqueService {
Expand All @@ -410,7 +424,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with unknown protocol hint and identity when pod contains skipped inbound port", func(t *testing.T) {
stream, server := profileStream(t, podIPSkipped, skippedPort, "")
server := makeServer(t)
stream := profileStream(t, server, podIPSkipped, skippedPort, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
addr := profile.GetEndpoint()
Expand All @@ -428,7 +443,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with opaque protocol when using Pod IP selected by a Server", func(t *testing.T) {
stream, server := profileStream(t, podIPPolicy, 80, "")
server := makeServer(t)
stream := profileStream(t, server, podIPPolicy, 80, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.Endpoint == nil {
Expand All @@ -448,7 +464,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with opaque protocol when using an opaque port with an external IP", func(t *testing.T) {
stream, server := profileStream(t, externalIP, 3306, "")
server := makeServer(t)
stream := profileStream(t, server, externalIP, 3306, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if !profile.OpaqueProtocol {
Expand All @@ -459,7 +476,8 @@ func TestGetProfiles(t *testing.T) {
})

t.Run("Return profile with non-opaque protocol when using an arbitrary port with an external IP", func(t *testing.T) {
stream, server := profileStream(t, externalIP, 80, "")
server := makeServer(t)
stream := profileStream(t, server, externalIP, 80, "")
defer stream.Cancel()
profile := assertSingleProfile(t, stream.Updates())
if profile.OpaqueProtocol {
Expand All @@ -472,7 +490,8 @@ func TestGetProfiles(t *testing.T) {
t.Run("Return profile for host port pods", func(t *testing.T) {
hostPort := uint32(7777)
containerPort := uint32(80)
stream, server := profileStream(t, externalIP, hostPort, "")
server, l5dClient := getServerWithClient(t)
stream := profileStream(t, server, externalIP, hostPort, "")
defer stream.Cancel()

// HostPort maps to pod.
Expand Down Expand Up @@ -517,9 +536,21 @@ func TestGetProfiles(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "hostport-mapping-2",
Namespace: "ns",
Labels: map[string]string{
"app": "hostport-mapping-2",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: pkgk8s.ProxyContainerName,
Env: []corev1.EnvVar{
{
Name: "LINKERD2_PROXY_INBOUND_LISTEN_ADDR",
Value: "0.0.0.0:4143",
},
},
},
{
Name: "nginx",
Image: "nginx",
Expand Down Expand Up @@ -566,6 +597,46 @@ func TestGetProfiles(t *testing.T) {
if dstPod != "hostport-mapping-2" {
t.Fatalf("Expected dst_pod to be %s got %s", "hostport-mapping-2", dstPod)
}
if profile.OpaqueProtocol {
t.Fatal("Expected OpaqueProtocol=false")
}

// Server is created, setting the port to opaque
(*l5dClient).ServerV1beta1().Servers("ns").Create(context.Background(), &v1beta1.Server{
ObjectMeta: metav1.ObjectMeta{
Name: "srv-hostport-mapping-2",
Namespace: "ns",
},
Spec: v1beta1.ServerSpec{
PodSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "hostport-mapping-2",
},
},
Port: intstr.IntOrString{
Type: intstr.String,
StrVal: "nginx-7777",
},
ProxyProtocol: "opaque",
},
}, metav1.CreateOptions{})

var updates []*pb.DestinationProfile
err = testutil.RetryFor(time.Second*10, func() error {
updates = stream.Updates()
if len(updates) < 4 {
return fmt.Errorf("expected 4 updates, got %d", len(updates))
}
return nil
})
if err != nil {
t.Fatal(err)
}

profile = stream.Updates()[3]
if !profile.OpaqueProtocol {
t.Fatal("Expected OpaqueProtocol=true")
}

server.clusterStore.UnregisterGauges()
})
Expand Down Expand Up @@ -713,10 +784,9 @@ func assertSingleUpdate(t *testing.T, updates []*pb.Update) *pb.Update {
return updates[0]
}

func profileStream(t *testing.T, host string, port uint32, token string) (*bufferingGetProfileStream, *server) {
func profileStream(t *testing.T, server *server, host string, port uint32, token string) *bufferingGetProfileStream {
t.Helper()

server := makeServer(t)
stream := &bufferingGetProfileStream{
updates: []*pb.DestinationProfile{},
MockServerStream: util.NewMockServerStream(),
Expand All @@ -735,5 +805,5 @@ func profileStream(t *testing.T, host string, port uint32, token string) (*buffe
// Give GetProfile some slack
time.Sleep(50 * time.Millisecond)

return stream, server
return stream
}
12 changes: 9 additions & 3 deletions controller/api/destination/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@ import (
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/api/util"
l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
"github.com/linkerd/linkerd2/controller/k8s"
logging "github.com/sirupsen/logrus"
)

func makeServer(t *testing.T) *server {
srv, _ := getServerWithClient(t)
return srv
}

func getServerWithClient(t *testing.T) (*server, *l5dcrdclient.Interface) {
meshedPodResources := []string{`
apiVersion: v1
kind: Namespace
Expand Down Expand Up @@ -445,9 +451,9 @@ spec:
res = append(res, hostPortMapping...)
res = append(res, mirrorServiceResources...)
res = append(res, destinationCredentialsResources...)
k8sAPI, err := k8s.NewFakeAPI(res...)
k8sAPI, l5dClient, err := k8s.NewFakeAPIWithL5dClient(res...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
t.Fatalf("NewFakeAPIWithL5dClient returned an error: %s", err)
}
metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
if err != nil {
Expand Down Expand Up @@ -513,7 +519,7 @@ spec:
metadataAPI,
log,
make(<-chan struct{}),
}
}, l5dClient
}

type bufferingGetStream struct {
Expand Down
22 changes: 20 additions & 2 deletions controller/k8s/test_helper.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package k8s

import (
l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
"github.com/linkerd/linkerd2/pkg/k8s"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
clientsetscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/metadata/fake"
)
Expand All @@ -16,10 +18,26 @@ func NewFakeAPI(configs ...string) (*API, error) {
return nil, err
}

return NewFakeClusterScopedAPI(clientSet, spClientSet), nil
}

// NewFakeAPIWithL5dClient provides a mock Kubernetes API for testing like
// NewFakeAPI, but it also returns the mock client for linkerd CRDs
func NewFakeAPIWithL5dClient(configs ...string) (*API, *l5dcrdclient.Interface, error) {
clientSet, _, _, l5dClientSet, err := k8s.NewFakeClientSets(configs...)
if err != nil {
return nil, nil, err
}

return NewFakeClusterScopedAPI(clientSet, l5dClientSet), &l5dClientSet, nil
}

// NewFakeClusterScopedAPI provides a mock Kubernetes API for testing.
func NewFakeClusterScopedAPI(clientSet kubernetes.Interface, l5dClientSet l5dcrdclient.Interface) *API {
return NewClusterScopedAPI(
clientSet,
nil,
spClientSet,
l5dClientSet,
"fake",
CJ,
CM,
Expand All @@ -39,7 +57,7 @@ func NewFakeAPI(configs ...string) (*API, error) {
ES,
Srv,
Secret,
), nil
)
}

// NewFakeMetadataAPI provides a mock Kubernetes API for testing.
Expand Down

0 comments on commit c67985d

Please sign in to comment.