Skip to content

Commit

Permalink
e2e: all server types wait for readiness
Browse files Browse the repository at this point in the history
Make both managed and unmanaged kcp servers wait for readiness.

Signed-off-by: Andy Goldstein <[email protected]>
  • Loading branch information
ncdc committed Jan 31, 2023
1 parent f2dba96 commit 650772a
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 144 deletions.
50 changes: 0 additions & 50 deletions test/e2e/framework/accessory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,13 @@ import (
"context"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"github.com/egymgmbh/go-prefix-writer/prefixer"

"k8s.io/apimachinery/pkg/util/wait"
)

// NewAccessory creates a new accessory process.
Expand Down Expand Up @@ -104,48 +99,3 @@ func (a *Accessory) Run(t *testing.T, opts ...RunOption) error {
}()
return nil
}

// Ready blocks until the server is healthy and ready. It polls the
// /healthz and /readyz endpoints concurrently and returns false if the
// test was marked failed while waiting.
func Ready(ctx context.Context, t *testing.T, port string) bool {
	t.Helper()

	endpoints := []string{"/healthz", "/readyz"}
	var wg sync.WaitGroup
	wg.Add(len(endpoints))
	for _, ep := range endpoints {
		go func(ep string) {
			defer wg.Done()
			waitForEndpoint(ctx, t, port, ep)
		}(ep)
	}
	wg.Wait()
	return !t.Failed()
}

// waitForEndpoint polls the given endpoint on localhost (IPv6 loopback)
// until it answers with HTTP 200, failing the test with the last observed
// error if it never becomes ready within the poll timeout.
func waitForEndpoint(ctx context.Context, t *testing.T, port, endpoint string) {
	t.Helper()

	// The URL is invariant across poll attempts, so build it once.
	url := fmt.Sprintf("http://[::1]:%s%s", port, endpoint)

	var lastError error
	pollErr := wait.PollImmediateWithContext(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, func(ctx context.Context) (bool, error) {
		resp, getErr := http.Get(url) //nolint:noctx
		if getErr != nil {
			lastError = fmt.Errorf("error contacting %s: %w", url, getErr)
			return false, nil
		}
		defer resp.Body.Close()

		body, readErr := io.ReadAll(resp.Body)
		if readErr != nil {
			lastError = fmt.Errorf("error reading response from %s: %w", url, readErr)
			return false, nil
		}
		if resp.StatusCode != http.StatusOK {
			lastError = fmt.Errorf("unready response from %s: %v", url, string(body))
			return false, nil
		}

		t.Logf("success contacting %s", url)
		return true, nil
	})
	if pollErr != nil && lastError != nil {
		t.Error(lastError)
	}
}
191 changes: 97 additions & 94 deletions test/e2e/framework/kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ func SharedKcpServer(t *testing.T) RunningServer {
t.Logf("shared kcp server will target configuration %q", kubeconfig)
server, err := newPersistentKCPServer(serverName, kubeconfig, TestConfig.ShardKubeconfig(), filepath.Join(RepositoryDir(), ".kcp"))
require.NoError(t, err, "failed to create persistent server fixture")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
t.Cleanup(cancel)
err = WaitForReady(ctx, t, server.RootShardSystemMasterBaseConfig(t), true)
require.NoError(t, err, "error waiting for readiness")

return server
}

Expand Down Expand Up @@ -255,7 +262,14 @@ func newKcpFixture(t *testing.T, cfgs ...kcpConfig) *kcpFixture {
// Wait for the server to become ready
go func(s *kcpServer, i int) {
defer wg.Done()
err := s.Ready(!cfgs[i].RunInProcess)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := s.loadCfg()
require.NoError(t, err, "error loading config")

err = WaitForReady(ctx, t, s.RootShardSystemMasterBaseConfig(t), !cfgs[i].RunInProcess)
require.NoError(t, err, "kcp server %s never became ready: %v", s.name, err)
}(srv, i)
}
Expand Down Expand Up @@ -726,50 +740,6 @@ func (c *kcpServer) RawConfig() (clientcmdapi.Config, error) {
return c.cfg.RawConfig()
}

