Skip to content

Commit

Permalink
Upgrade test: peering control plane traffic through mesh gateway (#16091
Browse files Browse the repository at this point in the history
)
  • Loading branch information
huikang authored Jan 27, 2023
1 parent 5fa9ab2 commit ffb8178
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 50 deletions.
114 changes: 108 additions & 6 deletions test/integration/consul-container/libs/assert/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package assert

import (
"fmt"
"io"
"net/url"
"strconv"
"strings"
"testing"
"time"

"github.com/hashicorp/go-cleanhttp"

"github.com/hashicorp/consul/sdk/testutil/retry"
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -24,7 +28,7 @@ func GetEnvoyListenerTCPFilters(t *testing.T, adminPort int) {
}

retry.RunWith(failer(), t, func(r *retry.R) {
dump, err = libservice.GetEnvoyConfigDump(adminPort, "")
dump, err = GetEnvoyOutput(adminPort, "config_dump", map[string]string{})
if err != nil {
r.Fatal("could not fetch envoy configuration")
}
Expand Down Expand Up @@ -61,18 +65,86 @@ func AssertUpstreamEndpointStatus(t *testing.T, adminPort int, clusterName, heal
}

retry.RunWith(failer(), t, func(r *retry.R) {
clusters, err = libservice.GetEnvoyClusters(adminPort)
clusters, err = GetEnvoyOutput(adminPort, "clusters", map[string]string{"format": "json"})
if err != nil {
r.Fatal("could not fetch envoy configuration")
r.Fatal("could not fetch envoy clusters")
}

filter := fmt.Sprintf(`.cluster_statuses[] | select(.name|contains("%s")) | [.host_statuses[].health_status.eds_health_status] | [select(.[] == "%s")] | length`, clusterName, healthStatus)
results, err := utils.JQFilter(clusters, filter)
require.NoError(r, err, "could not parse envoy configuration")
require.NoErrorf(r, err, "could not found cluster name %s", clusterName)
require.Equal(r, count, len(results))
})
}

// AssertEnvoyMetricAtMost assert the filered metric by prefix and metric is >= count
func AssertEnvoyMetricAtMost(t *testing.T, adminPort int, prefix, metric string, count int) {
var (
stats string
err error
)
failer := func() *retry.Timer {
return &retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}
}

retry.RunWith(failer(), t, func(r *retry.R) {
stats, err = GetEnvoyOutput(adminPort, "stats", nil)
if err != nil {
r.Fatal("could not fetch envoy stats")
}
lines := strings.Split(stats, "\n")
err = processMetrics(lines, prefix, metric, func(v int) bool {
return v <= count
})
require.NoError(r, err)
})
}

func processMetrics(metrics []string, prefix, metric string, condition func(v int) bool) error {
for _, line := range metrics {
if strings.Contains(line, prefix) &&
strings.Contains(line, metric) {

metric := strings.Split(line, ":")
fmt.Println(metric[1])

v, err := strconv.Atoi(strings.TrimSpace(metric[1]))
if err != nil {
return fmt.Errorf("err parse metric value %s: %s", metric[1], err)
}

if condition(v) {
return nil
}
}
}
return fmt.Errorf("error processing stats")
}

// AssertEnvoyMetricAtLeast assert the filered metric by prefix and metric is <= count
func AssertEnvoyMetricAtLeast(t *testing.T, adminPort int, prefix, metric string, count int) {
var (
stats string
err error
)
failer := func() *retry.Timer {
return &retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}
}

retry.RunWith(failer(), t, func(r *retry.R) {
stats, err = GetEnvoyOutput(adminPort, "stats", nil)
if err != nil {
r.Fatal("could not fetch envoy stats")
}
lines := strings.Split(stats, "\n")

err = processMetrics(lines, prefix, metric, func(v int) bool {
return v >= count
})
require.NoError(r, err)
})
}

