Skip to content

Commit

Permalink
tests: Validate etcd linearizability
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Sep 22, 2022
1 parent 997260a commit 320ef73
Show file tree
Hide file tree
Showing 21 changed files with 476 additions and 42 deletions.
1 change: 1 addition & 0 deletions etcdutl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/etcd/client/v2 v2.306.0-alpha.0 // indirect
go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/trace v1.7.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions etcdutl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 h1:TcXBU/YdVROXQ7FUowVK1ih9gu2yi3YMLE+tQb9q964=
go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0/go.mod h1:bOzzUWJ5bNHifkNkoIN6Ydf/z/UPT0bYuPghFYVC8+4=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ require (
github.com/stretchr/testify v1.7.2 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0 // indirect
go.opentelemetry.io/otel v1.7.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.7.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/anishathalye/porcupine v0.1.2/go.mod h1:/X9OQYnVb7DzfKCQVO4tI1Aq+o56UJW+RvN/5U4EuZA=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down Expand Up @@ -327,6 +328,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 h1:TcXBU/YdVROXQ7FUowVK1ih9gu2yi3YMLE+tQb9q964=
go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0/go.mod h1:bOzzUWJ5bNHifkNkoIN6Ydf/z/UPT0bYuPghFYVC8+4=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down
106 changes: 76 additions & 30 deletions pkg/expect/expect.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ const DEBUG_LINES_TAIL = 40
type ExpectProcess struct {
cfg expectConfig

cmd *exec.Cmd
fpty *os.File
wg sync.WaitGroup
// StopSignal is the signal Stop sends to the process; defaults to SIGTERM.
StopSignal os.Signal

cmd *exec.Cmd
fpty *os.File
closech chan struct{}
logsCollected sync.WaitGroup

mu sync.Mutex // protects lines and err
lines []string
Expand All @@ -50,35 +54,38 @@ type ExpectProcess struct {
// NewExpect creates a new process for expect testing.
func NewExpect(name string, arg ...string) (ep *ExpectProcess, err error) {
// if env[] is nil, use current system env and the default command as name
return NewExpectWithEnv(name, arg, nil, name)
return NewExpectWithEnv(name, arg, nil, name, false)
}

// NewExpectWithEnv creates a new process with user defined env variables for expect testing.
func NewExpectWithEnv(name string, args []string, env []string, serverProcessConfigName string) (ep *ExpectProcess, err error) {
func NewExpectWithEnv(name string, args []string, env []string, serverProcessConfigName string, restart bool) (ep *ExpectProcess, err error) {
ep = &ExpectProcess{
cfg: expectConfig{
name: serverProcessConfigName,
cmd: name,
args: args,
env: env,
name: serverProcessConfigName,
cmd: name,
args: args,
env: env,
restart: restart,
},
closech: make(chan struct{}),
}
ep.cmd = commandFromConfig(ep.cfg)

if ep.fpty, err = pty.Start(ep.cmd); err != nil {
return nil, err
}

ep.wg.Add(1)
ep.logsCollected.Add(1)
go ep.read()
return ep, nil
}

type expectConfig struct {
name string
cmd string
args []string
env []string
name string
cmd string
args []string
env []string
restart bool
}

func commandFromConfig(config expectConfig) *exec.Cmd {
Expand All @@ -94,23 +101,52 @@ func (ep *ExpectProcess) Pid() int {
}

func (ep *ExpectProcess) read() {
defer ep.wg.Done()
defer ep.logsCollected.Done()
printDebugLines := os.Getenv("EXPECT_DEBUG") != ""
r := bufio.NewReader(ep.fpty)
for {
l, err := r.ReadString('\n')
ep.mu.Lock()
if l != "" {
if printDebugLines {
fmt.Printf("%s (%s) (%d): %s", ep.cmd.Path, ep.cfg.name, ep.cmd.Process.Pid, l)
cmd := ep.cmd
r := bufio.NewReader(ep.fpty)
ep.mu.Unlock()
if cmd == nil {
break
}
pid := cmd.Process.Pid
for {
l, err := r.ReadString('\n')
ep.mu.Lock()
if l != "" {
if printDebugLines {
fmt.Printf("%s (%s) (%d): %s", ep.cmd.Path, ep.cfg.name, pid, l)
}
ep.lines = append(ep.lines, l)
ep.count++
}
ep.lines = append(ep.lines, l)
ep.count++
if err != nil {
ep.err = err
ep.mu.Unlock()
break
}
ep.mu.Unlock()
}
if err != nil {
select {
case <-ep.closech:
return
default:
}
ep.mu.Lock()
cmd = ep.cmd
ep.mu.Unlock()
if cmd != nil {
cmd.Wait()
}
ep.mu.Lock()
var err error
ep.cmd = commandFromConfig(ep.cfg)
if ep.fpty, err = pty.Start(ep.cmd); err != nil {
fmt.Printf("Error %s\n", err)
ep.err = err
ep.mu.Unlock()
break
ep.cmd = nil
}
ep.mu.Unlock()
}
Expand Down Expand Up @@ -179,7 +215,10 @@ func (ep *ExpectProcess) Stop() error { return ep.close(true) }

// Signal sends a signal to the expect process
func (ep *ExpectProcess) Signal(sig os.Signal) error {
return ep.cmd.Process.Signal(sig)
ep.mu.Lock()
err := ep.cmd.Process.Signal(sig)
ep.mu.Unlock()
return err
}

// Close waits for the expect process to exit.
Expand All @@ -188,16 +227,22 @@ func (ep *ExpectProcess) Signal(sig os.Signal) error {
func (ep *ExpectProcess) Close() error { return ep.close(false) }

func (ep *ExpectProcess) close(kill bool) error {
if ep.cmd == nil {
ep.mu.Lock()
cmd := ep.cmd
ep.mu.Unlock()

if cmd == nil {
return ep.err
}
close(ep.closech)

if kill {
ep.Signal(syscall.SIGTERM)
}

err := ep.cmd.Wait()
err := cmd.Wait()
ep.fpty.Close()
ep.wg.Wait()
ep.logsCollected.Wait()

if err != nil {
if !kill && strings.Contains(err.Error(), "exit status") {
Expand All @@ -207,8 +252,9 @@ func (ep *ExpectProcess) close(kill bool) error {
err = nil
}
}

ep.mu.Lock()
ep.cmd = nil
ep.mu.Unlock()
return err
}

Expand Down
1 change: 1 addition & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
go.etcd.io/etcd/client/v3 v3.6.0-alpha.0
go.etcd.io/etcd/pkg/v3 v3.6.0-alpha.0
go.etcd.io/etcd/raft/v3 v3.6.0-alpha.0
go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.32.0
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.7.0
Expand Down
2 changes: 2 additions & 0 deletions server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0 h1:TcXBU/YdVROXQ7FUowVK1ih9gu2yi3YMLE+tQb9q964=
go.etcd.io/gofail v0.0.0-20220826035847-d0d2a96a6ef0/go.mod h1:bOzzUWJ5bNHifkNkoIN6Ydf/z/UPT0bYuPghFYVC8+4=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down
1 change: 1 addition & 0 deletions tests/framework/config/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ type ClusterConfig struct {
QuotaBackendBytes int64
DisableStrictReconfigCheck bool
SnapshotCount int
Restart bool
}
5 changes: 5 additions & 0 deletions tests/framework/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, cfg config.Clus
QuotaBackendBytes: cfg.QuotaBackendBytes,
DisableStrictReconfigCheck: cfg.DisableStrictReconfigCheck,
SnapshotCount: cfg.SnapshotCount,
Restart: cfg.Restart,
}
switch cfg.ClientTLS {
case config.NoTLS:
Expand Down Expand Up @@ -89,6 +90,10 @@ func (c *e2eCluster) Client() Client {
return e2eClient{e2e.NewEtcdctl(c.Cfg, c.EndpointsV3())}
}

func (c *e2eCluster) Endpoints() []string {
return c.EndpointsV3()
}

func (c *e2eCluster) Members() (ms []Member) {
for _, proc := range c.EtcdProcessCluster.Procs {
ms = append(ms, e2eMember{EtcdProcess: proc, Cfg: c.Cfg})
Expand Down
10 changes: 9 additions & 1 deletion tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ type EtcdProcessClusterConfig struct {
CorruptCheckTime time.Duration
CompactHashCheckEnabled bool
CompactHashCheckTime time.Duration
Restart bool
GoFailEnabled bool
}

// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
Expand Down Expand Up @@ -357,12 +359,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
if cfg.CompactHashCheckTime != 0 {
args = append(args, "--experimental-compact-hash-check-time", cfg.CompactHashCheckTime.String())
}
envVars := map[string]string{}
if cfg.GoFailEnabled {
port = (i+1)*10000 + 2381
envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", port)
}

etcdCfgs[i] = &EtcdServerProcessConfig{
lg: lg,
ExecPath: cfg.ExecPath,
Args: args,
EnvVars: cfg.EnvVars,
TlsArgs: cfg.TlsArgs(),
DataDirPath: dataDirPath,
KeepDataDir: cfg.KeepDataDir,
Expand All @@ -371,6 +377,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
Acurl: curl,
Murl: murl,
InitialToken: cfg.InitialToken,
Restart: cfg.Restart,
EnvVars: envVars,
}
}

Expand Down
2 changes: 1 addition & 1 deletion tests/framework/e2e/cluster_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (pp *proxyProc) start() error {
if pp.proc != nil {
panic("already started")
}
proc, err := SpawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), nil, pp.name)
proc, err := SpawnCmdWithLogger(pp.lg, append([]string{pp.execPath}, pp.args...), nil, pp.name, false)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import (
"testing"
"time"

"go.uber.org/zap"

"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/expect"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -80,6 +79,7 @@ type EtcdServerProcessConfig struct {

InitialToken string
InitialCluster string
Restart bool
}

func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) {
Expand All @@ -104,7 +104,7 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error {
panic("already started")
}
ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name))
proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name)
proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name, ep.cfg.Restart)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion tests/framework/e2e/etcd_spawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ func SpawnCmd(args []string, envVars map[string]string) (*expect.ExpectProcess,
}

func SpawnNamedCmd(processName string, args []string, envVars map[string]string) (*expect.ExpectProcess, error) {
return SpawnCmdWithLogger(zap.NewNop(), args, envVars, processName)
return SpawnCmdWithLogger(zap.NewNop(), args, envVars, processName, false)
}
2 changes: 1 addition & 1 deletion tests/framework/e2e/etcd_spawn_cov.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string
zap.String("working-dir", wd),
zap.String("name", name),
zap.Strings("environment-variables", env))
return expect.NewExpectWithEnv(cmd, allArgs, env, name)
return expect.NewExpectWithEnv(cmd, allArgs, env, name, false)
}

func getCovArgs() ([]string, error) {
Expand Down
14 changes: 9 additions & 5 deletions tests/framework/e2e/etcd_spawn_nocov.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

const noOutputLineCount = 0 // regular binaries emit no extra lines

func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string, name string) (*expect.ExpectProcess, error) {
func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string, name string, restart bool) (*expect.ExpectProcess, error) {
wd, err := os.Getwd()
if err != nil {
return nil, err
Expand All @@ -40,13 +40,17 @@ func SpawnCmdWithLogger(lg *zap.Logger, args []string, envVars map[string]string
zap.Strings("args", args),
zap.String("working-dir", wd),
zap.String("name", name),
zap.Strings("environment-variables", env))
return expect.NewExpectWithEnv(CtlBinPath, args[1:], env, name)
zap.Strings("environment-variables", env),
zap.Bool("restart", restart),
)
return expect.NewExpectWithEnv(CtlBinPath, args[1:], env, name, restart)
}
lg.Info("spawning process",
zap.Strings("args", args),
zap.String("working-dir", wd),
zap.String("name", name),
zap.Strings("environment-variables", env))
return expect.NewExpectWithEnv(args[0], args[1:], env, name)
zap.Strings("environment-variables", env),
zap.Bool("restart", restart),
)
return expect.NewExpectWithEnv(args[0], args[1:], env, name, restart)
}
1 change: 1 addition & 0 deletions tests/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Cluster interface {
Client() Client
WaitLeader(t testing.TB) int
Close() error
Endpoints() []string
}

type Member interface {
Expand Down
Loading

0 comments on commit 320ef73

Please sign in to comment.