// Ready blocks until the server is healthy and ready. Before returning,
// goroutines are started to ensure that the test is failed if the server
// does not remain so.
func (c *kcpServer) Ready(keepMonitoring bool) error {
	if err := c.loadCfg(); err != nil {
		return err
	}
	// cancelling the context will preempt derivative calls but not this
	// main Ready() body, so we check before continuing that we are live
	if ctxErr := c.ctx.Err(); ctxErr != nil {
		return fmt.Errorf("failed to wait for readiness: %w", ctxErr)
	}

	cfg, err := c.config("base")
	if err != nil {
		return fmt.Errorf("failed to read client configuration: %w", err)
	}
	if cfg.NegotiatedSerializer == nil {
		cfg.NegotiatedSerializer = kubernetesscheme.Codecs.WithoutConversion()
	}
	client, err := rest.UnversionedRESTClientFor(cfg)
	if err != nil {
		return fmt.Errorf("failed to create unversioned client: %w", err)
	}

	endpoints := []string{"/livez", "/readyz"}

	// Wait for each health endpoint to report success at least once.
	var wg sync.WaitGroup
	wg.Add(len(endpoints))
	for _, ep := range endpoints {
		go func(ep string) {
			defer wg.Done()
			c.waitForEndpoint(client, ep)
		}(ep)
	}
	wg.Wait()

	// Optionally keep watching the endpoints so the test fails if the
	// server later becomes unhealthy.
	if keepMonitoring {
		for _, ep := range endpoints {
			go func(ep string) {
				c.monitorEndpoint(client, ep)
			}(ep)
		}
	}
	return nil
}