// GetEnvoyHTTPrbacFilters validates that proxy was configured with an http connection manager
// this assertion is currently unused current tests use http protocol
func GetEnvoyHTTPrbacFilters(t *testing.T, port int) {
Expand All @@ -85,7 +157,7 @@ func GetEnvoyHTTPrbacFilters(t *testing.T, port int) {
}

retry.RunWith(failer(), t, func(r *retry.R) {
dump, err = libservice.GetEnvoyConfigDump(port, "")
dump, err = GetEnvoyOutput(port, "config_dump", map[string]string{})
if err != nil {
r.Fatal("could not fetch envoy configuration")
}
Expand Down Expand Up @@ -117,3 +189,33 @@ func sanitizeResult(s string) []string {
result := strings.Split(strings.ReplaceAll(s, `,`, " "), " ")
return append(result[:0], result[1:]...)
}

func GetEnvoyOutput(port int, path string, query map[string]string) (string, error) {
client := cleanhttp.DefaultClient()
var u url.URL
u.Host = fmt.Sprintf("localhost:%d", port)
u.Scheme = "http"
if path != "" {
u.Path = path
}
q := u.Query()
for k, v := range query {
q.Add(k, v)
}
if query != nil {
u.RawQuery = q.Encode()
}

res, err := client.Get(u.String())
if err != nil {
return "", err
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return "", err
}

return string(body), nil
}
4 changes: 4 additions & 0 deletions test/integration/consul-container/libs/service/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func (g ConnectContainer) GetAddr() (string, int) {
return g.ip, g.appPort
}

func (g ConnectContainer) Restart() error {
return fmt.Errorf("Restart Unimplemented by ConnectContainer")
}

func (g ConnectContainer) GetLogs() (string, error) {
rc, err := g.container.Logs(context.Background())
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions test/integration/consul-container/libs/service/examples.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (g exampleContainer) GetAddr() (string, int) {
return g.ip, g.httpPort
}

func (g exampleContainer) Restart() error {
return fmt.Errorf("Restart Unimplemented by ConnectContainer")
}

func (g exampleContainer) GetLogs() (string, error) {
rc, err := g.container.Logs(context.Background())
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions test/integration/consul-container/libs/service/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,23 @@ func (g gatewayContainer) GetAdminAddr() (string, int) {
return "localhost", g.adminPort
}

func (g gatewayContainer) Restart() error {
_, err := g.container.State(context.Background())
if err != nil {
return fmt.Errorf("error get gateway state %s", err)
}

err = g.container.Stop(context.Background(), nil)
if err != nil {
return fmt.Errorf("error stop gateway %s", err)
}
err = g.container.Start(context.Background())
if err != nil {
return fmt.Errorf("error start gateway %s", err)
}
return nil
}

func NewGatewayService(ctx context.Context, name string, kind string, node libcluster.Agent) (Service, error) {
nodeConfig := node.GetConfig()
if nodeConfig.ScratchDir == "" {
Expand Down
39 changes: 0 additions & 39 deletions test/integration/consul-container/libs/service/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package service
import (
"context"
"fmt"
"io"

"github.com/hashicorp/go-cleanhttp"

"github.com/hashicorp/consul/api"

Expand Down Expand Up @@ -124,39 +121,3 @@ func CreateAndRegisterStaticClientSidecar(

return clientConnectProxy, nil
}

func GetEnvoyConfigDump(port int, filter string) (string, error) {
client := cleanhttp.DefaultClient()
url := fmt.Sprintf("http://localhost:%d/config_dump?%s", port, filter)

res, err := client.Get(url)
if err != nil {
return "", err
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return "", err
}

return string(body), nil
}

func GetEnvoyClusters(port int) (string, error) {
client := cleanhttp.DefaultClient()
url := fmt.Sprintf("http://localhost:%d/clusters?format=json", port)

res, err := client.Get(url)
if err != nil {
return "", err
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return "", err
}

return string(body), nil
}
1 change: 1 addition & 0 deletions test/integration/consul-container/libs/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ type Service interface {
GetServiceName() string
Start() (err error)
Terminate() error
Restart() error
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type BuiltCluster struct {
Context *libcluster.BuildContext
Service libservice.Service
Container *libservice.ConnectContainer
Gateway libservice.Service
}

// BasicPeeringTwoClustersSetup sets up a scenario for testing peering, which consists of
Expand All @@ -50,6 +51,7 @@ func BasicPeeringTwoClustersSetup(

// Register an static-server service in acceptingCluster and export to dialing cluster
var serverSidecarService libservice.Service
var acceptingClusterGateway libservice.Service
{
clientNode := acceptingCluster.Clients()[0]

Expand All @@ -62,10 +64,15 @@ func BasicPeeringTwoClustersSetup(
libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy")

require.NoError(t, serverSidecarService.Export("default", AcceptingPeerName, acceptingClient))

// Create the mesh gateway for dataplane traffic
acceptingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
require.NoError(t, err)
}

// Register an static-client service in dialing cluster and set upstream to static-server service
var clientSidecarService *libservice.ConnectContainer
var dialingClusterGateway libservice.Service
{
clientNode := dialingCluster.Clients()[0]

Expand All @@ -75,6 +82,10 @@ func BasicPeeringTwoClustersSetup(
require.NoError(t, err)

libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy")

// Create the mesh gateway for dataplane traffic
dialingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
require.NoError(t, err)
}

_, adminPort := clientSidecarService.GetAdminAddr()
Expand All @@ -87,12 +98,14 @@ func BasicPeeringTwoClustersSetup(
Context: acceptingCtx,
Service: serverSidecarService,
Container: nil,
Gateway: acceptingClusterGateway,
},
&BuiltCluster{
Cluster: dialingCluster,
Context: dialingCtx,
Service: nil,
Container: clientSidecarService,
Gateway: dialingClusterGateway,
}
}

Expand Down Expand Up @@ -204,9 +217,5 @@ func NewPeeringCluster(
require.NoError(t, err)
require.True(t, ok)

// Create the mesh gateway for dataplane traffic
_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
require.NoError(t, err)

return cluster, ctx, client
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func verifySidecarHasTwoRootCAs(t *testing.T, sidecar libservice.Service) {
}

retry.RunWith(failer(), t, func(r *retry.R) {
dump, err := libservice.GetEnvoyConfigDump(adminPort, "include_eds")
dump, err := libassert.GetEnvoyOutput(adminPort, "config_dump", map[string]string{})
require.NoError(r, err, "could not fetch envoy configuration")

// Make sure there are two certs in the sidecar
Expand Down
Loading

0 comments on commit ffb8178

Please sign in to comment.