func (c *kcpServer) loadCfg() error {
var lastError error
if err := wait.PollImmediateWithContext(c.ctx, 100*time.Millisecond, 1*time.Minute, func(ctx context.Context) (bool, error) {
Expand Down Expand Up @@ -800,55 +770,6 @@ func (c *kcpServer) loadCfg() error {
return nil
}

// waitForEndpoint repeatedly GETs the given endpoint via the supplied REST
// client until the request succeeds, failing the test with the last observed
// error if it never succeeds within one minute.
func (c *kcpServer) waitForEndpoint(client *rest.RESTClient, endpoint string) {
	var lastError error
	pollErr := wait.PollImmediateWithContext(c.ctx, 100*time.Millisecond, time.Minute, func(ctx context.Context) (bool, error) {
		req := rest.NewRequest(client).RequestURI(endpoint)
		if _, err := req.Do(ctx).Raw(); err != nil {
			lastError = fmt.Errorf("error contacting %s: failed components: %v", req.URL(), unreadyComponentsFromError(err))
			return false, nil
		}

		c.t.Logf("success contacting %s", req.URL())
		return true, nil
	})
	if pollErr != nil && lastError != nil {
		c.t.Error(lastError)
	}
}

// monitorEndpoint polls the given endpoint for the remainder of the test,
// failing the test if the endpoint reports errors on two consecutive polls.
func (c *kcpServer) monitorEndpoint(client *rest.RESTClient, endpoint string) {
	// we need a shorter deadline than the server, or else:
	// timeout.go:135] post-timeout activity - time-elapsed: 23.784917ms, GET "/livez" result: Header called after Handler finished
	ctx := c.ctx
	if deadline, ok := c.t.Deadline(); ok {
		deadlinedCtx, deadlinedCancel := context.WithDeadline(c.ctx, deadline.Add(-20*time.Second))
		ctx = deadlinedCtx
		c.t.Cleanup(deadlinedCancel) // this does not really matter but govet is upset
	}
	var errCount int
	errs := sets.NewString()
	wait.UntilWithContext(ctx, func(ctx context.Context) {
		_, err := rest.NewRequest(client).RequestURI(endpoint).Do(ctx).Raw()
		if errors.Is(err, context.Canceled) || c.ctx.Err() != nil {
			return
		}
		// if we're noticing an error, record it and fail the test if things stay failed for two consecutive polls
		if err != nil {
			errCount++
			errs.Insert(fmt.Sprintf("failed components: %v", unreadyComponentsFromError(err)))
			if errCount == 2 {
				c.t.Errorf("error contacting %s: %v", endpoint, errs.List())
			}
			// BUGFIX: return here so the success-path reset below does not
			// run on error iterations; previously errCount was zeroed
			// unconditionally, making the errCount == 2 branch unreachable.
			return
		}
		// otherwise, reset the counters
		errCount = 0
		if errs.Len() > 0 {
			errs = sets.NewString()
		}
	}, 100*time.Millisecond)
}

// there doesn't seem to be any simple way to get a metav1.Status from the Go client, so we get
// the content in a string-formatted error, unfortunately.
func unreadyComponentsFromError(err error) string {
Expand Down Expand Up @@ -1054,3 +975,85 @@ func NoGoRunEnvSet() bool {
envSet, _ := strconv.ParseBool(os.Getenv("NO_GORUN"))
return envSet
}

// WaitForReady blocks until the server targeted by cfg reports both live and
// ready via its /livez and /readyz endpoints. If keepMonitoring is true,
// background goroutines keep polling those endpoints for the remainder of the
// test and fail it if the server stops being healthy.
func WaitForReady(ctx context.Context, t *testing.T, cfg *rest.Config, keepMonitoring bool) error {
	t.Logf("waiting for readiness for server at %s", cfg.Host)

	// Work on a copy so the caller's config is never mutated.
	cfg = rest.CopyConfig(cfg)
	if cfg.NegotiatedSerializer == nil {
		cfg.NegotiatedSerializer = kubernetesscheme.Codecs.WithoutConversion()
	}

	client, err := rest.UnversionedRESTClientFor(cfg)
	if err != nil {
		return fmt.Errorf("failed to create unversioned client: %w", err)
	}

	endpoints := []string{"/livez", "/readyz"}

	var wg sync.WaitGroup
	wg.Add(len(endpoints))
	for _, ep := range endpoints {
		go func(ep string) {
			defer wg.Done()
			waitForEndpoint(ctx, t, client, ep)
		}(ep)
	}
	wg.Wait()
	t.Logf("server at %s is ready", cfg.Host)

	if keepMonitoring {
		for _, ep := range endpoints {
			go func(ep string) {
				monitorEndpoint(ctx, t, client, ep)
			}(ep)
		}
	}
	return nil
}

// waitForEndpoint repeatedly GETs the given endpoint via the supplied REST
// client until the request succeeds, failing the test with the last observed
// error if it never succeeds within one minute.
func waitForEndpoint(ctx context.Context, t *testing.T, client *rest.RESTClient, endpoint string) {
	var lastError error
	pollErr := wait.PollImmediateWithContext(ctx, 100*time.Millisecond, time.Minute, func(ctx context.Context) (bool, error) {
		req := rest.NewRequest(client).RequestURI(endpoint)
		if _, err := req.Do(ctx).Raw(); err != nil {
			lastError = fmt.Errorf("error contacting %s: failed components: %v", req.URL(), unreadyComponentsFromError(err))
			return false, nil
		}

		t.Logf("success contacting %s", req.URL())
		return true, nil
	})
	if pollErr != nil && lastError != nil {
		t.Error(lastError)
	}
}

// monitorEndpoint polls the given endpoint for the remainder of the test,
// failing the test if the endpoint reports errors on two consecutive polls.
func monitorEndpoint(ctx context.Context, t *testing.T, client *rest.RESTClient, endpoint string) {
	// we need a shorter deadline than the server, or else:
	// timeout.go:135] post-timeout activity - time-elapsed: 23.784917ms, GET "/livez" result: Header called after Handler finished
	if deadline, ok := t.Deadline(); ok {
		deadlinedCtx, deadlinedCancel := context.WithDeadline(ctx, deadline.Add(-20*time.Second))
		ctx = deadlinedCtx
		t.Cleanup(deadlinedCancel) // this does not really matter but govet is upset
	}
	var errCount int
	errs := sets.NewString()
	wait.UntilWithContext(ctx, func(ctx context.Context) {
		_, err := rest.NewRequest(client).RequestURI(endpoint).Do(ctx).Raw()
		if errors.Is(err, context.Canceled) || ctx.Err() != nil {
			return
		}
		// if we're noticing an error, record it and fail the test if things stay failed for two consecutive polls
		if err != nil {
			errCount++
			errs.Insert(fmt.Sprintf("failed components: %v", unreadyComponentsFromError(err)))
			if errCount == 2 {
				t.Errorf("error contacting %s: %v", endpoint, errs.List())
			}
			// BUGFIX: return here so the success-path reset below does not
			// run on error iterations; previously errCount was zeroed
			// unconditionally, making the errCount == 2 branch unreachable.
			return
		}
		// otherwise, reset the counters
		errCount = 0
		if errs.Len() > 0 {
			errs = sets.NewString()
		}
	}, 100*time.Millisecond)
}

0 comments on commit 650772a

Please sign in to comment.